После того, как я написал статью про паттерн CQRS, мне захотелось описать ещё один интересный шаблон для микросервисной архитектуры, а именно saga (он же повествование).

Проблематика

Проекты Каруны, как и многие современные приложения, следуют парадигме микросервисной архитектуры. Поэтому паттерн в настоящее время актуален для нас как никогда. Для начала рассмотрим какой-нибудь простой кейс на примере интернет-магазина. Предположим, что некоторая часть нашей архитектуры разделена на микросервисы: order, который отвечает за создание самого заказа, и сервис goods, который отвечает за создание товаров в этом заказе. Создание заказа можно представить следующей схемой:

Клиент обращается непосредственно к сервису order, в котором создаётся сущность заказа. Сервис order создает в своей БД сущность заказа и посылает запрос на создание товаров в этом заказе в сервис goods. Если в сервисе goods произойдет ошибка, то нарушается согласованность данных в нашей системе — заказ уже записан в базу данных сервиса order, но товары не будут записаны в БД сервиса goods.

Методы решения

Многие слышали о так называемых распределённых транзакциях, которые, как предполагается, должны решать описанную выше проблему. Для управления такими транзакциями используется стандарт X/Open XA, следование которому гарантирует как сохранение всей транзакции, так и её откат. Одной из главных проблем распределённых транзакций является то, что их не поддерживают многие современные инструменты — такие как MongoDB, Kafka и т. д. 

Другая проблема связана с жёстким требованием: 100-процентная доступность всех сервисов во время исполнения такой транзакции. А так как каждый новый сервис понижает доступность системы в целом (доступность системы = произведению доступности каждого сервиса), использование распределённых транзакций будет хуже и хуже сказываться на приложении в целом при масштабировании. По этим причинам использование данного метода в современных приложениях не желательно.

Для решения нашей задачи отлично подходит паттерн saga. Он обеспечивает согласованность данных между сервисами, используя локальные транзакции и асинхронные сообщения. Логика исполнения паттерна размазана на несколько сервисов: после фиксации транзакции в одном сервисе он публикует асинхронное сообщение о завершении транзакции. Это сообщение инициализирует следующий этап выполнения паттерна, и так до конца "повествования".

Для отката изменений на N-ом этапе повествование использует компенсирующие транзакции, которые откатывают изменения и устраняют последствия выполнения предыдущих N-1 транзакций. Выполнение таких транзакций тоже происходит последовательно, но в обратном порядке. Паттерн также оперирует такими понятиями как поворотная транзакция (дальнейшие транзакции никогда не отказывают) и доступная для повторения транзакция (всегда заканчивается успешно).

Управление (координация) повествований может осуществляться 2 способами:

  • Хореография — решения о следующих шагах и откатах принимаются распределённо каждым сервисом по отдельности.

  • Оркестрация — логика координации централизуется в одном месте, из которого шлются сообщения в другие сервисы.

Проектирование

В нашем случае реализация паттерна будет выглядеть так:

Сервис

Транзакция

Компенсирующая транзакция

order

CreateOrder()

Доступная для компенсации

RemoveOrder()

goods

CreateGoods()

Поворотная

-

order

AcceptOrder()

Доступная для повторения

-

Реализуем saga паттерн, используя хореографию. Она проще в реализации, чем оркестрация, и хорошо ложится на наш случай небольшого количества сервисов. Применительно к описанной задаче, повествование можно представить следующей схемой:

Рассмотрим две возможных ситуации для нашего функционала . В первом случае клиент посылает запрос в сервис order POST /orders, и всё проходит отлично:

  1. Успешное создание сущности заказа в сервисе order инициализирует событие order_created_v1, на которое подписан сервис goods.

  2. После чтения этого события и успешного создания сущностей товаров в сервисе goods посылается событие goods_created_v1 в сервис order.

  3. При получении этого события сервис order меняет статус заказа в своей БД на approved.

Другой случай, это когда во время заказа что-то пошло не так:

  1. Снова создаётся заказ в сервисе order, и генерируется событие order_created_v1.

  2. Сервис goods, получив событие, пытается создать товары для этого заказа. Но что-то идёт не так, и товары не заносятся в БД. Посылается событие goods_rejected_v1 в сервис order.

  3. При получении этого события сервис order откатывает свои изменения из шага 1 — удаляет заказ.

Реализация

Реализовывать проект будем с помощью golang как языка сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами. Создадим локальное рабочее окружение, а именно — сервисы order, goods, БД для каждого сервиса и брокер kafka:

docker-compose.yml
version: '3.9'

