При разработке программных продуктов иногда может появиться задача разработки многосоставного ступенчатого процесса. На каждом этапе которого нужно предусмотреть обработку ошибок, повторное выполнение операции, откат с начальному состоянию в случае ошибки и тд. Возникают вопросы о том, как реализовать обработку критических ситуаций, как хранить состояние процесса и тд. Если процесс сложный и длительный по времени, то реализация этой задачи может быть непростой для разработки и тестирования всех вариантов развития процесса.
В этом случае можно подумать о применении оркестраторов. В этой статье я расскажу про temporal и покажу пример кода. В конце приложу ссылку на исходный код, так что можно будет скачать и самому поиграться с настройками, процессами и тд.
Краткое инфо
Чем temporal не является:
Это не планировщик задач (cron). Хотя он может запускать задачи по расписанию, но он не заменяет cron напрямую, потому что запускать сервер temporal слишком избыточно; нет встроенной визуализации расписаний и есть ограничения в гибкости cron выражениях
Это не очередь сообщений. Temporal - это не kafka, rabbit и тд. Он не предназначен для простой передачи сообщений между сервисами.
Это не база данных. Хотя temporal и хранит состояние процессов, но он не предназначен для использования как СУБД.
А что же это такое:
Temporal - это оркестратор, который управляет вашим кодом. Он гарантирует надежное выполнение долгоживущих операций в распределенных системах. В нем есть встроенная поддержка ретраев с настраиваемыми таймаутами и сохранение состояния процесса с возможностью запуска с места остановки.
Важная особенность - это описание оркестрируемого процесса в коде с небольшим количеством добавок в виде аннотаций и интерфейсов.
На официальном сайте можно найти документацию по инструменту для многих языков (Go, Java, Python ...). Кроме этого на официальном сайте есть ссылка на слак, где можно задать вопросы разработчикам и другим пользователям Temporal. В этом канале еще есть ИИ, которая обучена на документации.
Преимущества temporal
Отказоустойчивость: Выдерживает падения сервисов, сетевые сбои, перезапуски инфраструктуры.
Долгоживущие процессы: Можно легко управлять процессами, которые длятся дни или месяцы (например, пробная подписка или онбординг).
Простота разработки: Вся логика пишется как простой последовательный код. Исчезает необходимость вручную управлять очередями, повторными попытками (retries) и компенсирующими транзакциями (sagas).
Визуализация и отладка: UI Temporal показывает полную историю выполнения любого workflow, что сильно упрощает отладку.
Несколько важных терминов
Вначале нужно упомянуть несколько терминов, чтобы потом все понятно было:
Рабочий процесс (Workflow):
Workflow - это процесс, состоящий из нескольких элементарных шагов. Это может быть, например, процесс модерации поста, процесс перевода денег с одного счета на другой, процесс бронирования жилья, билетов и тд.
Этот процесс мы оп��сываем в коде, так что визуально сложно отличить от обычной функции, в которой есть последовательные вызовы методов.
Детерминированный: При повторных запусках с теми же данными должен давать одинаковый результат, рекомендации ниже раскроют этот пункт.
Рекомендации для написания workflow:
Не используйте глобальные изменяемые переменные в своих имплементациях Workflow. Это гарантирует, что различные Wokrflow изолированы.
Код workflow должен быть детерминирован. Не вызывайте недетерминированные функции (например
UUID.randomUUID()) напрямую из кода Workflow. Temporal SDK предоставляет API для вызова недетерминированного кода.Не используйте конструкции языка для получения системного времени, вместо этого используйте только
Workflow.currentTimeMillis()для получения текущего времени в коде Workflow.Не используйте
Threadили другие классы многопоточности какThreadPoolExecutior. Вместо этого для выполнения асинхронного кода используйтеAsync.funtionилиAsync.procedureпредоставляемые Temporal SDKНе используйте синхронизацию, локи или другие стандартные классы для синхронизации кода кроме тех, что поставляются классом Workflow. Нет явной необходимости в синхронизации кода, потому что код Workflow выполняется в одном потоке под глобальным локом.
Вызывайте
Workflow.sleepвместоThread.sleepИспользуйте
PromiseиCompletablePromiseвместоFutureиCompetableFutureИспользуйте
WorflowQueueвместоBlockingQueueИспользуйте
Workflow.getVersionкогда изменяете Workflow. Без этого старые процессы могут упасть из-за несоответствия процесса
Деятельность (Activity):
Это конкретный шаг нашего процесса. Для примера можно назвать: вызов внешнего API, сохранение состояния в базу, отправка нотификации пользователю и тд
Activity уже может содержать недетерминированный код, а кроме того в активити мы можем внедрять наши бины.
Здесь еще нужно упомянуть, что входные и выходные объекты для Activity должны быть сериализуемыми, чтобы Temporal мог их сохранить в базу. Кроме этого, есть ограничение на объем.
Namespace
Namespace - это пространство для разделения разных Workflow. Разработчики Temporal предлагают использовать этот механизм для разделения процессов в dev и prod средах. Сомнительно, но окей.
В итоге получается вот такая схема из этих компонентов:

Некоторые детали реализации Temporal
В Temporal реализован подход
Event Sourcing. Temporal не хранит текущее состояние вашего workflow. Вместо этого он хранит историю событий (запущен, вызвана активность X, активность X завершена успешно, и т.д.). При перезапуске воркер «проигрывает» эту историю, пропуская шаги, которые уже были выполнены, чтобы восстановить последнее состояние. По этой причине большое количество активностей вызывает большое количество записей в бд. И именно поэтому изменение Workflow может вызывать ошибку, если сделать это неправильно.В Temporal используется версионирование процессов, поэтому при модификации с нарушением совместимости можно выбрать подходящий процесс, чтобы не поломать старое и добавить новое. Это стоит применять, если есть активные workflow со старым алгоритмом.
Гарантия «ровно-один раз»: Благодаря сохранению состояний и ретраям, каждая деятельность будет выполнена ровно один раз, даже если для этого потребуется несколько попыток. Кроме этого, у каждого workflow есть идентификатор, поэтому запустить дублирующий workflow со второго инстанса вашего приложения не получитчся.
Пример кода
Идея проекта
Напишем пример на java для модерации поста. Этот процесс будет состоять из 3х шагов:
Проверка поста на количество символов
Проверка поста на содержание запрещенных слов
Проверка на везение Если какой-то из этап возвращает отрицательный ��езультат, то мы заканчиваем процесс модерации и сохраняем отрицательный результат модерации. В дальнейшем можно сюда добавить еще функциональности, чтобы показать разные возможности temporal.
Полный код проекта можно посмотреть в git репозитории, ссылку смотрите в конце.
Развертывание temporal
Temporal состоит из нескольких компонентов, которые можно развернуть в докере или кубере. Для простоты напишем docker compose для поднятия всех необходимых компонентов. В первом приближении есть такие компоненты:
Сервер temporal
UI temporal
База данных для temporal
Docker compose
services:
postgresql:
container_name: temporal-postgresql
environment:
POSTGRES_PASSWORD: temporal
POSTGRES_USER: temporal
image: postgres:15
networks:
- temporal-network
ports:
- 5454:5432
volumes:
- /var/lib/postgresql/data
temporal:
container_name: temporal
depends_on:
- postgresql
environment:
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
- TEMPORAL_NAMESPACE_DEFAULT=default
- TEMPORAL_DEFAULT_NAMESPACE=default
image: temporalio/auto-setup:1.29.0
networks:
- temporal-network
ports:
- 7233:7233
volumes:
- ./dynamicconfig:/etc/temporal/config/dynamicconfig
temporal-admin-tools:
container_name: temporal-admin-tools
depends_on:
- temporal
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.29
networks:
- temporal-network
stdin_open: true
tty: true
temporal-ui:
container_name: temporal-ui
depends_on:
- temporal
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
image: temporalio/ui:2.41.0
networks:
- temporal-network
ports:
- 8888:8080
networks:
temporal-network:
driver: bridge
name: temporal-networkКонфигурация
Для начала нам нужно настроить конфигурацию для temporal в нашем приложении.
В этом проекте я буду использовать стартер спринга для простоты настройки, но можно использовать голый jdk для temporal. Ниже приведу пример настройки для обоих вариантов :
Конфигурация через Spring starter
Ниже дан пример yaml конфигурации для стартера:
spring:
application:
name: temporal-demo
temporal:
connection:
target: 127.0.0.1:7233
target.namespace: default
workers:
- task-queue: MODERATION_TASK_QUEUE
capacity:
max-concurrent-workflow-task-pollers: 6
max-concurrent-activity-task-pollers: 6
rate-limits:
max-worker-activities-per-second: 2
max-task-queue-activities-per-second: 2
workflow-cache:
max-instances: 10
max-threads: 10
workersAutoDiscovery:
packages: com.moderation
В секции connection мы указываем параметры подключения к temporal и наш namespace.
В секции workers мы указываем параметры воркеров для нашего workflow. Worker - это внутренний компонент в temporal, который выполняет наши activity, иными словами - рабочая лошадка.
В секции workersAutoDiscovery мы указываем пакет с activity и workflow, которые будут автоматически сканироваться спрингом и регистрироваться в воркерах.
Конфигурация через голый jdk
Для начала нам нужно создать бин WorkflowClient, который позволит нам взаимодействовать с temporal.
@Configuration
public class TemporalConfig {
@Bean
public WorkflowClient workflowClient() {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
return WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder()
.setNamespace("default")
.build());
}
}
Создание воркера
Голый jdk ничего не знает про бины, поэтому приходится самому регистрировать activity в воркере, чтобы он мог их выполнять. Если мы этого не сделаем, то в ui будет висеть уведомление, что подходящего воркера не найдено:
@Component
@RequiredArgsConstructor
public class TemporalWorker implements CommandLineRunner {
private final WorkflowClient workflowClient;
private final Moderationservice moderationService;
private final ModerationDao moderationDao;
@Override
public void run(String... args) {
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker("MODERATION_TASK_QUEUE");
worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);
worker.registerActivitiesImplementations(new SymbolCountModerationActivityImpl(moderationService, moderationdao));
worker.registerActivitiesImplementations(new ForbiddenWordModerationActivityImpl(moderationService, moderationdao));
worker.registerActivitiesImplementations(new LuckTestModerationActivityImpl(moderationService, moderationdao));
factory.start();
}
}
Теперь реализуем все шаги модерации. Я буду каждый шаг реализовать в отдельном activity для возможности разной настройки таймаутов каждого шага. Если в этом нет необходимости, то можно и в одном интерфейсе написать три метода для каждого из трех шагов.
Реализуем первый шаг
В первом шаге нужно проверить пост на количество символов.
В temporal для каждого activity нужно создать интерфейс с определенными аннотациями:
@ActivityInterface
public interface SymbolCountModerationActivity {
@ActivityMethod
ModerationStatus symbolCountModerate(Moderation moderation);
}
Ну а далее мы пишем реализацию этого интерфейса:
@Component
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")
@RequiredArgsConstructor
public class SymbolCountModerationActivityImpl implements SymbolCountModerationActivity {
private final ModerationDao moderationDao;
private final ModerationService moderationService;
@Override
public ModerationStatus symbolCountModerate(Moderation moderation) {
ModerationStatus status = moderationService.symbolCountModeration(moderation);
if (!status.isApproved()) {
moderationDao.updateFinalStatus(moderation.id(), true, false);
}
return status;
}
}
Далее представлен код сервиса модерации:
// Методы из ModerationService
public ModerationStatus symbolCountModeration(Moderation moderation) {
log.info("Started first stage for {}", moderation);
simulateLongExecution();
var isApproved = moderation.text().length() < CONTENT_LENGTH;
return new ModerationStatus(true, isApproved);
}
private void simulateLongExecution() {
try {
sleep(10_000L);
} catch (InterruptedException ignored) {
}
}
// Dto статуса модерации
public record ModerationStatus(
boolean isFinished,
boolean isApproved
) {}
Важный нюанс состоит в том, что в сам workflow мы не можем внедрять бины, а в активити мы можем это делать.
Реализуем второй шаг
На втором шаге будем проверять наличие в тексте поста запрещенных слов.
Далее представлен интерфейс второго активити (шага).
@ActivityInterface
public interface ForbiddenWordModerationActivity {
@ActivityMethod
ModerationStatus forbiddenWordModerate(Moderation moderation);
}
Далее представлена реализация второго шага:
@Component
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")
@RequiredArgsConstructor
public class ForbiddenWordModerationActivityImpl implements ForbiddenWordModerationActivity {
private final ModerationService moderationService;
private final ModerationDao moderationDao;
@Override
public ModerationStatus forbiddenWordModerate(Moderation moderation) {
ModerationStatus status = moderationService.forbiddenWordModeration(moderation);
if (!status.isApproved()) {
moderationDao.updateFinalStatus(moderation.id(), true, false);
}
return status;
}
}
Ну и сам код модерации:
private static final List forbiddenFords = List.of("box", "men", "noigram", "metan");
public ModerationStatus forbiddenWordModeration(Moderation moderation) {
log.info("Started second stage for {}", moderation);
simulateLongExecution();
var content = moderation.text();
boolean isApproved = forbiddenFords.stream()
.noneMatch(content::contains);
return new ModerationStatus(true, isApproved);
}
Реализуем третий шаг
На третьем этапе у нас будет проверка на везение: с некоторой вероятностью будем выбрасывать ошибку. Эта ошибка будет означать недоступность внешнего сервиса или какую-то другую нештатную ситуацию в ходе работы нашей программы.
Это поможет нам увидеть поведение temporal в случае ошибки в activity и настроить политику ретраев и таймаутов.
Интерфейс activity:
@ActivityInterface
public interface LuckTestModerationActivity {
@ActivityMethod
ModerationStatus luckyModerate(Moderation moderation);
}
Реализация activity:
@Component
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")
@RequiredArgsConstructor
public class LuckTestModerationActivityImpl implements LuckTestModerationActivity {
private final ModerationService moderationService;
private final ModerationDao moderationDao;
@Override
public ModerationStatus luckyModerate(Moderation moderation) {
try {
ModerationStatus status = moderationService.luckyModeration(moderation);
moderationDao.updateFinalStatus(moderation.id(), true, true);
return status;
} catch (Exception e) {
if (Activity.getExecutionContext().getInfo().getAttempt() == 5){
moderationDao.updateFinalStatus(moderation.id(), true, false);
}
throw e;
}
}
}
Код модерации:
public ModerationStatus luckyModeration(Moderation moderation) {
log.info("Started third stage for {}", moderation);
checkLuck();
return new ModerationStatus(true, true);
}
private void checkLuck() {
if (Math.random() * 10 > 4) {
throw new RuntimeException("You are not lucky!");
}
}
Собираем наш workflow
После того как мы описали отдельно каждый из шагов модерации самое время собрать и запустить наш процесс.
Для этого нужно создать интерфейс для процесса с определенными аннотациями:
@WorkflowInterface
public interface ModerationWorkflow {
@WorkflowMethod
void moderate(Moderation moderation);
}
Далее представлена реализация workflow:
@WorkflowImpl(taskQueues = "MODERATION_TASK_QUEUE")
public class ModerationWorkflowImpl implements ModerationWorkflow {
/*
Объявление activity
...
*/
@Override
public void moderate(Moderation moderation) {
var symbolModerationStatus = symbolCountModerationActivity.symbolCountModerate(moderation);
if (!symbolModerationStatus.isApproved()) {
return;
}
var forbiddenWordsModerationStatus = forbiddenWordModerationActivity.forbiddenWordModerate(moderation);
if (!forbiddenWordsModerationStatus.isApproved()) {
return;
}
luckTestModerationActivity.luckyModerate(moderation);
}
}
Как видим, в этом нет ничего сложного. Визуально это просто просто код с последовательным вызовом методов, но в реальности это набор атомарных шагов с ретраями и сохранением состояния каждого шага.
Важно задавать id workflow при его создании, потому что если такой процесс уже есть, то повторно он запускаться не будет. В качестве id можно выбирать идентификатор записи в бд, чтобы потом было легко понять, какую запись смотреть.
Кроме этого можно для идентификатора использовать более читабельные названия. Например, что-то вроде customer1_order_1. Этот идентификатор дает информацию о конкретном заказчике, текущем процессе (заказе) и идентификаторе заказа. В этом случае в UI temporal получать информацию о процессе сможет не только разработчик, но и аналитик или тестировщик.
Этот процесс можно запускать из любой точки входа в приложение. Выберем рест запрос для простоты. Workflow процесс может быть длительный, даже в нашем случае, если мы будем проверять картинки в посте на предмет чего-то запрещённого с помощью llm, ml или вручную. Чтобы не зависеть от длительности выполнения процесса в рест запросе асинхронно запустим процесс.
В реальности нужно подумать о том, чтобы сохранение в бд и запуск workflow не аффектили друг друга в случае ошибки, а в pet проекте можно на это забить.
Ниже представлен пример запуска процесса модерации:
public Integer startModeration(Moderation moderation) {
Moderation createdModeration = moderationDao.create(moderation);
ModerationWorkflow workflow = workflowClient.newWorkflowStub(
ModerationWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(MODERATION_TASK_QUEUE)
.setWorkflowId(createdModeration.id().toString())
.build()
);
WorkflowClient.start(workflow::moderate, createdModeration);
moderationDao.updateInProgress(createdModeration.id());
return createdModeration.id();
}
Просмотр информации этапов в GUI
У temporal есть UI, в котором можно смотреть статус и состояния текущих или завершенных процессов.

