Сегодня рассмотрю архитектурный паттерн CQRS и его возможное место в вашей архитектуре. Также осуществим его реализацию на языке golang.

Проблематика

В некоторых проектах Каруны мы стремимся к микросервисной архитектуре. У этой концепции много плюсов, но она создаёт некоторые трудности. Одну из таких трудностей и метод её преодоления я хочу рассмотреть в данной статье. 

Для простоты возьмём универсальный пример в виде приложения интернет-магазина. Предположим, он имеет микросервисную архитектуру и следует доменной модели. Одной из главных частей нашего приложения является функционал, связанный с заказами пользователей и с товарами в этих заказах. В нашей архитектуре есть два соответствующих сервиса: order и goods. Сервис order отвечает за создание, обновление, удаление и чтение сущностей заказа (order), а сервис goods реализует тот же CRUD с товарами заказа. Наши клиенты (мобильные приложения, браузерное приложение, и т.д.) взаимодействуют с этими сервисами, и для удобства у нас реализован паттерн объединение API. Т.е разработан сервис, выполняющий роль API-композитора, работающий с данными наших сервис-провайдеров order и goods. Общую архитектуру можно представить следующим образом:

Что касается API-композитора, то его роль может выполнять веб-приложение, API-шлюз или отдельный сервис. Но выбор варианта в нашем случае выходит за рамки темы данной статьи.

Вроде бы всё неплохо: есть одна точка входа в приложение, логика разнесена по доменным областям. Но что делать в случае, когда API-композитору нужно выполнить нетривиальные выборки и объединять большие наборы данных?

Бизнес просит, чтобы были реализованы сложные фильтры и пагинация. Например, нужно выбрать заказы, в которых количество товаров больше заданного N. В этом случае нам нужно делать полную выборку из сервиса goods, и на композитор будет ложиться задача объединения и фильтровки большого количества данных. Это неэффективно и задействует большое количество памяти. Как раз в этом случае на помощь приходит шаблон CQRS, который и решает проблему.

Проектирование

Суть шаблона CQRS (command query respon­sibility segregation — разделение ответственности командных запросов) заключается в разделении модулей и данных на две отдельные части: команды и запросы. Командные модули реализуют операции: create, update, delete. Модуль запросов реализует получение данных (get). Помимо улучшенного разделения ответственностей, преимущество данного шаблона заключается в том, что сервисы могут хранить данные в таком виде, в каком это удобно для более эффективных запросов. В нашем случае сервисы order, goods будут выполнять только команды, а новый сервис order-history возьмёт на себя ответственность в реализации запросов. Посмотрим, как меняется архитектура с внедрением шаблона CQRS:

Теперь сервисы order и goods отвечают только за изменение соответствующих им сущностей: заказов и товаров. Все эти изменения, помимо записи в базы данных сервисов, публикуются в виде событий. Новый сервис order-history отвечает только за запросы на чтение данных. Он подписывается на события из order, goods и заполняет/изменяет свою базу данных.

Реализация

Попробуем реализовать с помощью следующего стэка: golang как язык сервисов, postgresql для баз данных и kafka для обмена сообщениями между сервисами.

Создадим локальное рабочее окружение, а именно — сервисы order, goods, order-history, БД для каждого сервиса и брокер kafka:

docker-compose.yml
version: '3.9'

services:
  order:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order
    environment:
      - HTTP_BIND=8080
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
      - HOST_DB=db-order
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - ORDER_CREATED_TOPIC=order_created_v1
    depends_on:
      - db-order
      - kafka
    volumes:
      - ./order:/app/order:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8080:8080"
    networks:
      - cqrs

  db-order:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders
      - POSTGRES_USER=orders_user
      - POSTGRES_PASSWORD=orders_password
    ports:
      - "5441:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  goods:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: goods
    environment:
      - HTTP_BIND=8081
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
      - HOST_DB=db-goods
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - GOODS_CREATED_TOPIC=goods_created_v1
    volumes:
      - ./goods:/app/goods:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    depends_on:
      - db-goods
      - kafka
    ports:
      - "8081:8081"
    networks:
      - cqrs

  db-goods:
    image: postgres:14
    environment:
      - POSTGRES_DB=goods
      - POSTGRES_USER=goods_user
      - POSTGRES_PASSWORD=goods_password
    ports:
      - "5442:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  order-history:
    build:
      dockerfile: .docker/app.Dockerfile
      context: ./
      args:
        SERVICE_NAME: order-history
    environment:
      - HTTP_BIND=8082
      - POSTGRES_DB=orders_history
      - POSTGRES_USER=orders_history_user
      - POSTGRES_PASSWORD=orders_history_password
      - HOST_DB=db-order-history
      - PORT_DB=5432
      - KAFKA_ADDR=kafka:9092
      - GOODS_CREATED_TOPIC=goods_created_v1
      - ORDER_CREATED_TOPIC=order_created_v1
    depends_on:
      - db-order-history
      - kafka
    volumes:
      - ./order-history:/app/order-history:delegated
      - ./.docker/entrypoint.sh:/entrypoint.sh:ro
    entrypoint: /entrypoint.sh
    ports:
      - "8082:8082"
    networks:
      - cqrs

  db-order-history:
    image: postgres:14
    environment:
      - POSTGRES_DB=orders_history
      - POSTGRES_USER=orders_history_user
      - POSTGRES_PASSWORD=orders_history_password
    ports:
      - "5443:5432"
    volumes:
      - data:/var/lib/postgresql
    networks:
      - cqrs

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    networks:
      - cqrs
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: order_created_v1:1:1,goods_created_v1:1:1
    networks:
      - cqrs