services:
  order:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order
    environment:
      - HTTP_BIND=8080
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
      - HOST_DB=db-order
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
      - ORDER_CREATED_TOPIC=goods_created_v1
      - GOODS_REJECTED_TOPIC=goods_rejected_v1
    depends_on:
      - db-order
      - kafka
    volumes:
      - ./order:/app/order:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8080:8080"
    networks:
      - saga

  db-order:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
    ports:
      - "5441:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - saga

  goods:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: goods
    environment:
      - HTTP_BIND=8081
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
      - HOST_DB=db-goods
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
      - GOODS_CREATED_TOPIC=goods_created_v1
      - GOODS_REJECTED_TOPIC=goods_rejected_v1
    volumes:
      - ./goods:/app/goods:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    depends_on:
      - db-goods
      - kafka
    ports:
      - "8081:8081"
    networks:
      - saga

  db-goods:
    image: postgres:14
    environment:
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
    ports:
      - "5442:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - saga

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    networks:
      - saga

  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1,goods_rejected_v1:1:1
    networks:
      - saga

volumes:
  data:

networks:
  saga:

Сервис order включает в себя таблицы со статусами заказов и самими заказами:

CREATE TABLE statuses (
    id   BIGSERIAL PRIMARY KEY,
    name TEXT NOT NULL
);

INSERT INTO statuses (id, name) VALUES (1, 'PENDING'), (2, 'CREATED');

CREATE TABLE orders (
    id         BIGSERIAL PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    status_id  BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,

    FOREIGN KEY (status_id) REFERENCES statuses (id)
);

И реализует эндпоинт создания заказа:

...
server.router.HandleFunc("/v1/orders", s.CreateOrderV1).Methods(http.MethodPost)
...

В котором сохраняется сущность заказа в БД и продьюсится сообщение в kafka:

обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been read.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	orderData := model.OrderData{}
	err = json.Unmarshal(body, &orderData)
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var orderID int64
	err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, status_id, created_at) VALUES ($1, 1, NOW()) RETURNING id`, orderData.UserID).Scan(&orderID)
	if err != nil {
		log.Error().Err(err).Msg("Order hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedOrderMsg{Data: model.Order{
		ID:       orderID,
		GoodsIds: orderData.GoodsIds,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
}

Cервис слушает топики goods_created_v1 и goods_rejected_v1 и в зависимости от полученного изменяет созданную сущность заказа:

обработка события goods_created_v1
func (gch GoodsCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gce := GoodsCreatedEvent{}
		err := json.Unmarshal(msg.Value, &gce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = gch.db.Exec(context.Background(), `UPDATE orders SET status_id = 2 WHERE id = $1`, gce.Data.OrderID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
обрабокта события goods_rejected_v1
func (grh GoodsRejectedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gre := GoodsRejectedEvent{}
		err := json.Unmarshal(msg.Value, &gre)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = grh.db.Exec(context.Background(), `DELETE FROM orders WHERE id = $1`, gre.Data.OrderID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}

Сервис goods содержит таблицу с заказами:

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    goods_id   BIGINT NOT NULL,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

И слушает событие order_created_v1, которое обрабатывает следующим образом:

обработrа события order_created_v1
func (och OrderCreatedHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		oce := OrderCreatedEvent{}
		err := json.Unmarshal(msg.Value, &oce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		ctx := context.Background()
		tx, err := och.db.Begin(ctx)
		for _, goodsID := range oce.Data.GoodsIds {
			_, err = och.db.Exec(context.Background(), `INSERT INTO goods (goods_id, order_id, created_at) VALUES ($1, $2, NOW())`, goodsID, oce.Data.ID)
			if err != nil {
				tx.Rollback(ctx)
				log.Error().Err(err).Msg("Inserting error.")

				err := och.sendRejected(oce.Data.ID)
				if err != nil {
					log.Error().Err(err).Msg("Event hasn't been sent.")
				}
				session.MarkMessage(msg, "")
				continue
			}
		}

		err = tx.Commit(ctx)
		if err != nil {
			err := tx.Rollback(ctx)
			log.Error().Err(err).Msg("Transaction commit error.")
			rErr := och.sendRejected(oce.Data.ID)
			if rErr != nil {
				log.Error().Err(rErr).Msg("Event hasn't been sent.")
			}
			session.MarkMessage(msg, "")
			continue
		}

		err = och.sendCreated(oce.Data.ID)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been sent.")
		}
		session.MarkMessage(msg, "")
	}

	return nil
}

func (och OrderCreatedHandler) sendRejected(orderID int64) error {
	msg := model.RejectedGoodsMsg{Data: model.Goods{
		OrderID: orderID,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_REJECTED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = och.producer.SendMessage(producerMsg)
	return err
}

func (och OrderCreatedHandler) sendCreated(orderID int64) error {
	msg := model.CreatedGoodsMsg{Data: model.Goods{
		OrderID: orderID,
	}}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = och.producer.SendMessage(producerMsg)
	return err
}

Теперь мы может сделать запрос к сервису и создать заказ:

curl --request POST \
   --header "Content-Type: application/json" \
   --data '{"user_id":1,"goods_ids":[1,2]}' \
   'http://localhost:8080/v1/orders'

В успешном варианте развития событий мы получаем полностью сформированную согласованную сущность заказа с соответствующими товарами. В противном случае неудача в сервисе goods влечёт за собой откат транзакции в order. Таким образом мы добились, чего хотели — согласованности данных в распределённой системе. На моем github представлен полный листинг описанной архитектуры. Для реализации повествования на основе оркестрации есть готовая библиотека здесь, заодно можно подробнее ознакомиться с принципами её работы.

Заключение

  • Паттерн saga хорошо решает проблему согласованности в распределённых системах сервисов.

  • Одна из главных проблем повествований — что они являются ACD. У них нет изолированности, это вызывает аномалии (по аналогии с аномалиями в СУБД). Одни повествования могут влиять на данные, с которыми работают другие повествования. Для компенсации этого недостатка паттерн должен реализовывать контрмеры.

  • Если сравнивать повествование на основе оркестрации и хореографии, то второй метод проще в реализации, сложнее для понимания и подходит лучше для простых кейсов. Среди явных недостатков можно выделить возможность возникновения жёсткого связывания, т.к каждый сервис подписывается на все события, которые на него влияют.

  • Повествование на основе оркестрации не создает циклических зависимостей, и бизнес-логика значительно проще, чем в хореографии. Меньше связывания — у каждого сервиса своё API, которое вызывается оркестратором. Лучше подходит для сложных повествований.

Комментарии (15)


  1. crocodile2u
    06.12.2021 14:19

    INSERT INTO statuses (id, name) VALUES (1, 'PENDING'), (2, 'CREATED');

    Для двух статусов табличка, конечно, совершенно необходима.


    1. KislyFan
      07.12.2021 19:04
      +1

      Почему нет? C точки зрения поддержки целостности тут есть только одна альтернатива, это postgresql enums. Но это ни разу не серебрянная пуля, особенно если список значений надо будет расширять..


      1. crocodile2u
        07.12.2021 19:34

        Я за ENUM. Нагляднее, проще, на один JOIN меньше.
        Но я придрался скорее к тому, что такая табличка используется в примере, который не должен отвлекать от главного.


  1. host13
    06.12.2021 14:25
    +4

    Что делать, если не удалось выполнить компенсирующую транзакцию?


    1. alexg-nn
      06.12.2021 14:44
      +3

      Великая тайна сия есть. Но вообще компенсирующая операция должна быть идемпотентной, так что её можно повторять до упора, пока не получится хоть что-то в ответ получить.


      1. marshinov
        06.12.2021 15:33

        угу и за DDOS-ить себя этим "повторным выполнением". @host13 Вам вот [типа этого](https://getakka.net/articles/concepts/supervision.html) надо. В оригинальной Akka тоже эти концепты есть, но в .NET с картинками.


        1. alexg-nn
          06.12.2021 15:51

          В общем случае там логика, конечно, сложнее простого retry. Вот тут интересно про это написано https://habr.com/ru/company/oleg-bunin/blog/418235/


          1. marshinov
            06.12.2021 15:54
            +1

            Нормально так сложнее. В целом, saga работает примерно так, как работают распределенные транзакции, т.е. в общем случае не работает.


    1. slava-a Автор
      06.12.2021 14:48

      Как вариант, сохранять в бд «неудачу». Демоном уже обрабатывать отдельно неудавшиейся компенсирующие транзакции


      1. PrinceKorwin
        06.12.2021 15:47

        Это только половина дела. Нужно уметь пока они наказываются как-то ведь фильтровать данные которые "на половину" верны.

        Так называемое грязное чтение избежать.


        1. slava-a Автор
          06.12.2021 15:59

          Один из вариантов решения такой проблемы как раз в описанном примере. А именно статусы, по которым можно осуществлять фильтрацию


  1. OlegStrekalovsky
    06.12.2021 18:58
    +1

    В ваших примерах обработчик CreateOrderV1 и обработчик события order_created_v1 совершенно не готовы к повторам операций, которые кажется должны случиться когда вы закроете транзакцию, но не сможете отправить сообщение: в базе будут задваиваться и заказы и товары.


    1. slava-a Автор
      06.12.2021 19:04

      да, вы правы, много чего нет т.к пример чисто концептуальный

      со всеми деталями он бы был раза 3 больше


      1. OlegStrekalovsky
        06.12.2021 19:29

        Можете хотя бы словами описать как вы эти проблемы решали? Это достаточно важные детали, чтобы даже этот более простой пример саги, когда можно положиться на надёжный транспорт сообщений, стал более похож на то, что можно реально использовать.


        1. slava-a Автор
          07.12.2021 11:39

          1) использовать паттерн дроссельная заслонка (throttle), чтобы избежать одинаковых запросов в единицу времени
          2) формировать ключ идемпотентности (включить в него ip и метку времени, например) для каждого запроса. проверяя его, можно отсеить лишние запросы