Всем привет!
В прошлой статье была поставлена задача о надёжных мутациях и транзакциях в архитектуре Профи, в этой статье разберём один из вариантов решения — применить workflow-engine Temporal.

Почему именно Temporal

Впервые я наткнулся на Temporal в Technology Radar от ThoughtWorks, где запомнилась эта цитата:

Although we don’t recommend using distributed transactions in microservice architectures, if you do need to implement them or long-running Sagas, you may want to look at Temporal. https://www.thoughtworks.com/en-us/radar/platforms/temporal

Помимо Temporal существуют ещё десятки workflow engines, однако, если пройтись по ним и оставить только решения с приличным количеством звёзд, то список сильно сократится. Далее применяем фильтр по потенциальной подходимости нашим критериям и остаётся 2-3 кандидата, один из которых — Temporal. Фичи, которые сходу показались интересными:

  • Алгоритмы транзакций и мутации можно описывать кодом на Go, TypeScript, PHP, Python и т.п.

  • При этом код выполняется децентрализованно — Temporal только координирует.

  • При ошибках поддерживаются повторы.

  • Есть GUI, где можно отслеживать процессы и управлять транзакциями.

  • В целом неплохая документация.

Как работает Temporal

Если коротко и упрощённо то получается:

  • Есть сценарии (далее workflow).

  • Сценарии состоят из шагов (далее activity).

  • Temporal-кластер оркестрирует запуск workflow и activity

  • Сами workflow и activity непосредственно выполняются распределёнными воркерами.

  • Ну и есть приложения (в широком смысле), которые соединяются с Temporal-кластером и говорят — запусти такой-то workflow.

Если хотите разобраться в деталях, то рекомендую следующую последовательность:

Задача про вынос биллинга

Профи — это маркетплейс, где есть специалисты предоставляющие услуги и клиенты, а основной бизнес-сценарий выглядит так:

  • Клиент оставляет заказ на услугу.

  • Специалист, если заказ ему понравился, отправляет на него заявку, которая в общем случае — платная.

  • После оплаты заявки, специалисту открывается чат с контактами клиента, где можно обсудить детали заказа.

Биллинг при этом входит в общую царь-транзакцию, которая обеспечивает консистентность продуктовых и финансовых данных — деньги уплачены, специалист может общаться с клиентом и т.п.

Какие тут есть проблемы:

  • Если падает монолит, то падает и биллинг, который используется не только в процессе заявки.

  • Хранение финансовых и продуктовых данных в одной БД, конечно, не катастрофа. Однако данные, влияющие на финансовую отчётность и пробитие чеков лучше хранить отдельно от продуктовых данных у которых, так сказать, произвольный уровень качества :)

  • Если вдруг падает БД или моргнула сеть, то мы теряем корни заявки. Такое случается крайне редко, так что аргумент скорее теоретический, но круто иметь механизм, который допинает заявку в случае очень неприятных ситуаций.

А вот такую картинку хочется получить в итоге: разделить сценарий на шаги и выполнять их с надёжностью сравнимой с классическими MySQL транзакциями.

Собираем прототип

Очень круто то, что Temporal позволяет собрать прототип используя уже существующий код:

  • Поднимаем Temporal кластер;

  • Создаём Temporal-совместимые классы для workflow и activity;

  • Добавляем вызовы существующей бизнес-логики внутрь созданных классов;

  • Запускаем воркер, который выполняет workflow и activity.

  • Внедряем код запуска workflow в текущую бизнес-логику.

Запускаем кластер

Тут всё просто и хорошо описано в официальной инструкции — пара команд и сервер установлен. Для запуска сервера в dev-режиме вбиваем в консоль temporal server start-dev --ip 0.0.0.0 и проверяем, что по адресу ваш_хост:8233 отдаётся GUI:

Стартовый код

Примерно так выглядит изначальный код сценария заявки с царь-транзакцией:

<?php

class OrderApplier {

  public function apply(
    int $orderId,
    string $specId,
    OrderApplyParams $params,
  ): int
  {
    if (!$this->applyValidator->couldBeApplied($orderId, $specId, $params)) {
      throw new CouldNotApply();
    }
    $apply = null;
    $chatTask = null;
    $this->db->beginTransaction();
    try {
      $apply = $this->applyWriter->createApply($orderId, $specId, $params); 
      $this->ordersWriter->applyForOrder($orderId, $apply); 
      
      // Внутри billApply — Несколько INSERT-ов и UPDATE-ов фин-таблиц
      $this->billing->billApply($apply);

      // Cохраняем таску в outbox таблицу
      $chatTask = $this->tasks->createTask(new CreateApplyChatTask($apply)); 
      $this->db->commit();
    } catch (Throwable $e) {
      $this->db->rollBack();
      throw $e;
    }
    // В конце выполнения этой таски запускается следующая — пост-процессинг 
    // (сброс кешей, аналитика и т.п.)
    $this->tasks->queueTask($chatTask);
    return $apply;
  }
}

