

В жизни каждого человека бывают моменты, когда приходит время писать бизнес логику и чтобы вернувшись через полгода все было более чем понятно. Все зависит от того, зайдете ли вы в ту дверь.
Поделюсь небольшим опытом, который сформировался в процессе перехода с "dumb rpc поверх nats" на полноценные воркфлоу в рамках разработки слоя биллинга для хостинга.
Lets get it started
Рассмотрим классический случай, когда стоит задача организовать взаимодействие с другим сервисом. Шаги, которые вы пройдете будут следующими:
Поднимаете любую известную шину данных (Rabbit, Kafka, Nats, etc);
Строите связь между сервисами через kafka топики/nats сабжекты/очереди rabbitmq;
Делаете де-/сериализацию данных;
Попутно костылите логику, например, для retry и backoff;
Какие моменты возникают:
Отсутствие контроля выполнения. Упал воркер - вы потеряли весь прогресс;
Зависимость от стабильности шины данных;
Необходимость в скотче для скрепления всей конструкции;

Temporal как гейм-ченжер бизнес логики вашего приложения
Temporal закрывает все 4 базовых пункта сразу, но при этом предоставляет ещё больше инструментов для создания легко масштабируемых сервисов.
В Temporal есть 2 основные сущности:
Workflow - ближайшая аналогия это CI пайплайны. Workflow просты и изящны в своей реализации. Вызывают необходимую логику через Activity.
Отказоустойчивы по умолчанию;Activity - своего рода RPC методы. Могут жить где угодно. Не зависят от ЯП и встраиваются буквально в любом месте приложения.
Например, в случае с NestJS activity методы могут жить в основном инстансе вашего api сервиса и использовать DI без каких-либо ограничений.
Отказоустойчивы по умолчанию;
А также 4 дополнительных, которые вам скорее всего понадобятся:
Query - метод workflow, который может вернуть хранимое состояние. Например, на каком этапе находится выполнение воркфлоу.
Signal - callback от workflow, который может выполнять любую логику. Наиболее частое использование - обработка событий извне, чтобы как-либо влиять на выполнение воркфлоу.
Timer - аналог sleep.
Condition - асинхронная функция, которая вернет true/false если выполнилось обозначенное условие. Например, ждем событие, обрабатываем в signal callback и condition вернет true, после чего выполнение воркфлоу может продолжиться.
Практическое применение. Учимся делать красиво c Temporal
Нам нужен durable execution. Завершить любой ценой. Упали - поднялись. Поднялись - продолжили.
Основная логика хранится в виде Workflow (те-самые ci-like пайплайны).
retry/backoff made right. Temporal сам повторит вызов в случае ошибки (вы по-прежнему можете контролировать и выбрасывать nonRetryable исключение);
cron made right. Temporal из коробки умеет вызов задач по крону, при этом вам не нужно возиться с реализацией как самого крона, так и с блокировками через Redis/Consul/etc в случае работы нескольких инстансов приложения (привет разработчикам Spring). Запуск произойдет строго на одном воркере за раз.
Я не стану придумывать эфемерные ситуации, а опишу то, с чем столкнулся сам в рамках одного из проектов хостинга, в котором принимаю участие.
Управление услугами должно быть надежным как для клиента, так и администратора. От этапа создания услуги до этапа освобождение ресурсов без оставления dangling сущностей.
Вводные:
REST API (далее backend) для личного кабинета, принимает все запросы с UI;
Внешний сервис на Python (далее py-worker), работающий с виртуализацией;
Критическая необходимость гарантированного предоставления услуги без всяких "но". Получили запрос на создание - обязательно обработали;
Как было раньше
Вызовы от backend к py-worker проходили через NATS по модели request-reply в рамках транзакции с блокировками на уровне базы данных.
Хоть это и работало, но с массой минусов, одни из которых:
Никакой гарантии, что NATS будет доступен в момент вызова (в том числе повторного вызова).
Необходимость переизобретения retry + backoff и логика постепенно усложнялась;
Переизобретение своего "формата" общения между сервисами с `{status: "OK"}` или `{error: "Something has failed"}`
Как это реализовано сейчас:
Вся основная логика сформирована в workflow;
Добавляется новый сервис billing-worker. Он запускает воркер для Temporal и готов запускать workflow.
Итоговый воркфлоу (упрощен в целях демонстрации декларативности логики) получился следующим:
type CreateHipletWorkflowArgs = {
hiplet: { id: number };
region: { options: { default_network_uuid: string } };
};
export const hipletCreatedSignal = wf.defineSignal('hiplet_created');
const { set_instance_id, set_hiplet_ip } = wf.proxyActivities<IHipletsActivities>({
startToCloseTimeout: '1m',
});
// Кросс вызовы методов воркера для облака
const cloud_activities = wf.proxyActivities({
taskQueue: 'cloud-queue',
startToCloseTimeout: '1m',
});
export async function hipletCreate(args: CreateHipletWorkflowArgs): Promise<void> {
const { hiplet, region } = args;
// Переменные могут выступать как место для хранения состояния воркфлоу
let is_created_by_signal = false;
// Callback, который выполнится в случае если в воркфлоу придет сигнал
wf.setHandler(hipletCreatedSignal, () => {
is_created_by_signal = true;
});
// Вызов метода от py-worker, который общается с системой управления виртуализацией
const port = await cloud_activities['port.create']({
name: `hiplet-${hiplet.id}`,
network_id: region.options.default_network_uuid,
});
const ips = extractFixedIPs(port.fixed_ips);
await set_hiplet_ip({
hiplet_id: hiplet.id,
ipv4: ips.ipv4,
ipv6: ips.ipv6,
});
wf.log.info(`Port created. ID: ${port.id}`);
const payload = {
hiplet_id: hiplet.id,
port_id: port.id,
ips: ips,
};
const server = await cloud_activities['instance.create'](payload);
await set_instance_id({
hiplet_id: hiplet.id,
instance_id: server.id,
});
// Ждем в течение 5 минут сигнал от биллинга, что услуга создана
const is_created = await wf.condition(() => is_created_by_signal, '5m');
if (!is_created) {
// Если сигнал не пришел, то выбрасываем ошибку. Перезапускаем воркфлоу
throw wf.ApplicationFailure.retryable('Hiplet create timeout');
}
}
Отправка сигнала с биллинга происходит при получении события от виртуализации, что услуга была создана. С точки зрения разработчика все как нельзя просто:
await this.temporal.signalWorkflow(`hiplet:create:${hiplet.id}`, 'hiplet_created');
Что получаем благодаря Temporal:
Декларативная бизнес-логика в одном месте;
Не зависимы от стабильности внешних шин данных;
Temporal будет пытаться выполнить все описанные этапы в воркфлоу. Если воркер упадет на одном из этапов, то выполнение продолжится с того же места на другом воркере. Применимо в том числе к for loop с индексами.
Рассмотрен базовый спектр возможностей, который закрывает бОльшую часть потребностей в рамках durable execution.
Итог
Какие пл��сы вынесли с Temporal:
Детальный observability по времени выполнения каждого воркфлоу и каждого этапа;

Выпилили NATS для работы с клиентскими услугами. Все действия переведены в формат воркфлоу, что гарантирует надежность выполнения.
Тысячи выполненных воркфлоу. Ни одна клиентская услуга не потерялась.
Например, иногда могут закончиться IP или вычислительные ресурсы. Temporal будет из раза в раз запускать воркфлоу пока услуга не будет создана.Возможность изменения логики без серьезных последствий.
Temporal поддерживает версионирование воркеров.Нововведения на уровне Temporal никак не затрагивают другие компоненты биллинга. Зоны ответственности разделены.
В случае если у вас появятся вопросы в процессе прочтения, буду рад ответить и поделиться опытом.