На следующем скрине видно, как третий этап модерации ретраится

В UI есть подробный лог событий процесса

Тестирование
Для тестового проекта напишем один интеграционный тест для случая успешного прохождения процесса модерации.
В temporal jdk есть необходимые инструменты для тестирования. Создаем тестовое окружение, чтобы не использовать реальный temporal.
Ниже представлена эта конфигурация:
@BeforeEach
void setUp() {
applicationContext.start();
Worker worker = testWorkflowEnvironment.newWorker(MODERATION_TASK_QUEUE);
worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);
worker.registerActivitiesImplementations(symbolCountModerationActivity);
worker.registerActivitiesImplementations(forbiddenWordModerationActivity);
worker.registerActivitiesImplementations(luckTestModerationActivity);
testWorkflowEnvironment.start();
}
Далее представлен сам тест:
@Test
void shouldSuccessWorkflow() {
ModerationWorkflow workflow = workflowClient.newWorkflowStub(
ModerationWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue(MODERATION_TASK_QUEUE).build()
);
Moderation moderation = new Moderation(0, "Text", false, false);
moderationHandler.startModeration(moderation);
workflow.moderate(moderation);
await()
.atMost(10, SECONDS)
.pollInterval(1, SECONDS)
.until(() -> {
Moderation actual = moderationDao.getById(0);
assertThat(actual.isApproved()).isTrue();
return true;
});
}
Кроме этого можно отдельно тестировать каждое activity, использовать моки и все, что захотите.
Заключение
В этой статье рассмотрели важные базовые моменты, связанные с temporal, а также пример проекта с этим инструментом. Этот проект можно скачать и самому потыкать: в нем есть swagger для отправки запроса, вся необходимая информация для запуска проекта есть в Readme. Скачивайте и запускайте.
Подводя итог, можно сказать, что temporal — это не просто еще одна библиотека для разработчика, а подход к построению надежных систем. Он абстрагирует всю сложность распределенных систем — обработку сбоев, повторные попытки, масштабирование — давая возможность разработчику сосредоточиться на написании бизнес-логики.
Ссылки:
Официальный сайт темпорал - https://temporal.io/
Github с кодом проекта - https://github.com/RinatBeybutov/Temporal
Ссылка на docker compose для темпорал - https://github.com/temporalio/docker-compose/blob/main/docker-compose-postgres.yml
P.S.
У меня есть репозиторий с полезными ссылками видео, статей и прочего. Можете найти там что-то полезное:
https://github.com/RinatBeybutov/Links-and-materials
Folko85
Судя по сайту оно платное. Стоит ли своих денег? Или в разработке не применяете?