Создаём activities

Чтобы создать activity нужно определить Temporal-совместимый интерфейс с методами, которые возвращают сериализуемые значения. Но на самом деле работать будет и без отдельного интерфейса — достаточно просто добавить нужные аннотации в обычные PHP классы.

Предварительная проверка (через имплементацию интерфейса)

<?php

#[ActivityInterface]
interface OrderApplyValidatorInterface {
  public function couldBeApplied(
    int $orderId,
    string $specId,
    OrderApplyParams $params
  ): bool;
}

class OrderApplyValidator implements OrderApplyValidatorInterface {
    public function couldBeApplied(
      int $orderId,
      string $specId,
      OrderApplyParams $params
    ): bool { 
      ...
      // бизнес-логика проверки
      ...
      return true;
    }
}

Запись заявки в базу (без имплементации интерфейса)

<?php

class CreateApplyResult {
  public ?Apply $apply;
  public int $errorCode = 0;
}

#[ActivityInterface]
class OrderApplyCreator {
  public function createApply(
    int $orderId,
    string $specId,
    OrderApplyParams $params
  ): CreateApplyResult { 
    $this->db->beginTransaction();
    try {
      $apply = $this->applyWriter->createApply($orderId, $specId, $params); 
      $this->ordersWriter->applyForOrder($orderId, $apply); 
      $this->db->commit();
      return new CreateApplyResult($apply);
    } catch (Throwable $e) {
      $this->db->rollBack();
      return new CreateApplyResult(null, $e->getCode());
    }
  }
}

Биллинг

<?php

class BillApplyResult {
  public int $errorCode = 0;
}

#[ActivityInterface]
interface OrderApplyBillerInterface {
  public function billApply(Apply $apply): BillApplyResult;
}

class OrderApplyBiller implements OrderApplyBillerInterface {
    public function billApply(Apply $apply): BillApplyResult { 
      ...
      // локальная транзакция биллинга
      ...
      return new BillApplyResult(...);
    }
}

Создание чата

<?php

#[ActivityInterface]
interface OrderApplyChatCreatorInterface {
  public function createChat(Apply $apply): void;
}

class OrderApplyChatCreator implements OrderApplyChatCreatorInterface {
  public function createChat(Apply $apply): void { 
    ...
    // бизнес-логика создания чата
    ...
  }
}

Ну в общем понятно — постпроцессинг делается по аналогии.

Важный момент — все аргументы методов и возвращаемые значения должны быть либо сериализуемыми, либо имплементировать интерфейс Data Converter.

Создаём workflow

Как и в случае с activity, для workflow нужно создать Temporal-совместимый интерфейс, опять же — не обязательно, но мы сделаем как в официальных доках:

<?php

class ApplyForOrderResult {
  public ?Apply $apply;
  public int $errorCode = 0;
}

#[WorkflowInterface]
interface ApplyForOrderWorkflowInterface {
  #[WorkflowMethod("applyForOrder")]
  #[ReturnType(ApplyForOrderResult::class)]
  public function applyForOrder(
    int $orderId,
    string $specId,
    OrderApplyParams $params
  );
}

Далее создаём класс, который имплементирует этот интерфейс — в нём и реализуется основная логика нашей надёжной транзакции:

<?php

class ApplyForOrderWorkflow implements ApplyForOrderWorkflowInterface
{
  public function __construct()
  {
    // Создаём activity-стабы (RPC) для вызова activity-методов
    $this->validator = Workflow::newActivityStub(
      OrderApplyValidatorInterface::class,
      ActivityOptions::new()->withStartToCloseTimeout(
        \DateInterval::createFromDateString('3 seconds')
      )
    );

    $this->creator = Workflow::newActivityStub(
      OrderApplyCreator::class,
      //...
    );

    // Пример того, как можно создать activity-стаб для activity,
    // которая реализована в другом сервисе или на другом стеке
    $this->biller = Workflow::newUntypedActivityStub(
      ActivityOptions::new()->withStartToCloseTimeout(
        \DateInterval::createFromDateString('5 seconds')
      )
    );

    $this->chat = Workflow::newActivityStub(
      OrderApplyChatCreatorInterface::class,
      //...
    );
  }