volumes:
  data:

networks:
  cqrs:

Сервис order будет сохранять заказы в таблицу:

CREATE TABLE orders (
    id         BIGSERIAL PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

И реализовывать эндпоинт для создания заказа:

func NewServer(db *pgxpool.Pool, kafkaProducer sarama.SyncProducer) Server {
	s := Server{}
	s.kafkaProducer = kafkaProducer
	s.db = db
	s.router = mux.NewRouter()

	s.router.HandleFunc("/v1/order", s.CreateOrderV1).Methods(http.MethodPost)

	return s
}

Который обрабатывает создание заказа, сохраняет его в БД и шлет событие в топик order_created_v1:

обработчик CreateOrderV1
func (s Server) CreateOrderV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	userID := r.Form.Get("userId")

	order := model.Order{}
	err = s.db.QueryRow(context.Background(), `INSERT INTO orders (user_id, created_at) VALUES ($1, NOW()) RETURNING id, user_id, created_at`, userID).Scan(&order.ID, &order.UserID, &order.CreatedAt)
	if err != nil {
		log.Error().Err(err).Msg("Order hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedOrderMsg{Data: order}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("ORDER_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

Сервис goods реализуется аналогично. Таблица для хранения товаров заказов:

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

Эндпоинт для создания товара в заказе:

s.router.HandleFunc("/v1/goods", s.CreateGoodsV1).Methods(http.MethodPost)

И обработчик для создания товара в заказе и отсылки события в топик goods_created_v1:

обработчик CreateGoodsV1
func (s Server) CreateGoodsV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	orderID := r.Form.Get("orderId")

	goods := model.Goods{}
	err = s.db.QueryRow(context.Background(), `INSERT INTO goods (order_id, created_at) VALUES ($1, NOW()) RETURNING id, order_id, created_at`, orderID).Scan(&goods.ID, &goods.OrderID, &goods.CreatedAt)
	if err != nil {
		log.Error().Err(err).Msg("Goods hasn't been created.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := model.CreatedGoodsMsg{Data: goods}
	msgStr, err := json.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	producerMsg := &sarama.ProducerMessage{Topic: os.Getenv("GOODS_CREATED_TOPIC"), Value: sarama.StringEncoder(msgStr)}
	_, _, err = s.kafkaProducer.SendMessage(producerMsg)
	if err != nil {
		log.Error().Err(err).Msg("Message hasn't been sent.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
}

БД сервиса order-history имеет следующую структуру:

CREATE TABLE orders (
    id         BIGINT PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE goods (
    id         BIGSERIAL PRIMARY KEY,
    order_id   BIGINT NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders (id)
);

Сервис слушает события order_created_v1 и goods_created_v1 и записывает данные в свою БД:

обработка события order_created_v1
...
type OrderCreatedEvent struct {
	Data struct {
		ID        int64     `json:"id"`
		UserID    int64     `json:"user_id"`
		CreatedAt time.Time `json:"created_at"`
	} `json:"data"`
}

func (oh OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		oce := OrderCreatedEvent{}
		err := json.Unmarshal(msg.Value, &oce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = oh.db.Exec(context.Background(), `INSERT INTO orders (id, user_id, created_at) VALUES ($1, $2, $3)`, oce.Data.ID, oce.Data.UserID, oce.Data.CreatedAt)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
...
обработка события goods_created_v1
...
type GoodsCreatedEvent struct {
	Data struct {
		ID        int64     `json:"id"`
		OrderID   int64     `json:"order_id"`
		CreatedAt time.Time `json:"created_at"`
	} `json:"data"`
}

func (gh GoodsHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		gce := GoodsCreatedEvent{}
		err := json.Unmarshal(msg.Value, &gce)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been handled.")
			session.MarkMessage(msg, "")
			continue
		}

		_, err = gh.db.Exec(context.Background(), `INSERT INTO goods (id, order_id, created_at) VALUES ($1, $2, $3)`, gce.Data.ID, gce.Data.OrderID, gce.Data.CreatedAt)
		if err != nil {
			log.Error().Err(err).Msg("Event hasn't been inserted.")
		}

		session.MarkMessage(msg, "")
	}

	return nil
}
...

Сервис будет реализовывать эндпоинт получения данных:

	s.router.HandleFunc("/v1/order-history", s.GetOrderHistoryV1).Methods(http.MethodGet)

И сама реализация метода:

обработчик GetOrderHistoryV1
func (s Server) GetOrderHistoryV1(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		log.Error().Err(err).Msg("Data hasn't been parsed.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	threshold := r.Form.Get("threshold")
	offset := r.Form.Get("offset")
	limit := r.Form.Get("limit")

	rows, err := s.db.Query(context.Background(), `SELECT orders.id, orders.user_id, orders.created_at FROM orders
    	INNER JOIN goods ON goods.order_id = orders.id 
		GROUP BY orders.id 
		HAVING COUNT(goods.id) > $1
		LIMIT $2 OFFSET $3`, threshold, limit, offset)
	if err != nil {
		log.Error().Err(err).Msg("Goods haven't been got.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	data := make([]model.Order, 0)
	for rows.Next() {
		o := model.Order{}
		sErr := rows.Scan(&o.ID, &o.UserID, &o.CreatedAt)
		if sErr != nil {
			log.Error().Err(err).Msg("Reading error.")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		data = append(data, o)
	}

	ordersRsp := model.OrdersResponse{Data: data}
	response, err := json.Marshal(ordersRsp)
	if err != nil {
		log.Error().Err(err).Msg("Response hasn't been marshaled.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Write(response)
}

Теперь мы без лишних накладных расходов можем выполнить запрос на получение заказов, в которых количество товаров больше 1.

curl 'http://localhost:8082/v1/order-history?limit=2&threshold=1&offset=0'

Таким образом, мы получаем данные без сложной обработки, вся логика получения данных и обработки реализована в сервисе order-history.

Для простоты мы реализовали только создание сущностей и чтение их в сервисе order-history. Реализация обновления данных может усложнить логику сервисов, т.к нужно поддерживать конкурентное обновление данных и как-то обрабатывать повторяющиеся события.

Полный листинг реализации данного функционала вы можете найти на моем github.

Заключение

  • Реализация шаблона CQRS позволяет эффективно разделить логику приложения: эффективно реализуются запросы и улучшить общее разделение ответственности

  • В модуле запросов можно использовать другие СУБД помимо PostgreSQL — в том числе и аналитические. Например, clickhouse или vertica. Также можно использовать NoSQL хранилища типа MongoDB или DynamoDB.

  • Несмотря на преимущества, CQRS влечёт за собой усложнение архитектуры (администрирование и обслуживание БД). Может появиться рассинхронизация между представлениями для БД команд и запросов. За этим тоже необходимо следить.

  • Модуль представлений сложен в обслуживании: проблемы конкурентного обновления данных и повторяющихся событий.

  • CQRS хорошо совместим с event sourcing.

Комментарии (8)


  1. physics7
    26.10.2021 16:52
    +2

    Патерн любопытный, но не совсем улавливаю смысл разделения сервисов с дополнительными базами. Это выглядит так - хочу избавиться от геморроя при помощи наждачки (извините за такое сравнение). Предположим, у меня крутятся REST сервисы (порядка 20 на данный момент) в среде кубрернетис кластера. Каждый из сервисов требует определенные ресурсы для своего контейнера, в котором он запускается. В рамках патерна CQRS мне необходимо разделить все сервисы на сервисы только чтения и на сервисы "изменения" данных, то есть увеличивается количество требуемых ресурсов в кластере уже для 40 сервисов плюс сервисы для синхронизации баз данных, да и, возможно, для самих БД.
    На данном этапе мы решаем проблему "больших" объемов - пагинацией, то есть обработки данных порциями, при чем такая порционность приводит к возможности распараллеливания процесса.
    Я ни в коем случае не критикую этот паттерн, просто хочу лучше понять когда его лучше использовать...


    1. slava-a Автор
      26.10.2021 17:01
      +1

      Паттерны должны решать проблемы, а не создавать их. Возможно, у Вас простая бизнес-логика приложения и паттерн действительно не применим, а только усложнит все.

      А смысл его в повышении эффективности запросов на SELECT. Например, если какая-то бизнес логика с множественными JOIN'ами, группировками, агрегациями, то сервис агрегатор может невероятно усложниться и начать не справляться с таким по ресурсам.

      Что касается параллельности: опять же, результат запроса с одного сервиса может использоваться для запроса в другой сервис. Тут уже нужно как-то убирать параллельность и получаем усложнение сервиса.


      1. physics7
        26.10.2021 17:13

        Спасибо за ответ. На мой взгляд этот паттерн применим в случае множественности источников создания (изменения) данных - например, датчики каких-либо данных, а вот для наблюдателя (анализатора) достаточно только чтение...
        Вы правы, у нас нет настолько сложной логики в запросах...


        1. Deosis
          27.10.2021 06:45
          +1

          Паттерн вообще никак не связан с дроблением сервисов. Можно его реализовать и на одном сервисе.

          Основная идея в том, что запросы только возвращают данные, а команды только меняют их.


    1. boblgum
      26.10.2021 17:32

      все ниже сказанное сугубо имхо

      как я понимаю и преподношу CQRS это в первую очередь "смысловое" разделение бизнеса, его логики и соответственно исполнения. все основывается на том, что относительно "одни и те же данные" в разных контекстах, оказывается, совсем не обязательно "совместимы".

      например, в разных контекстах условного онлайн-магазина данные пользователя магазином могут быть совершенно разными. для формирования заказа нужен адрес доставки, а для бухгалтерии эта информация совершенно неинтересна.

      повышенная эффективность запросов данных это только второстепенный аспект. в первую очередь достигается чистота и порядок в данных нужных для реализации бизнеса в определенном контексте.

      по отношению к REST данный паттерн также имеет преимущество в том, что он более соответствует реальности. так как коммуникация осуществляется путем конкретно определенных команд и запросов. каждая команда или запрос соответсвует определенному бизнес-кейсу. а вот REST относительно далек от бизнеса пытаясь "переписать" процессы с помощью жестко ограниченного набора глаголов.

      ради забавы. попробуйте "понятно" описать в REST такой процесс как "расторжение договора" со всеми (ну, хотя бы несколько) истекающими из этого последствиями


    1. onets
      26.10.2021 18:11

      CQRS - разделение контекстов на чтение и запись. БД кстати на первых порах может быть тоже одна. Позже могут появится вьюхи/денормализация, которые будут отражены только в контексте на чтение. А еще позже можно выделить отдельную базу на отдельном сервере для построения отчетов например.

      Оба эти контекста можно использовать и внутри одного сервиса. А можно и наплодить сервисов, как автор данной статьи.

      Но что на самом деле в статье не рассмотрено, так это действительно сложная логика. И вот там начинается веселье. Лично у меня складывается мнение, что CQRS без «надстроек» подходит только, чтобы обернуть в запросы и команды select/insert/update/delete.


  1. no_future
    26.10.2021 23:32
    +1

    Имхо. CQRS как идея попахивает фанатизмом, мне больше нравится угол зрения «сложные запросы требуют специализированных СУБД».

    Есть микросервисы-владельцы мастер-данных по товарам и заказам — отлично, значит, они знают обо всех изменениях товаров и заказов и могут паблишить эту информацию для всех желающих. Например, для микросервиса аналитических запросов по товарам-заказам.

    Конечно, в нагрузку мы получаем дублирование данных в системе и риск их рассинхронизации, но это совсем другая история.


  1. AlexSpaizNet
    28.10.2021 15:20

    CQRS отличный паттерн. Особенно в связке с NoSQL базами. Позволяет создавать кайнд оф матириалайзд вьюз, просто на апликативной уровне а не на уровне базы (оракл, майкрософт). Как результат, позволяет не только строить независимые сервисы, но и быстрые и оптимизированные, потому что структура данных в базе заточена под определенные запросы.

    Используем во многих местах, и пока довольны. Read сервисы получаются маленькими, оптимизированными, с предиктэбл перформансом, легко поддерживаемые и тестируемые.

    Особенный профит в местах где количество запросов на чтение превосходит количество запросов на запись. Разделение ворклоада позволяет базе на запись не тормозить потому что с нее никто не читает и не нужно выделать ресурсы на чтение.