При разработке программных продуктов иногда может появиться задача разработки многосоставного ступенчатого процесса. На каждом этапе которого нужно предусмотреть обработку ошибок, повторное выполнение операции, откат с начальному состоянию в случае ошибки и тд. Возникают вопросы о том, как реализовать обработку критических ситуаций, как хранить состояние процесса и тд. Если процесс сложный и длительный по времени, то реализация этой задачи может быть непростой для разработки и тестирования всех вариантов развития процесса.

В этом случае можно подумать о применении оркестраторов. В этой статье я расскажу про temporal и покажу пример кода. В конце приложу ссылку на исходный код, так что можно будет скачать и самому поиграться с настройками, процессами и тд.

Краткое инфо

Чем temporal не является:

  • Это не планировщик задач (cron). Хотя он может запускать задачи по расписанию, но он не заменяет cron напрямую, потому что запускать сервер temporal слишком избыточно; нет встроенной визуализации расписаний и есть ограничения в гибкости cron выражениях

  • Это не очередь сообщений. Temporal - это не kafka, rabbit и тд. Он не предназначен для простой передачи сообщений между сервисами.

  • Это не база данных. Хотя temporal и хранит состояние процессов, но он не предназначен для использования как СУБД.

А что же это такое:

Temporal - это оркестратор, который управляет вашим кодом. Он гарантирует надежное выполнение долгоживущих операций в распределенных системах. В нем есть встроенная поддержка ретраев с настраиваемыми таймаутами и сохранение состояния процесса с возможностью запуска с места остановки.

Важная особенность - это описание оркестрируемого процесса в коде с небольшим количеством добавок в виде аннотаций и интерфейсов.

На официальном сайте можно найти документацию по инструменту для многих языков (Go, Java, Python ...). Кроме этого на официальном сайте есть ссылка на слак, где можно задать вопросы разработчикам и другим пользователям Temporal. В этом канале еще есть ИИ, которая обучена на документации.

Преимущества temporal

  • Отказоустойчивость: Выдерживает падения сервисов, сетевые сбои, перезапуски инфраструктуры.

  • Долгоживущие процессы: Можно легко управлять процессами, которые длятся дни или месяцы (например, пробная подписка или онбординг).

  • Простота разработки:  Вся логика пишется как простой последовательный код. Исчезает необходимость вручную управлять очередями, повторными попытками (retries) и компенсирующими транзакциями (sagas).

  • Визуализация и отладка: UI Temporal показывает полную историю выполнения любого workflow, что сильно упрощает отладку.

Несколько важных терминов

Вначале нужно упомянуть несколько терминов, чтобы потом все понятно было:

Рабочий процесс (Workflow):

Workflow - это процесс, состоящий из нескольких элементарных шагов. Это может быть, например, процесс модерации поста, процесс перевода денег с одного счета на другой, процесс бронирования жилья, билетов и тд.

Этот процесс мы оп��сываем в коде, так что визуально сложно отличить от обычной функции, в которой есть последовательные вызовы методов.

Детерминированный: При повторных запусках с теми же данными должен давать одинаковый результат, рекомендации ниже раскроют этот пункт.

Рекомендации для написания workflow:

  • Не используйте глобальные изменяемые переменные в своих имплементациях Workflow. Это гарантирует, что различные Wokrflow изолированы.

  • Код workflow должен быть детерминирован. Не вызывайте недетерминированные функции (например UUID.randomUUID()) напрямую из кода Workflow. Temporal SDK предоставляет API для вызова недетерминированного кода.

  • Не используйте конструкции языка для получения системного времени, вместо этого используйте только Workflow.currentTimeMillis() для получения текущего времени в коде Workflow.

  • Не используйте Thread или другие классы многопоточности как ThreadPoolExecutior. Вместо этого для выполнения асинхронного кода используйте Async.funtion или Async.procedure предоставляемые Temporal SDK

  • Не используйте синхронизацию, локи или другие стандартные классы для синхронизации кода кроме тех, что поставляются классом Workflow. Нет явной необходимости в синхронизации кода, потому что код Workflow выполняется в одном потоке под глобальным локом.

  • Вызывайте Workflow.sleep вместо Thread.sleep

  • Используйте Promise и CompletablePromise вместо Future и CompetableFuture

  • Используйте WorflowQueue вместо BlockingQueue

  • Используйте Workflow.getVersion когда изменяете Workflow. Без этого старые процессы могут упасть из-за несоответствия процесса

