Знакомо?
Знакомо?
На момент написания статьи мне сложно придумать более простое описание темпорала, чем это
На момент написания статьи мне сложно придумать более простое описание темпорала, чем это

В жизни каждого человека бывают моменты, когда приходит время писать бизнес логику и чтобы вернувшись через полгода все было более чем понятно. Все зависит от того, зайдете ли вы в ту дверь.
Поделюсь небольшим опытом, который сформировался в процессе перехода с "dumb rpc поверх nats" на полноценные воркфлоу в рамках разработки слоя биллинга для хостинга.

Lets get it started

Рассмотрим классический случай, когда стоит задача организовать взаимодействие с другим сервисом. Шаги, которые вы пройдете будут следующими:

  • Поднимаете любую известную шину данных (Rabbit, Kafka, Nats, etc);

  • Строите связь между сервисами через kafka топики/nats сабжекты/очереди rabbitmq;

  • Делаете де-/сериализацию данных;

  • Попутно костылите логику, например, для retry и backoff;

Какие моменты возникают:

  • Отсутствие контроля выполнения. Упал воркер - вы потеряли весь прогресс;

  • Зависимость от стабильности шины данных;

  • Необходимость в скотче для скрепления всей конструкции;

Temporal как гейм-ченжер бизнес логики вашего приложения

Temporal закрывает все 4 базовых пункта сразу, но при этом предоставляет ещё больше инструментов для создания легко масштабируемых сервисов.

В Temporal есть 2 основные сущности:

  • Workflow - ближайшая аналогия это CI пайплайны. Workflow просты и изящны в своей реализации. Вызывают необходимую логику через Activity.
    Отказоустойчивы по умолчанию;

  • Activity - своего рода RPC методы. Могут жить где угодно. Не зависят от ЯП и встраиваются буквально в любом месте приложения.
    Например, в случае с NestJS activity методы могут жить в основном инстансе вашего api сервиса и использовать DI без каких-либо ограничений.
    Отказоустойчивы по умолчанию;

А также 4 дополнительных, которые вам скорее всего понадобятся:

  • Query - метод workflow, который может вернуть хранимое состояние. Например, на каком этапе находится выполнение воркфлоу.

  • Signal - callback от workflow, который может выполнять любую логику. Наиболее частое использование - обработка событий извне, чтобы как-либо влиять на выполнение воркфлоу.

  • Timer - аналог sleep.

  • Condition - асинхронная функция, которая вернет true/false если выполнилось обозначенное условие. Например, ждем событие, обрабатываем в signal callback и condition вернет true, после чего выполнение воркфлоу может продолжиться.

Практическое применение. Учимся делать красиво c Temporal

Нам нужен durable execution. Завершить любой ценой. Упали - поднялись. Поднялись - продолжили.

  • Основная логика хранится в виде Workflow (те-самые ci-like пайплайны).

  • retry/backoff made right. Temporal сам повторит вызов в случае ошибки (вы по-прежнему можете контролировать и выбрасывать nonRetryable исключение);

  • cron made right. Temporal из коробки умеет вызов задач по крону, при этом вам не нужно возиться с реализацией как самого крона, так и с блокировками через Redis/Consul/etc в случае работы нескольких инстансов приложения (привет разработчикам Spring). Запуск произойдет строго на одном воркере за раз.

Я не стану придумывать эфемерные ситуации, а опишу то, с чем столкнулся сам в рамках одного из проектов хостинга, в котором принимаю участие.

Управление услугами должно быть надежным как для клиента, так и администратора. От этапа создания услуги до этапа освобождение ресурсов без оставления dangling сущностей.


Вводные:

  • REST API (далее backend) для личного кабинета, принимает все запросы с UI;

  • Внешний сервис на Python (далее py-worker), работающий с виртуализацией;

  • Критическая необходимость гарантированного предоставления услуги без всяких "но". Получили запрос на создание - обязательно обработали;

Как было раньше

Вызовы от backend к py-worker проходили через NATS по модели request-reply в рамках транзакции с блокировками на уровне базы данных.

