Привет, Хабр! Меня зовут Никита, я являюсь разработчиком в направлении SSL инфраструктурной команды биллинга в Timeweb Cloud. Сегодня я хочу рассказать, как мы наводили порядок в коде одного из наших микросервисов, почему отказались от лапши в контроллерах, и главное — почему мы решили выложить наш внутренний архитектурный фреймворк в Open Source. Если вы пишете на Python и хоть раз сталкивались с болью распределенных транзакций, отваливающихся внешних API и проблемой dual-write (двойной записи) — присаживайтесь поудобнее. Речь пойдет про наш open-source фреймворк python-cqrs (он же доступен на PyPI).

❯ 1. Введение: Боль и проблема

В нашей инфраструктуре есть бизнес-процессы, которые не сводятся к простому синхронному HTTP-запросу. Отличный пример — автоматизированный выпуск SSL-сертификатов. Казалось бы, что сложного? Принял запрос от клиента, сходил по API к внешнему провайдеру, получил данные, отдал клиенту. Но реальность сурова. Взаимодействие с внешними провайдерами — это распределенный, долгоживущий процесс (long-running process), который может занимать часы. Типичный флоу заказа выглядит примерно так:

  1. Подать заявку (сделать заказ через внешнее API).

  2. Получить токены (challenges) для подтверждения прав.

  3. Установить эти токены на серверах.

  4. Ждать. Обновление систем (например, DNS) может занимать от 15 минут до нескольких часов.

  5. Инициировать проверку на стороне провайдера.

  6. Получить финальный результат и применить его.

В чем боль? На любом из этих шагов что-то может пойти не так. Внешний сервер ответит 500-кой, данные не успеют обновиться, процесс упадет и т.д.

Раньше, в «старой парадигме», вся логика таких процессов часто лежала прямо в Celery-тасках или «толстых» сервисах. Мы писали бесконечные try/except, заводили отдельные таски на ожидание, плодили десятки флагов состояний в базе данных и писали скрипты-чистильщики, которые пытались восстановить зависшие заявки. Со временем разобраться в этом графе переходов становилось невозможно, код был хрупким, а добавление интеграции с новым провайдером сертификатов превращалось в боль.

К какому решению пришли? Мы поняли, что нам нужен строгий архитектурный подход. Мы форкнули библиотеку diator, сильно ее переработали, обкатали на наших high-load сервисах и выпустили в Open Source как python-cqrs.

Давайте посмотрим, как она решает проблемы в коде, не раскрывая нашу внутреннюю бизнес-логику, а используя упрощенные концептуальные примеры.

❯ 2. CQRS: Изолируем намерения от состояния

Первое, что мы сделали — «похудели» наши контроллеры в FastAPI. HTTP-ручка больше не должна ничего знать про внешние API, сложную валидацию или работу с базой данных. Ее единственная задача — принять JSON и задекларировать намерение пользователя (Команду).

Смотрите, насколько лаконичным становится FastAPI роут при использовании python-cqrs:

# src/presentation/api/routes.py
import cqrs
import fastapi
from my_app.presentation import dependencies
from my_app.services import commands, responses
from my_app.presentation.api import schemas

router = fastapi.APIRouter(prefix="/orders", tags=["Заказы"])


@router.post("/", response_model=responses.OrderCreationResult)
async def create_order(
    data: schemas.CreateOrderSchema,
    # Внедряем медиатор запросов
    mediator: cqrs.RequestMediator = fastapi.Depends(dependencies.request_mediator_factory),
):
"""Создание заказа"""
    # Создаем команду (они наследуются от cqrs.Request)
    command = commands.CreateOrderCommand(
        customer_id=data.customer_id,
        target_domain=data.domain,
        # ...
    )
    # Роут просто кидает CQRS-команду в медиатор и ждет результат
    result: responses.OrderCreationResult = await mediator.send(command)
    return result

Никакой бизнес-логики. Медиатор сам найдет нужный обработчик (cqrs.RequestHandler), передаст ему зависимости и выполнит команду. Внедрение зависимостей настраивается один раз на старте приложения:

# src/presentation/dependencies.py
import functools
import di
from cqrs.requests import bootstrap as requests_bootstrap
from my_app.services import commands, handlers

container = di.Container()

# ... здесь мы биндим зависимости: БД, клиенты к внешним API, репозитории ...
def commands_mapper(m: cqrs.RequestMap) -> None:
    # Явно связываем команду с обработчиком
    m.bind(commands.CreateOrderCommand, handlers.CreateOrderHandler)


# ... маппинг других команд
@functools.lru_cache
def request_mediator_factory() -> cqrs.RequestMediator:
    # Собираем медиатор из коробки, прокидывая контейнер и маппинг команд
    return requests_bootstrap.bootstrap(
        di_container=container,
        commands_mapper=commands_mapper,
        # также можно передать queries_mapper и events_mapper
    )

Итог: Код легко тестировать, он предсказуем. Можно дергать команду CreateOrderCommand из API роута, из Kafka-консьюмера или из CLI-скрипта — бизнес-логика останется неизменной и изолированной.

❯ 3. Оркестрация долгих процессов: State Machine vs Saga

Фреймворк python-cqrs имеет «из коробки» поддержку паттерна Saga (Саги) для распределенных транзакций. Об этом подробно рассказал основатель фреймворка Вадим (на тот момент — тимлид нашей команды) в предыдущей статье, где приводился классический кейс биллинга: списание денег >> закрытие инвойса.

Саги идеальны для цепочек быстрых действий в разных сервисах, где при ошибке на любом из шагов можно сразу вызвать компенсирующую транзакцию (compensate) и откатить все назад.

В Timeweb Cloud мы активно используем Саги там, где это оправдано (например, в самом биллинге). Но для интеграций с внешними провайдерами сертификатов такой подход неприменим. Выпуск SSL — это долгий процесс. Нам не нужно «откатывать» заявку клиента и возвращать ему деньги только потому, что DNS-провайдер обновляет записи два часа. Нам нужно просто приостановить процесс, сохранить текущий стейт и продолжить работу (запустить поллинг) позже.

Поэтому здесь мы элегантно скрестили команды из python-cqrs со стейт-машиной (например, на базе python-statemachine). Стейт-машина выступает оркестратором, который просто кидает идемпотентные CQRS-команды на каждом шаге.

Вот как концептуально выглядит такая стейт-машина:

# src/services/state_machines/order_sm.py
from statemachine.states import States
from statemachine import Event
from my_app.services import commands

class OrderStatuses(enum.StrEnum):
    initial = enum.auto()
    making_external_order = enum.auto()
    installing_challenge = enum.auto()
    checking_challenge = enum.auto()
    finalizing = enum.auto()


class OrderStateMachine(BaseStateMachine):
    states = States.from_enum(OrderStatuses, initial=OrderStatuses.initial)
    # Описываем граф возможных переходов (наш флоу)
    process = Event(
        states.initial.to(states.making_external_order)
        | states.making_external_order.to(states.installing_challenge)
        | states.installing_challenge.to(states.checking_challenge)
        | states.checking_challenge.to(states.finalizing)
    )

    @states.making_external_order.enter
    @safe_call(need_retry_delay=True) # Декоратор для обработки ошибок внешних API
    async def making_external_order(self):
       # 1. Отправляем CQRS-команду на создание заказа на стороне провайдера
       result = await self.mediator.send(commands.ProviderMakeOrderCommand(
           domain=self.model.domain,
       ))
       # 2. Сохраняем полученные токены в нашу модель
       self.model.add_challenge(token=result.challenge_token)

    @states.installing_challenge.enter
    @safe_call(need_retry_delay=False)
    async def installing_challenge(self):
        challenge = self._get_challenge()
        # 1. Отправляем команду на установку токена (например, в DNS)
        await self.mediator.send(commands.InstallChallengeCommand(
            domain=self.model.domain,
            challenge=challenge.token,
        ))
        # 2. Нам нужно подождать обновления (уходим в сон)
        self.model.set_schedule_time(dt=datetime.datetime.now() + datetime.timedelta(minutes=15))
        self.should_stop = True # Приостанавливаем выполнение стейт-машины