  public function applyForOrder(
    int $orderId,
    string $specId,
    OrderApplyParams $params
  )
  {
    $couldBeApplied = yield $this->validator->couldBeApplied(
      $orderId, $specId, $params
    );
    if (!$couldBeApplied) {
      return new ApplyForOrderResult(null, 1);
    }

    $createResult = yield $this->creator->createApply(
      $orderId, $specId, $params
    );
    if ($createResult->errorCode != 0) {
      return new ApplyForOrderResult(null, $createResult->error_code);
    }

    // Пример того, как можно вызвать activity из другой кодовой базы — по имени.
    $billingResult = yield $this->biller->execute(
      'billApply', 
      [$createResult->apply], 
      BillApplyResult::class // Мапим результат на внутренний класс
    );
    if ($billingResult->errorCode != 0) {
      // 
      // @todo сюда можно добавить логику отката заявки т.к. биллинг не прошёл
      //
      return new ApplyForOrderResult(null, $billingResult->error_code); 
    }

    yield $this->chat->createChat($createResult->apply);

    return new ApplyForOrderResult($createResult->apply);
  }
}

На что здесь стоит обратить внимание:

  • При создании activity-стабов можно указать различные настройки — таймауты, рейт-лимиты и т.п.

  • Activity могут находиться в разных микросервисах и могут быть написаны на разных языках.

  • Конструкцию $foo = yield bar() можно воспринимать как аналогию await в TypeScript или Python. Если интересно, то можно почитать про механику этой непривычной для PHP конструкции.

Создаём и запускаем воркер

Temporal работает через RoadRunner — высокопроизводительный инструмент, позволяющий эффективно запускать PHP-приложения в долгоживущем режиме.

Настраиваем RoadRunner

temporal:
  address: ${хост_порт_temporal_кластера}
  activities:
    num_workers: 8
  
rpc:
  enable: true
  listen: tcp://127.0.0.1:6001

server:
  command: "php worker.php"

Пишем скрипт воркера

<?php

...

$container = Container::getInstance();
$factory = WorkerFactory::create();
$worker = $factory->newWorker();

// Регистрируем какие workflow будет выполнять воркер
$worker->registerWorkflowTypes(
    ApplyForOrderWorkflow::class    
);

// Аналогично регистрируем activity

$worker->registerActivity(
    OrderApplyValidator::class,
    fn(ReflectionClass $class) => $container->get($class->getName())
);

... 

$worker->registerActivity(
    OrderApplyChatCreator::class,
    fn(ReflectionClass $class) => $container->get($class->getName())
);

$factory->run();

Запускам воркер через RoadRunner и слушаем очередь

Наблюдаем за воркерами

Запускаем workflow

Осталось последнее — добавить запуск workflow в бизнес логику и протестить, что всё работает. Workflow можно запускать синхронно и асинхронно. В нашем случае мы пойдём по синхронному пути т.к. фронтенд пока не готов к асинхронному созданию заявки, а для прототипа достаточно будет и синхронного варианта. Однако, 99% workflow, конечно же, должны работать асинхронно т.к. повторы, задержки и вот это вот всё.

Код синхронного запуска workflow выглядит так:

<?php
...
$result = $this->apply_workflow->applyForOrder(
  $orderId,
  $specId,
  $params
);
...

Тестируем создание заявки

Находим активного специалиста с балансом, заходим под ним в тестовый бекофис и отправляем заявку:

Заявка успешно отправлена, деньги списаны, чат создан:

Идём в GUI Temporal и смотрим на логи нашего workflow:

Обработка падений

Во время работы над прототипом я случайно и намеренно создавал ситуации, при которых workflow не отрабатывал:

  • Исключения и ошибки в коде activity.

  • Выключенные воркеры.

  • Недетерминированный код workflow.

  • Ошибка MySQL has gone away т.к. код не был приспособлен к долгоиграющим скриптам.

  • и т.п.

Все эти кейсы Temporal успешно отработал, даже сценарии с недетерминированным кодом, а вот так выглядит ошибки в GUI:

Вроде бы всё рассказал, пора подводить итоги.

Плюсы Temporal

