Проблема CRUD-подхода

// Проблемы:
// 1. История изменений теряется
// 2. Конфликты при конкурентных обновлениях (или использование блокировок)
func UpdateOrderStatus(orderID string, status Status) error {
    return db.Exec("UPDATE orders SET status=? WHERE id=?", status, orderID)
}

Решение: CQRS и Event Sourcing

Архитектурное ядро

Архитектура
Архитектура

Ключевые компоненты архитектуры

1. Команда (Command)

Запрос на выполнение действия ("Завершить заказ", "Списать средства")

Компонент на диаграмме
Компонент на диаграмме

Характеристики:

  • Может быть отклонена бизнес-правилами

  • Не возвращает данные (только статус выполнения)

Пример:

type CompleteOrderCommand struct {
    OrderID string
    UserID  string
}

2. Агрегат (Aggregate)

Компонент на диаграмме
Компонент на диаграмме

Хранитель бизнес-правил, который:

  • Восстанавливает текущее состояние из истории событий

  • Проверяет возможность выполнения команды

  • Генерирует новые события при успешной проверке

Важно: Агрегат не сохраняет состояние, только содержит логику.

Пример:

// LoadFromHistory восстанавливает состояние (оптимизировано с учетом снапшотов)
func LoadFromHistory(events []Event, snapshot *Snapshot) *OrderAggregate {
    agg := &OrderAggregate{
        id: events[0].AggregateID(),
    }
    
    if snapshot != nil {
        agg.applySnapshot(snapshot)
    }
    
    for _, event := range events {
        if event.Version() > agg.version {
            agg.applyEvent(event)
        }
    }
    
    return agg
}

// Complete обрабатывает команду
func (a *OrderAggregate) Complete(cmd CompleteOrderCommand) ([]Event, error) {
    if a.status != Paid {
        return nil, fmt.Errorf("order %s must be paid first", a.id)
    }

    return []Event{
        NewOrderCompletedEvent(a.id, cmd.UserID, a.version+1),
    }, nil
}

// applyEvent применяет событие к состоянию
func (a *OrderAggregate) applyEvent(event Event) {
    switch e := event.(type) {
    case OrderCompletedEvent:
        a.status = Completed
        a.version = e.Version()
    // обработка других типов событий
    }
}

// applySnapshot применяет снапшот
func (a *OrderAggregate) applySnapshot(s Snapshot) {
    a.version = s.Version
    a.status = s.Status
    // другие поля
}

// TakeSnapshot создает снапшот
func (a *OrderAggregate) TakeSnapshot() Snapshot {
    return Snapshot{
        AggregateID: a.id,
        Version:    a.version,
        Status:     a.status,
        // другие поля
    }
}

3. Событие (Event)

Неизменяемая запись о произошедшем изменении.

Компонент на диаграмме
Компонент на диаграмме

Свойства:

  • Содержит все релевантные данные

  • Сериализуемо и сохраняемо

Пример:

 // Факт произошедшего изменения
 type OrderCompletedEvent struct {
   OrderID      string
   UserID       string
   CreatedAt    time.Time
 }

4. Хранилище событий (Event Store)

Компонент на диаграмме
Компонент на диаграмме

Append-only журнал, который:

  • Гарантирует сохранение событий

  • Позволяет воспроизвести историю для любого агрегата

  • Реализует оптимистичные блокировки через версии

Пример:

func (es *EventStore) Append(aggregateID string, events []Event, expectedVersion int) error {
    currentVersion := es.GetVersion(aggregateID)
    if expectedVersion != currentVersion {
        return ErrConcurrentModification
    }
    
    // Append-only запись
    for _, event := range events {
        record := EventRecord{
            ID:         uuid.New(),
            AggregateID: aggregateID,
            Version:    currentVersion+1,
            Type:       event.Type(),
            Data:       event.Data(),
            Timestamp:  time.Now(),
        }
        es.db.Create(&record)
        currentVersion++
    }
    
    return nil
}

5. Шина событий (Event Bus)

Компонент на диаграмме
Компонент на диаграмме

Асинхронная доставка событий подписчикам через:

  • Внутрипроцессные каналы

  • Внешние брокеры

6. Проектор (Projector)

Трансформирует события в оптимизированные модели чтения.

Компонент на диаграмме
Компонент на диаграмме

Особенности:

  • Создает несколько различных представлений

  • Работает асинхронно и независимо

  • Допускает eventual consistency

Пример:

func (p *OrderProjector) HandleEvent(event Event) {
        switch e := event.(type) {
        case OrderCompletedEvent:
            return p.updateOrderStatus(e.OrderID, "completed")
        // ... другие типы событий
        }
        return nil
}

7. Обработчик запросов (Query Handler)

Поставщик данных для чтения, работающий с проекциями.

Компонент на диаграмме
Компонент на диаграмме

Пример:

func GetOrderSummary(orderID string) (*OrderSummary, error) {
    var summary OrderSummary
    err := db.Where("id = ?", orderID).First(&summary).Error
    return &summary, err
}

Минимальный цикл

// 1. Получение команды
cmd := CompleteOrderCommand{OrderID: "123", UserID: "u456"}