Декоратор @safe_call перехватывает сетевые ошибки (например, если API провайдера упало). В этом случае стейт-машина не двигается дальше, а просто планирует повторную попытку на тот же шаг (retry delay). Так как наши CQRS-команды идемпотентны, мы можем безопасно повторять их сколько угодно раз.

❯ 4. Transactional Outbox: Гарантия доставки событий

Представьте: заказ успешно завершен и сохранен в нашу БД. Теперь нам нужно уведомить другие сервисы (биллинг, панель управления), что услуга активна. Мы кидаем ивент в Kafka. Но что, если процесс внезапно завершится ровно между коммитом в БД и отправкой события в Kafka? Заказ будет в базе, но клиенту панель управления его никогда не покажет. Это классическая проблема Dual-Write.

В python-cqrs эта проблема решается паттерном Transactional Outbox, который работает «из коробки».

Сначала мы объявляем доменные события как датаклассы:

# src/domain/events.py
import cqrs
import dataclasses

@dataclasses.dataclass(frozen=True)
class OrderCompleted(cqrs.DCDomainEvent):
    order_id: int

Затем используем готовый репозиторий, который умеет перехватывать эти события и сохранять их в таблицу Outbox в той же транзакции БД:

# src/infrastructure/db/repositories/outbox.py
import cqrs

class OutboxRepository(cqrs.SqlAlchemyOutboxedEventRepository):
    pass

Всё! При сохранении бизнес-модели фреймворк сам найдет сгенерированные ею события и сложит их в БД в рамках той же транзакции. Отдельный воркер из библиотеки (cqrs.EventProducer) асинхронно их прочитает и надежно закинет в шину. Никаких потерянных статусов.

❯ 5. Результаты: Как это влияет на продукт Timeweb Cloud?

Архитектура ради архитектуры не имеет смысла, если она не приносит продуктовой ценности. Внедрение python-cqrs и стейт-машин позволило нам добиться главного — скорости и отказоустойчивости обработки клиентских заявок. Раньше любые проблемы на стороне внешних провайдеров или задержки в обновлении DNS могли привести к тому, что заявки зависали, и их приходилось «проталкивать» вручную техподдержке. Автоматизация была, но она сыпалась при малейшем сбое.

Переход на python-cqrs позволил нам превратить процесс выпуска сертификатов (наш микросервис выпуска SSL) в надежный, отказоустойчивый конвейер:

  • Укротили хаос разных API: У провайдеров абсолютно разная логика подтверждения доменов. Паттерн CQRS из нашей библиотеки позволил скрыть эту специфику за едиными интерфейсами команд. Вся грязь ушла из контроллеров в изолированные обработчики, а оркестрация — в провайдер-специфичные стейт-машины.

  • Сделали сложные шаги безопасными: Установка челленджей (TXT/URL) через внутренние API (DNS-менеджеры, панели) теперь реализована как набор изолированных идемпотентных команд. Если внутренний сервис недоступен, мы безопасно повторяем команду, не ломая весь флоу заявки.

  • Решили проблему поллинга без блокировок: Вместо зависающих тасок мы реализовали надежное ожидание. Если DNS обновляется долго, стейт-машина рассчитывает время следующей проверки, сохраняет стейт и «засыпает». Позже планировщик просто кидает команду проверки через медиатор. Никаких заблокированных ресурсов.

  • Гарантировали консистентность: Как только сертификат установлен, библиотека python-cqrs через свой встроенный Outbox автоматически сохраняет доменное событие в одной транзакции с БД. Событие 100% улетает в Kafka, биллинг и панель управления. Клиент сразу видит защищенный сайт, и мы больше не теряем статусы из-за морганий сети.