Хоть это и работало, но с массой минусов, одни из которых:

  • Никакой гарантии, что NATS будет доступен в момент вызова (в том числе повторного вызова).

  • Необходимость переизобретения retry + backoff и логика постепенно усложнялась;

  • Переизобретение своего "формата" общения между сервисами с `{status: "OK"}` или `{error: "Something has failed"}`

Как это реализовано сейчас:

  • Вся основная логика сформирована в workflow;

  • Добавляется новый сервис billing-worker. Он запускает воркер для Temporal и готов запускать workflow.

Итоговый воркфлоу (упрощен в целях демонстрации декларативности логики) получился следующим:

type CreateHipletWorkflowArgs = {
  hiplet: { id: number };
  region: { options: { default_network_uuid: string } };
};

export const hipletCreatedSignal = wf.defineSignal('hiplet_created');
const { set_instance_id, set_hiplet_ip } = wf.proxyActivities<IHipletsActivities>({
  startToCloseTimeout: '1m',
});

// Кросс вызовы методов воркера для облака
const cloud_activities = wf.proxyActivities({
  taskQueue: 'cloud-queue',
  startToCloseTimeout: '1m',
});

export async function hipletCreate(args: CreateHipletWorkflowArgs): Promise<void> {
  const { hiplet, region } = args;
  
  // Переменные могут выступать как место для хранения состояния воркфлоу
  let is_created_by_signal = false;
  
  // Callback, который выполнится в случае если в воркфлоу придет сигнал
  wf.setHandler(hipletCreatedSignal, () => {
    is_created_by_signal = true;
  });

  // Вызов метода от py-worker, который общается с системой управления виртуализацией
  const port = await cloud_activities['port.create']({
    name: `hiplet-${hiplet.id}`,
    network_id: region.options.default_network_uuid,
  });

  const ips = extractFixedIPs(port.fixed_ips);
  await set_hiplet_ip({
    hiplet_id: hiplet.id,
    ipv4: ips.ipv4,
    ipv6: ips.ipv6,
  });
  wf.log.info(`Port created. ID: ${port.id}`);

  const payload = {
    hiplet_id: hiplet.id,
    port_id: port.id,
    ips: ips,
  };
  const server = await cloud_activities['instance.create'](payload);
  await set_instance_id({
    hiplet_id: hiplet.id,
    instance_id: server.id,
  });
  
  // Ждем в течение 5 минут сигнал от биллинга, что услуга создана
  const is_created = await wf.condition(() => is_created_by_signal, '5m');
  if (!is_created) {
    // Если сигнал не пришел, то выбрасываем ошибку. Перезапускаем воркфлоу
    throw wf.ApplicationFailure.retryable('Hiplet create timeout');
  }
}

Отправка сигнала с биллинга происходит при получении события от виртуализации, что услуга была создана. С точки зрения разработчика все как нельзя просто:

await this.temporal.signalWorkflow(`hiplet:create:${hiplet.id}`, 'hiplet_created');

Что получаем благодаря Temporal:

  • Декларативная бизнес-логика в одном месте;

  • Не зависимы от стабильности внешних шин данных;

  • Temporal будет пытаться выполнить все описанные этапы в воркфлоу. Если воркер упадет на одном из этапов, то выполнение продолжится с того же места на другом воркере. Применимо в том числе к for loop с индексами.

Рассмотрен базовый спектр возможностей, который закрывает бОльшую часть потребностей в рамках durable execution.

Итог

Какие пл��сы вынесли с Temporal:

  • Детальный observability по времени выполнения каждого воркфлоу и каждого этапа;

  • Выпилили NATS для работы с клиентскими услугами. Все действия переведены в формат воркфлоу, что гарантирует надежность выполнения.

  • Тысячи выполненных воркфлоу. Ни одна клиентская услуга не потерялась.
    Например, иногда могут закончиться IP или вычислительные ресурсы. Temporal будет из раза в раз запускать воркфлоу пока услуга не будет создана.

  • Возможность изменения логики без серьезных последствий.
    Temporal поддерживает версионирование воркеров.

  • Нововведения на уровне Temporal никак не затрагивают другие компоненты биллинга. Зоны ответственности разделены.

В случае если у вас появятся вопросы в процессе прочтения, буду рад ответить и поделиться опытом.

Комментарии (0)