Деятельность (Activity):

Это конкретный шаг нашего процесса. Для примера можно назвать: вызов внешнего API, сохранение состояния в базу, отправка нотификации пользователю и тд

Activity уже может содержать недетерминированный код, а кроме того в активити мы можем внедрять наши бины.

Здесь еще нужно упомянуть, что входные и выходные объекты для Activity должны быть сериализуемыми, чтобы Temporal мог их сохранить в базу. Кроме этого, есть ограничение на объем.

Namespace

Namespace - это пространство для разделения разных Workflow. Разработчики Temporal предлагают использовать этот механизм для разделения процессов в dev и prod средах. Сомнительно, но окей.

В итоге получается вот такая схема из этих компонентов:

Схема компонентов
Схема компонентов

Некоторые детали реализации Temporal

  • В Temporal реализован подход Event Sourcing. Temporal не хранит текущее состояние вашего workflow. Вместо этого он хранит историю событий (запущен, вызвана активность X, активность X завершена успешно, и т.д.). При перезапуске воркер «проигрывает» эту историю, пропуская шаги, которые уже были выполнены, чтобы восстановить последнее состояние. По этой причине большое количество активностей вызывает большое количество записей в бд. И именно поэтому изменение Workflow может вызывать ошибку, если сделать это неправильно.

  • В Temporal используется версионирование процессов, поэтому при модификации с нарушением совместимости можно выбрать подходящий процесс, чтобы не поломать старое и добавить новое. Это стоит применять, если есть активные workflow со старым алгоритмом.

  • Гарантия «ровно-один раз»: Благодаря сохранению состояний и ретраям, каждая деятельность будет выполнена ровно один раз, даже если для этого потребуется несколько попыток. Кроме этого, у каждого workflow есть идентификатор, поэтому запустить дублирующий workflow со второго инстанса вашего приложения не получитчся.

Пример кода

Идея проекта

Напишем пример на java для модерации поста. Этот процесс будет состоять из 3х шагов:

  • Проверка поста на количество символов

  • Проверка поста на содержание запрещенных слов

  • Проверка на везение Если какой-то из этап возвращает отрицательный ��езультат, то мы заканчиваем процесс модерации и сохраняем отрицательный результат модерации. В дальнейшем можно сюда добавить еще функциональности, чтобы показать разные возможности temporal.

Полный код проекта можно посмотреть в git репозитории, ссылку смотрите в конце.

Развертывание temporal

Temporal состоит из нескольких компонентов, которые можно развернуть в докере или кубере. Для простоты напишем docker compose для поднятия всех необходимых компонентов. В первом приближении есть такие компоненты:

  • Сервер temporal

  • UI temporal

  • База данных для temporal

Docker compose
services:
 postgresql:
 container_name: temporal-postgresql
 environment:
 POSTGRES_PASSWORD: temporal
 POSTGRES_USER: temporal
 image: postgres:15
 networks:
 - temporal-network
 ports:
 - 5454:5432
 volumes:
 - /var/lib/postgresql/data

temporal:
 container_name: temporal
 depends_on:
 - postgresql
 environment:
 - DB=postgres12
 - DB_PORT=5432
 - POSTGRES_USER=temporal
 - POSTGRES_PWD=temporal
 - POSTGRES_SEEDS=postgresql
 - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CLI_ADDRESS=temporal:7233
 - TEMPORAL_NAMESPACE_DEFAULT=default
 - TEMPORAL_DEFAULT_NAMESPACE=default
 image: temporalio/auto-setup:1.29.0
 networks:
 - temporal-network
 ports:
 - 7233:7233
 volumes:
 - ./dynamicconfig:/etc/temporal/config/dynamicconfig

temporal-admin-tools:
 container_name: temporal-admin-tools
 depends_on:
 - temporal
 environment:
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CLI_ADDRESS=temporal:7233
 image: temporalio/admin-tools:1.29
 networks:
 - temporal-network
 stdin_open: true
 tty: true

temporal-ui:
 container_name: temporal-ui
 depends_on:
 - temporal
 environment:
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CORS_ORIGINS=http://localhost:3000
 image: temporalio/ui:2.41.0
 networks:
 - temporal-network
 ports:
 - 8888:8080
 networks:
 temporal-network:
 driver: bridge
 name: temporal-network

Конфигурация

Для начала нам нужно настроить конфигурацию для temporal в нашем приложении.