// 2. Загрузка событий
events := eventStore.Load("123")

// 3. Восстановление агрегата
order := LoadOrder(events)

// 4. Обработка команды
newEvents, err := order.Complete(cmd)

// 5. Сохранение событий
eventStore.Save("123", newEvents, order.Version)

// 6. Публикация событий
for _, event := range newEvents {
    eventBus.Publish(event)
}

// 7. Запрос данных (где-то в другом месте)
summary := GetOrderSummary("123")
fmt.Println(summary.Status) // "completed"

Ключевые потоки данных

Командный поток

Клиент → CommandHandler → EventStore → Aggregate → Сохранение → EventBus

Поток запросов

EventBus → Projector → ReadDB ← QueryHandler ← Клиент

Сравнение с традиционным подходом

Традиционный CRUD

CQRS/ES

UPDATE orders SET status=?

Append OrderCompleted event

Текущее состояние в БД

Состояние = Σ всех событий

Нет истории изменений

Полный аудит автоматически

Блокировки для консистентности

Оптимистичные блокировки через версии

Одна модель для чтения/записи

Раздельные оптимизированные модели

Ключевые выводы

  1. Фундаментальный сдвиг парадигмы
    Состояние системы = история всех событий, а не последний snapshot данных.
    Преимущество: Полный аудит и возможность "переиграть" историю.

  2. Жесткое разделение ответственности

    Команды (Write): "Сделай что-то" → Генерируют события

    Запросы (Read): "Покажи данные" → Читают оптимизированные проекции Результат: Независимое масштабирование операций записи и чтения.

  3. Агрегаты - хранители бизнес-логики
    Не хранят состояние постоянно, а воссоздают его из событий и принимают решения.

  4. События как источник истины

    Неизменяемые факты

    Содержат всю информацию об изменении

    Append-only хранилище = гарантия сохранности истории

  5. Проекции - гибкие представления
    Можно создавать несколько специализированных моделей для разных задач.

Комментарии (7)


  1. kulaginds
    30.07.2025 15:16

    Звучит вкусно, но есть множество подводных камней.

    Есть проект, который построен на этой архитектуре - Zitadel.

    Вместе с плюсами, есть минусы:

    • каждое обновление агрегата сопровождается блокировкой агрегата для вычисления currentSequence

    • для логина и других чувствительных к актуальности операциях придется ходить в этот append-log, на-лету вычислять актуальное состояние и производить операцию; а теперь представьте какая нагрузка на базу будет, если несколько тысяч пользователей пойдет логиниться

    • таблицу event store будет очень сложно шардировать, так как у каждого агрегата будет свой ключ; а эта таблица будет самой большой

    • можно словить приколов с большим потоком записи (как вы построите восстановление проекции без перехода по позиции сообщения?) - события будут пропускаться

    • еще можно словить приколов с Projector, когда он не сможет обработать сообщение: что с ним делать: пропускать или повторять? А как быть с консистентностью после пропуска?

    В заключение скажу, что команда разработки Zitadel словила множество проблем с этой архитектурой и планирует перейти к традиционной архитектуре реляционной базы, а event store оставить для аудита и внешних интеграций: https://github.com/zitadel/zitadel/issues/9599


    1. kulaginds
      30.07.2025 15:16

      Кстати про сравнение:

      • историю изменений можно складывать рядом с главной таблицей

      • оптимистичные блокировки можно реализовать и без CQRS

      • модели для чтения/записи можно дополнить вьюхами или материализованными вьюхами

      Еще в CQRS проекции всегда будут отставать и никогда не сможете это контролировать, так как это будет зависеть от нагрузки на приложение в данный момент.


  1. alserok Автор
    30.07.2025 15:16

    Добрый день!
    Cогласен со всеми замечаниями, однако хочу добавить:

    CQRS/ES — это инструмент для специфичных сценариев, где важны: Полный аудит и трассируемость изменений, Сложная бизнес-логика с частыми изменениями правил, Гибкость в создании новых представлений данных, Приемлемость получения данных с задержкой проекций (Eventual Consistency)

    Для высоконагруженных сценариев вроде аутентификации можно использовать гибрид: ES для "долгой" логики (заказы, платежи) + CRUD/кэш для "горячих" путей (сессии, профили).

    P.S. Если проект не требует этих преимуществ — действительно, проще обойтись CRUD операциями. ES добавляет сложности, которые должны окупаться бизнес-требованиями.


  1. Dhwtj
    30.07.2025 15:16

    Дороговато валидация будет обходиться. Каждый раз восстанавливать состояние прочитав десятки сотни событий.

    Тут нужен валидатор на основе конечного автомата


    1. PrinceKorwin
      30.07.2025 15:16

      Каждый раз восстанавливать состояние прочитав десятки сотни событий

      Как решение предлагается делать промежуточные снапшоты с которых пойдет восстаровление.


      1. Dhwtj
        30.07.2025 15:16

        Если снапшоты ставить часто то теряются преимущества.


        1. PrinceKorwin
          30.07.2025 15:16

          CQRS подход он вообще из одних компромиссов состоит.

          Очень специфический паттерн и нужно осознано его применять.