Универсальность

  • Поддерживает весь наш стек — PHP, TypeScript и Python.

  • Позволяет решать широкий класс задач: надёжные мутации, транзакции, отложенные задания, рассылки, кроны, флоу бизнес-моделей и т.п. — всё это можно реализовать через Temporal. Например, особенно взрывает мозг workflow подписок, который работает на протяжении всего жизненного цикла пользователя.

  • Текущую бизнес логику довольно легко засунуть в activity и написать императивный алгоритм в workflow. Правда у кода самого workflow есть ограничения, но в activity можно творить всё что угодно.

  • Более простой путь по интеграции ML-алгоритмов в наши бизнес-процессы.

  • В будущем можно запилить инструмент для продактов, которые смогут в GUI собирать бизнес-сценарии из activity и играть между ними в А/Б тесты.

Надёжность

  • Temporal изначально был разработан в Uber, также его используют такие бигтехи как Snap и Netflix — это внушает доверие.

  • Если workflow стартовал, то Temporal выполнит его до конца c учётом retry policy. Даже если упали воркеры, отвалились сервисы, что-то пошло не так — workflow будет выполнен.

  • Для activity можно определять rate limits и retry policy.

  • Ошибки можно обработать в коде workflow и тут же запустить откат-сценарии.

Разные плюшки

Минусы Temporal

Риск развития микросервисного монолита

  • Хоть номинально все RPC вызовы и проходят через один Temporal кластер, но гибких возможностей выстраивать политику кто и что может, вроде как, нельзя. Поэтому сервисы практически бесконтрольно могут дёргать друг-друга.

Высокий порог входа

  • Разработчикам обязательно нужен онбординг и обучение — едва ли можно сходу разобраться в том, как эффективно “программировать” на Temporal.

  • Поскольку Temporal становится одним из основных компонентов инфраструктуры, то devops-ы должны знать все детали и особенности его работы в production, а также уметь быстро чинить проблемы.

Разные неприятности

  • Нужно следить за зоопарком воркеров — за статусом, за актуальностью кода и т.п. Это важно как для прода, так и для окружения разработки.

  • Код workflow должен быть детерминирован, однако встроенная поддержка версионирования для workflow и воркеров позволяет реализовать обратную совместимость.

  • Сигнатуры методов workflow и activity в идеале не должны меняться.

  • 99% workflow будут запускаться асинхронно — нужно допиливать клиентов, чтобы статус процесса отображался корректно.

  • Оверхед на код — придётся писать больше кода, но во времена copilot-а это не такая уж и серьезная проблема.

Выводы

На уровне прототипа Temporal впечатляет универсальностью и надёжностью. Однако, без осознанного потребления может превратить нормальную распределённую систему в микросервисный монолит со всеми вытекающими.

Очень хочется попробовать — начать с каких-то простых задач и постепенно переходить к чему-то более серьёзному. Если в мы итоге заюзаем этот фреймворк, то я обязательно напишу о реальном опыте в проде.

Спасибо что дочитали!
Пока :)

P.S. Если среди вас есть те, кто уже применяет Temporal, то буду благодарен за кейсы в комментариях.