В этом проекте я буду использовать стартер спринга для простоты настройки, но можно использовать голый jdk для temporal. Ниже приведу пример настройки для обоих вариантов :

Конфигурация через Spring starter

Ниже дан пример yaml конфигурации для стартера:

spring:  
  application:  
    name: temporal-demo  
  temporal:
    connection:  
      target: 127.0.0.1:7233  
      target.namespace: default  
    workers:  
      - task-queue: MODERATION_TASK_QUEUE  
        capacity:  
          max-concurrent-workflow-task-pollers: 6  
          max-concurrent-activity-task-pollers: 6  
        rate-limits:  
          max-worker-activities-per-second: 2  
          max-task-queue-activities-per-second: 2  
    workflow-cache:  
      max-instances: 10  
      max-threads: 10 
    workersAutoDiscovery:  
      packages: com.moderation

В секции connection мы указываем параметры подключения к temporal и наш namespace.
В секции workers мы указываем параметры воркеров для нашего workflow. Worker - это внутренний компонент в temporal, который выполняет наши activity, иными словами - рабочая лошадка.
В секции workersAutoDiscovery мы указываем пакет с activity и workflow, которые будут автоматически сканироваться спрингом и регистрироваться в воркерах.

Конфигурация через голый jdk

Для начала нам нужно создать бин WorkflowClient, который позволит нам взаимодействовать с temporal.

@Configuration  
public class TemporalConfig {  
  
  @Bean  
  public WorkflowClient workflowClient() {  
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();  
    return WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder()  
        .setNamespace("default")  
        .build());  
  }  
}

Создание воркера

Голый jdk ничего не знает про бины, поэтому приходится самому регистрировать activity в воркере, чтобы он мог их выполнять. Если мы этого не сделаем, то в ui будет висеть уведомление, что подходящего воркера не найдено:

@Component  
@RequiredArgsConstructor  
public class TemporalWorker implements CommandLineRunner {  
  
  private final WorkflowClient workflowClient;  
  private final Moderationservice moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public void run(String... args) {  
    WorkerFactory factory = WorkerFactory.newInstance(workflowClient);  
    Worker worker = factory.newWorker("MODERATION_TASK_QUEUE");  
  
    worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);  
    
    worker.registerActivitiesImplementations(new SymbolCountModerationActivityImpl(moderationService, moderationdao));  
    worker.registerActivitiesImplementations(new ForbiddenWordModerationActivityImpl(moderationService, moderationdao));  
    worker.registerActivitiesImplementations(new LuckTestModerationActivityImpl(moderationService, moderationdao));  
  
    factory.start();  
  }  
}

Теперь реализуем все шаги модерации. Я буду каждый шаг реализовать в отдельном activity для возможности разной настройки таймаутов каждого шага. Если в этом нет необходимости, то можно и в одном интерфейсе написать три метода для каждого из трех шагов.

Реализуем первый шаг

В первом шаге нужно проверить пост на количество символов.

В temporal для каждого activity нужно создать интерфейс с определенными аннотациями:

@ActivityInterface  
public interface SymbolCountModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus symbolCountModerate(Moderation moderation);  
}

Ну а далее мы пишем реализацию этого интерфейса:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class SymbolCountModerationActivityImpl implements SymbolCountModerationActivity {  
  
  private final ModerationDao moderationDao;  
  private final ModerationService moderationService;  
  
  @Override  
  public ModerationStatus symbolCountModerate(Moderation moderation) {  
    ModerationStatus status = moderationService.symbolCountModeration(moderation);  
    if (!status.isApproved()) {  
      moderationDao.updateFinalStatus(moderation.id(), true, false);  
    }  
    return status;  
  }  
}

Далее представлен код сервиса модерации:

// Методы из ModerationService
public ModerationStatus symbolCountModeration(Moderation moderation) {  
  log.info("Started first stage for {}", moderation);  
  simulateLongExecution();  
  var isApproved = moderation.text().length() < CONTENT_LENGTH;  
  return new ModerationStatus(true, isApproved);
}
  
private void simulateLongExecution() {  
  try {  
    sleep(10_000L);  
  } catch (InterruptedException ignored) {  
  }
}

// Dto статуса модерации 
public record ModerationStatus(  
  boolean isFinished,  
  boolean isApproved  
) {}

Важный нюанс состоит в том, что в сам workflow мы не можем внедрять бины, а в активити мы можем это делать.

Реализуем второй шаг