Что мы получили в сухом остатке:

  1. Скорость: За счет асинхронности и умного поллинга мы свели к минимуму время от оплаты до готового SSL-сертификата. Заявки больше не ждут ручного вмешательства.

  2. Атомарность и изоляция: Благодаря разделению на Команды (CQRS), каждая операция внутри стейт-машины абсолютно атомарна и идемпотентна. Упал процесс на моменте скачивания сертификата? Не беда — команда просто выполнится заново.

  3. Удобство расширения: Интеграция нового провайдера больше не требует переписывания ядра сервиса. Мы просто добавляем новые Command Handlers, а HTTP-роуты и ядро стейт-машины остаются неизменными.

❯ 6. Заключение

Внедрение строгого разделения на Команды/Запросы (CQRS) и оркестрации через стейт-машину полностью преобразило нашу разработку. Бизнес-флоу больше не размазан по контроллерам, ошибки внешних API стали рутинным событием, которое обрабатывается автоматическими ретраями, а Kafka-ивенты никогда не теряются благодаря Transactional Outbox.

Мы решили не держать этот инструмент внутри Timeweb Cloud и выложили его для всех, потому что сами часто искали подобное решение для Python, но находили лишь обрывки паттернов. Если вы устали от лапшикода в распределенных транзакциях, хотите навести порядок в FastAPI контроллерах и забыть про проблему двойной записи — попробуйте python-cqrs.

А как вы у себя в проектах на Python решаете проблемы оркестрации долгих бизнес-процессов и гарантированной доставки событий? Делитесь болью (и решениями) в комментариях!


Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале

Комментарии (2)


  1. killersssurprise
    28.05.2026 12:22

    Горжусь своими студентами! Никита К., молодец!

    Вопрос возник небольшой.

    Для работы SqlAlchemyOutboxedEventRepository требуется SQLAlchemy сессия. В примере показано, что в хендлере команды вызывается self.outbox.add() и затем await self.outbox.commit(). Предполагается, что эта же сессия используется и для сохранения бизнес-агрегатов в базу данных?

    Как фреймворк заставляет или помогает разработчику использовать одну и ту же сессию и одну транзакцию для сохранения и агрегата, и записи в Outbox? В примере кода хендлера этого не видно. Если разработчик случайно создаст две разные сессии или не обернет вызовы в единую транзакцию, то паттерн Transactional Outbox сломается (возможна ситуация: бизнес-данные сохранены, а запись в Outbox — нет, или наоборот).

    Какие инструменты предоставляет фреймворк для внедрения зависимости (OutboxedEventRepository), чтобы она была привязана к scope запроса и всегда использовала ту же сессию, что и репозиторий агрегата? В README упоминается di контейнер и scope="request", но нет примеров или проверок, гарантирующих, что оба репозитория получат один и тот же объект сессии.


    1. DioneyaOfCthulhu Автор
      28.05.2026 12:22

      Привет! Вопрос хороший. Это действительно один из самых тонких и важных моментов в реализации Transactional Outbox, потому что без гарантии использования одной сессии весь паттерн теряет смысл.
      Сам фреймворк python-cqrs намеренно не управляет транзакциями (чтобы не привязывать вас жестко к конкретной ORM), он лишь предоставляет инструмент — SqlAlchemyOutboxedEventRepository.

      Гарантия использования одной сессии обеспечивается на уровне архитектуры самого приложения. В SSL сервисе (и в документации к библиотеке) это решается двумя классическими способами:

      1. Через паттерн Unit of Work (самый надежный). Создается единый фасад UoW, который внутри себя инициализирует session и явно прокидывает один и тот же инстанс сессии как в бизнес-репозиторий, так и в OutboxRepository. Хендлер работает только с UoW и делает единый uow.commit().

      2. Через DI-контейнер (Scope Request). При инициализации DI мы регистрируем AsyncSession со скоупом request. Контейнер (например, di или FastAPI.Depends) сам гарантирует, что и бизнес-репозиторию, и Outbox-репозиторию при инъекции достанется один и тот же синглтон сессии в рамках текущего запроса.

      То есть фреймворк дает "кубик" Outbox'а, а связать его с агрегатом единой транзакцией помогает паттерн UoW или грамотно настроенный DI.