Комментарии (16)


  1. baldr
    06.11.2023 11:18

    Спасибо за статью. Однако, это больше на PHP нацелено.

    Лично мне был бы больше интересен пример с Python, а особенно бы интересовало сравнение с, например, Airflow. Если выбирать для Python-стека - то вы бы выбрали Temporal, Airflow или просто на базе Celery построили свой фреймворк?


    1. Lachezis
      06.11.2023 11:18
      +2

      Основная разница в подходе управления потоком workflow. В Airtable как и большистве похожих решений идет распаковка DAG, т.е. если ваша логика ложится на него - все супер. В темпорал все управление императивное - простой код, т.е. можно и DAG (да, многие переносят свой DSL на Temporal), а можно и без него.

      Если упрощенно - Airflow управляет стейтом сам, Temporal дает вам дергать за ниточки.

      Разница становится очень заметной когда ваши бизнес процессы не ложаться на простую цепочку вызовов - циклы, условия, динамическое кол-во действий. Ну и плюс - все работает на множестве языков из коробки.


  1. Lachezis
    06.11.2023 11:18
    +1

    У нас весь ML стек гоняется через Temporal, нарадоваться не можем. Часть логики на Python, все управление на PHP.

    Из интересных паттернов что могу отметить при работе, очень удобно упаковывать определение стабов активити в трейты.

    class MyWorkflow {
        use SomeActivity;
    
        public function run() {
             yield $this->someActivity()->doSomething();
        }
    }
    

    Сильно упрощяет управление теми активити где политики запуска, task queue и ретраи не меняются в зависимости от родительского воркфлоу.


    1. em1nx Автор
      06.11.2023 11:18

      Спасибо за идею )
      А где у вас лежат эти трейты? Все вместе в одном неймспейсе или раскиданы по кодовой базе? Как разработчики ищут activity, чтобы случайно не написать уже существующий?


      1. Lachezis
        06.11.2023 11:18

        Каждый сервис имеет свой нейспейс где описаны интефейсы его активити + трейт для PHP. Это в идеале.


    1. roxblnfk
      06.11.2023 11:18
      +1

      А ещё можно пойти по пути упрощения фабрик Activity/Workflow. Код выглядит так:

      StubFactory::activity(FooActivity::class, retryAttempts: 1)->foo($foo, $bar)

      TaskQueue и прочие настройки, если постоянные, берутся из атрибутов. Те, что определяются по месту - передаются аргументами.


      1. Lachezis
        06.11.2023 11:18

        Учитывая что Workflow это статичный фасад на контекст то писать такие фабрики вполне допустимо, как и заворачивать часть логики в хелперы.


  1. roxblnfk
    06.11.2023 11:18
    +2

    • Хоть номинально все RPC вызовы и проходят через один Temporal кластер, но гибких возможностей выстраивать политику кто и что может, вроде как, нельзя. Поэтому сервисы практически бесконтрольно могут дёргать друг-друга.

    Этот недостаток можно попробовать решить интерцепторами. Тут сложность возникнет с тем, чтобы описать их на всех языках, на которых написаны Workflow. Но если у вас только PHP, то аля deptrac|nocolor на интерцепторах - интересный вызов :) Интерцепторы уже можно щупать, установив SDK версии 2.7.0@dev.

    Примеры некоторых доступны в семплах https://github.com/temporalio/samples-php/pull/40/files


    1. em1nx Автор
      06.11.2023 11:18

      О, это интересно, спасибо!

      Но если у вас только PHP

      У нас TypeScript, PHP и ещё немного Python. П

      интересный вызов

      Основная сложность в том, что у нас уже есть компонент, через который проходит весь трафик — единое API, на нём мы и отслеживаем выполнение разных политик. Добавляя Temporal, мы получаем второй компонент, хотя, наверное, можно придумать как пропускать Temporal трафик через единое API.


  1. ggo
    06.11.2023 11:18
    +3

    • Хоть номинально все RPC вызовы и проходят через один Temporal кластер, но гибких возможностей выстраивать политику кто и что может, вроде как, нельзя. Поэтому сервисы практически бесконтрольно могут дёргать друг-друга.

    Странный тезис.

    Что мешает передавать в аргументах любую авторизационную информацию и проверять ее в провайдерах? В случае ошибки - бросать Non retryable exception.

    Аналогично.

    Любые сигнатуры - это API. Что внешний API (например REST), что внутренний API (например IoC) налагают требования по контролю целостности API со стороны консьюмера и провайдера. Темпорал в этом плане ничего нового не привносит.

    • Оверхед на код — придётся писать больше кода, но во времена copilot-а это не такая уж и серьезная проблема.

    Оверхед по сравнению с совсем простым кодом - без перевызова, без таймаутов, без альтернативных сценариев, без ролбеков.

    А если сравнивать код со всем этим, в двух реализациях - с темпорал и без, то код с темпорал существенно меньше.

    В целом, Temporal - must have инструмент, в микросервисной архитектуре.


    1. em1nx Автор
      06.11.2023 11:18

      Что мешает передавать в аргументах любую авторизационную информацию и проверять ее в провайдерах? В случае ошибки - бросать Non retryable exception.

      В провайдерах проверять — можно. Я имел ввиду, что нет общего компонента, который бы авторизовывал вызовы между сервисами на основе заголовков/директив и т.п.


      1. ggo
        06.11.2023 11:18

        С этим согласен


      1. Lachezis
        06.11.2023 11:18

        Всегда можно сделать заголовки авторизации и ловить их в интерсепторах.


    1. em1nx Автор
      06.11.2023 11:18

      Темпорал в этом плане ничего нового не привносит.

      По сравнению с REST и GQL, которым мы пользуемся сейчас, Temporal работает без схемы (OpenAPI, GQL и т.п.) поэтому следить за контрактами будет сложнее. Наверное так можно переформулировать "минус" про сигнатуры.


      1. roxblnfk
        06.11.2023 11:18

        Переход на protobuf не решит вашу проблему?
        В сигнатуре InputDto и ResultDto. Контракты в proto-файлах.


        1. em1nx Автор
          06.11.2023 11:18

          Не очень понял что такое "переход на protobuf" в контексте решение проблемы отсутствия схемы у Temporal.