На втором шаге будем проверять наличие в тексте поста запрещенных слов.

Далее представлен интерфейс второго активити (шага).

@ActivityInterface  
public interface ForbiddenWordModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus forbiddenWordModerate(Moderation moderation);  
}

Далее представлена реализация второго шага:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class ForbiddenWordModerationActivityImpl implements ForbiddenWordModerationActivity {  
  
  private final ModerationService moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public ModerationStatus forbiddenWordModerate(Moderation moderation) {  
    ModerationStatus status = moderationService.forbiddenWordModeration(moderation);  
    if (!status.isApproved()) {  
      moderationDao.updateFinalStatus(moderation.id(), true, false);  
    }  
    return status;  
  }  
}

Ну и сам код модерации:

private static final List forbiddenFords = List.of("box", "men", "noigram", "metan");

public ModerationStatus forbiddenWordModeration(Moderation moderation) {  
  log.info("Started second stage for {}", moderation);  
  simulateLongExecution();  
  var content = moderation.text();  
  boolean isApproved = forbiddenFords.stream()  
      .noneMatch(content::contains);  
  return new ModerationStatus(true, isApproved);  
}

Реализуем третий шаг

На третьем этапе у нас будет проверка на везение: с некоторой вероятностью будем выбрасывать ошибку. Эта ошибка будет означать недоступность внешнего сервиса или какую-то другую нештатную ситуацию в ходе работы нашей программы.

Это поможет нам увидеть поведение temporal в случае ошибки в activity и настроить политику ретраев и таймаутов.

Интерфейс activity:

@ActivityInterface  
public interface LuckTestModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus luckyModerate(Moderation moderation);  
}

Реализация activity:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class LuckTestModerationActivityImpl implements LuckTestModerationActivity {  
  
  private final ModerationService moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public ModerationStatus luckyModerate(Moderation moderation) {  
    try {  
      ModerationStatus status = moderationService.luckyModeration(moderation);  
      moderationDao.updateFinalStatus(moderation.id(), true, true);  
      return status;  
    } catch (Exception e) {  
      if (Activity.getExecutionContext().getInfo().getAttempt() == 5){  
        moderationDao.updateFinalStatus(moderation.id(), true, false);  
      }  
      throw e;  
    }  
  }  
}

Код модерации:

public ModerationStatus luckyModeration(Moderation moderation) {  
  log.info("Started third stage for {}", moderation);  
  checkLuck();  
  return new ModerationStatus(true, true);  
}

private void checkLuck() {  
  if (Math.random() * 10 > 4) {  
    throw new RuntimeException("You are not lucky!");  
  }  
}

Собираем наш workflow

После того как мы описали отдельно каждый из шагов модерации самое время собрать и запустить наш процесс.

Для этого нужно создать интерфейс для процесса с определенными аннотациями:

@WorkflowInterface  
public interface ModerationWorkflow {  
  
  @WorkflowMethod  
  void moderate(Moderation moderation);  
}

Далее представлена реализация workflow:

@WorkflowImpl(taskQueues = "MODERATION_TASK_QUEUE")  
public class ModerationWorkflowImpl implements ModerationWorkflow {  
  
  /*
	Объявление activity
	...
  */
  
  @Override  
  public void moderate(Moderation moderation) {  
    var symbolModerationStatus = symbolCountModerationActivity.symbolCountModerate(moderation);  
    if (!symbolModerationStatus.isApproved()) {  
      return;  
    }  
  
    var forbiddenWordsModerationStatus = forbiddenWordModerationActivity.forbiddenWordModerate(moderation);  
    if (!forbiddenWordsModerationStatus.isApproved()) {  
      return;  
    }  
  
    luckTestModerationActivity.luckyModerate(moderation);  
  }  
}

Как видим, в этом нет ничего сложного. Визуально это просто просто код с последовательным вызовом методов, но в реальности это набор атомарных шагов с ретраями и сохранением состояния каждого шага.

Важно задавать id workflow при его создании, потому что если такой процесс уже есть, то повторно он запускаться не будет. В качестве id можно выбирать идентификатор записи в бд, чтобы потом было легко понять, какую запись смотреть.
Кроме этого можно для идентификатора использовать более читабельные названия. Например, что-то вроде customer1_order_1. Этот идентификатор дает информацию о конкретном заказчике, текущем процессе (заказе) и идентификаторе заказа. В этом случае в UI temporal получать информацию о процессе сможет не только разработчик, но и аналитик или тестировщик.

Этот процесс можно запускать из любой точки входа в приложение. Выберем рест запрос для простоты. Workflow процесс может быть длительный, даже в нашем случае, если мы будем проверять картинки в посте на предмет чего-то запрещённого с помощью llm, ml или вручную. Чтобы не зависеть от длительности выполнения процесса в рест запросе асинхронно запустим процесс.

В реальности нужно подумать о том, чтобы сохранение в бд и запуск workflow не аффектили друг друга в случае ошибки, а в pet проекте можно на это забить.

Ниже представлен пример запуска процесса модерации:

public Integer startModeration(Moderation moderation) {  
  Moderation createdModeration = moderationDao.create(moderation);  
  
  ModerationWorkflow workflow = workflowClient.newWorkflowStub(  
      ModerationWorkflow.class,  
      WorkflowOptions.newBuilder()  
          .setTaskQueue(MODERATION_TASK_QUEUE)  
          .setWorkflowId(createdModeration.id().toString())  
          .build()  
  );  
  WorkflowClient.start(workflow::moderate, createdModeration);  
  
  moderationDao.updateInProgress(createdModeration.id());  
  
  return createdModeration.id();  
}

Просмотр информации этапов в GUI

У temporal есть UI, в котором можно смотреть статус и состояния текущих или завершенных процессов.

Окно с информацией о текущем workflow
Окно с информацией о текущем workflow

На следующем скрине видно, как третий этап модерации ретраится

Retry activity
Retry activity

В UI есть подробный лог событий процесса

Полный лог событий workflow
Полный лог событий workflow

Тестирование

Для тестового проекта напишем один интеграционный тест для случая успешного прохождения процесса модерации.

В temporal jdk есть необходимые инструменты для тестирования. Создаем тестовое окружение, чтобы не использовать реальный temporal.

Ниже представлена эта конфигурация:

@BeforeEach  
void setUp() {  
  applicationContext.start();  
  Worker worker = testWorkflowEnvironment.newWorker(MODERATION_TASK_QUEUE);  
  worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);  
  
  worker.registerActivitiesImplementations(symbolCountModerationActivity);  
  worker.registerActivitiesImplementations(forbiddenWordModerationActivity);  
  worker.registerActivitiesImplementations(luckTestModerationActivity);  
  testWorkflowEnvironment.start();  
}

Далее представлен сам тест:

@Test  
void shouldSuccessWorkflow() {  
  ModerationWorkflow workflow = workflowClient.newWorkflowStub(  
      ModerationWorkflow.class,  
      WorkflowOptions.newBuilder().setTaskQueue(MODERATION_TASK_QUEUE).build()  
  );  
  Moderation moderation = new Moderation(0, "Text", false, false);  
  moderationHandler.startModeration(moderation);  
  workflow.moderate(moderation);  
  
  await()  
      .atMost(10, SECONDS)  
      .pollInterval(1, SECONDS)  
      .until(() -> {  
        Moderation actual = moderationDao.getById(0);  
        assertThat(actual.isApproved()).isTrue();  
        return true;  
      });  
}

Кроме этого можно отдельно тестировать каждое activity, использовать моки и все, что захотите.

Заключение

В этой статье рассмотрели важные базовые моменты, связанные с temporal, а также пример проекта с этим инструментом. Этот проект можно скачать и самому потыкать: в нем есть swagger для отправки запроса, вся необходимая информация для запуска проекта есть в Readme. Скачивайте и запускайте.

Подводя итог, можно сказать, что temporal — это не просто еще одна библиотека для разработчика, а подход к построению надежных систем. Он абстрагирует всю сложность распределенных систем — обработку сбоев, повторные попытки, масштабирование — давая возможность разработчику сосредоточиться на написании бизнес-логики.

Ссылки:

P.S.
У меня есть репозиторий с полезными ссылками видео, статей и прочего. Можете найти там что-то полезное:
https://github.com/RinatBeybutov/Links-and-materials

Комментарии (3)


  1. Folko85
    27.11.2025 20:49

    Судя по сайту оно платное. Стоит ли своих денег? Или в разработке не применяете?


  1. Folko85
    27.11.2025 20:49

    Да, и было бы неплохо сравнить по функционалу с Камундой, например.


  1. ArtMan99
    27.11.2025 20:49

    Неплохое введение, стало чуть понятнее, зачем temporal вообще нужен