В современном быстро развивающемся и ориентированном на данные мире создание сложных workflow, способных обрабатывать тысячи регистраций, процессов или действий, может быть сложной задачей. Важно иметь решение, которое будет масштабируемым, надежным и простым в использовании. К счастью, на рынке есть различные инструменты и движки, и одним из наших предпочтительных инструментов является Temporal.io. Temporal предоставляет набор проверенных паттернов, которые были опробованы в крупных продуктах и командах, позволяя разработчикам использовать их легко и эффективно, сосредоточив внимание исключительно на производительности инженерных решений.

В этом посте мы рассмотрим, как можно использовать Temporal Workflows для разработки структуры для эффективного управления жизненным циклом документов. Мы сосредоточимся на использовании Temporal для обработки событий workflow, связанных с изменениями документов, не перегружая систему ненужными процессами и ручными шагами. Мы предоставим примеры кода на PHP, но вы можете адаптировать их к любому из поддерживаемых Temporal языков.


Проблема

Рассмотрим сценарий, когда у вас есть набор данных, файлов или документов, которые нужно отслеживать на предмет изменений (например, AI-агентом или высокоуровневым супервайзером). Мы можем назвать этот процесс «жизненным циклом». Эти документы не обязательно генерируют много активности в вашей системе и не потребляют ресурсы, когда они бездействуют. Однако все меняется в момент, когда пользователь заходит и редактирует файл или загружает новый файл. Кроме того, пользователи не работают в предсказуемые интервалы, поэтому мы ожидаем всплеск активности, за которым следует период, когда использование ресурсов или ожидание изменений невозможно. Некоторые процессы, которые следят за этими изменениями файлов, не могут выполняться во время фактического периода активности пользователя (например, повторное создание сводки с использованием AI-агента). Мы хотим накопить некоторые из этих изменений или подождать, пока пользователь завершит работу.

Решение

Решение, которое мы рассмотрим сегодня, использует возможность Temporal создавать обработчики и конечные автоматы, основанные на таймерах и событиях. Преимущество этого подхода заключается в том, что вы можете создать очень сложные конечные автоматы и наблюдатели событий, которые могут разрешать зависимости по ходу дела. После завершения работы все очищается, не оставляя зависших процессов или потребления ресурсов, при этом гарантируя последовательный характер обработки и управления жизненным циклом документов.

Мы начнем с простого workflow и будем добавлять функции до достижения желаемого решения.

Шаг 0: Предварительные условия

Прежде чем углубляться в нюансы workflow, давайте начнем с наших базовых абстракций: события и очереди событий.

<?php

namespace App\DTO;

class Event
{
   public function __construct(
       public string $entity,
       public string $action,
   ) {
   }
}

namespace App\DTO;

use Temporal\Internal\Marshaller\Meta\MarshalArray;

class Queue implements \Countable
{
   public function __construct(
       #[MarshalArray(of: Event::class)]
       private array $events = [],
   ) {
   }

   public function merge(Queue $queue): Queue
   {
       $this->events = array_merge($this->events, $queue->events);
       return $this;
   }

   public function count(): int
   {
       return count($this->events);
   }

   // Moves all events to new queue and flushes the current queue
   public function flush(): Queue
   {
       $queue = new Queue($this->events);
       $this->events = [];

       return $queue;
   }
}

Temporal SDK позволяет нам маршалить (сериализовать) различные объекты и значения, чтобы передавать их между нашим кодом, workflow и activity. Мы будем использовать атрибут MarshalArray, чтобы указать типизированный массив для наших целей.

Шаг 1: Создание базового Workflow

Первым шагом является создание базового Workflow, который включает два основных метода: signal и Start. Но прежде чем создавать их, давайте создадим основу Workflow:

<?php

namespace App\Workflow;

use App\Activity\ProcessActivity;
use App\DTO\Queue;
use Temporal\Activity\ActivityOptions;
use Temporal\Internal\Workflow\ActivityProxy;
use Temporal\Workflow;

#[Workflow\WorkflowInterface]
class DocumentWorkflow
{
   private ProcessActivity|ActivityProxy $process;
   private Queue $queue;

   public function __construct()
   {
       $this->queue = new Queue();

       $this->process = Workflow::newActivityStub(
           ProcessActivity::class,
           ActivityOptions::new()
               ->withStartToCloseTimeout(5)
               ->withTaskQueue('demo_workflow'),
       );
   }
}

Signal метод (сигнальный метод)

SignalMethod используется для передачи информации в workflow для последующей обработки в основном цикле. Этот метод должен уметь получать переданную информацию и обрабатывать их последовательно в нашем основном цикле. Метод может выглядеть следующим образом:

<?php

#[Workflow\SignalMethod]
public function capture(Queue $events): void
{
   $this->queue = $this->queue->merge($events);
}

Start Workflow

Создадим простой Workflow, который вызывает одну activity при получении события создания документа. Вот наш основной цикл:

<?php

#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id): \Generator
{
   while (true) {
       // processing our queue
       yield $this->process->handleEvents(
           $document_id,
           $this->queue->flush()
       );

       // new events might arrive at this point
       if ($this->queue->count() === 0) {
           break;
       }
   }
}

Как вы видите, у нас теперь есть цикл, который обрабатывает события, и после завершения обработки событий workflow завершает работу. Если во время обработки поступают новые события, мы повторяем процесс. Ничего сложного на данный момент, и много места для улучшений (например, текущая версия будет очищать даже пустую очередь).

Шаг 2: Отправка событий в workflow

Теперь, когда у нас есть workflow, который может принимать события, вы можете задаться вопросом, в чем преимущество использования такого простого workflow и почему нам вообще нужен сигнальный метод. Секрет заключается в коде, который используется для отправки этих событий в workflow.

<?php

function push(string $document_id, Event ...$event): void
{
   $wf = $this->wfClient->newWorkflowStub(
       class: DocumentWorkflow::class,
       options: WorkflowOptions::new()
           ->withTaskQueue('demo_workflow')
           ->withWorkflowId($document_id)
           ->withWorkflowIdReusePolicy(IdReusePolicy::AllowDuplicate),
   );

   $this->wfClient->startWithSignal(
       workflow: $wf,
       signal: 'capture',
       signalArgs: [new Queue($event)],
       startArgs: [$document_id],
   );
}

Мы предполагаем, что document_id - это уникальный идентификатор файла, а $this->wfClient - экземпляр Temporal Client SDK. Если workflow не запущен, он будет автоматически создан. Затем сигнал будет вызван на вновь созданном или существующем workflow.

Полный код вспомогательной функции push можно найти здесь.

Этот подход позволяет нам отправлять события в очередь workflow, которая создается по требованию и очищается, как только очередь становится пустой.

Лучшая часть этого подхода заключается в том, что Temporal гарантирует, что события поступят в workflow, независимо от обстоятельств. Однако только события, добавленные в workflow во время выполнения activity, будут помещены в ту же очередь; в противном случае workflow завершится, и нам придется перезапускать его снова и снова.

Шаг 3: Введение таймеров

Теперь, когда workflow Temporal может принимать и обрабатывать события, давайте обсудим, как мы можем внедрить таймеры для более эффективного управления поведением пользователей. В приложениях, где вы обрабатываете события пользователей, управление временем становится критическим, так как пользователи склонны вносить изменения всплесками, а не создавать одно событие каждую минуту. Они работают, когда у них есть настроение, и не работают в другие моменты.

В данном рабочем процессе мы не сохраняем состояние, но для более сложных сценариев загрузка состояния документа и его обновление может быть тяжелым процессом, где частые перезапуски workflow нежелательны.

У нас есть несколько вариантов для реализации такого поведения, и для начала мы будем использовать простой метод awaitWithTimeout.

<?php

#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id): \Generator
{
   while (true) {
       // processing our queue
       yield $this->process->handleEvents(
           $document_id,
           $this->queue->flush()
       );

       $ok = yield Workflow::awaitWithTimeout(10, fn() => $this->queue->count() !== 0);
       if (!$ok && $this->queue->count() === 0) {
           // workflow is stale
           break;
       }
   }

   // we are done
}

Мы изменили условие внутри Workflow, чтобы оно ожидало не только очередь, но и срабатывание таймера. Если таймер срабатывает и события не добавлены, мы можем выйти из Workflow. Обратите внимание, что таймер будет срабатывать после каждой партии; мы оптимизируем это поведение ниже.

Этот подход позволяет нам накапливать события для новых изменений без немедленного выхода из Workflow. Workflow продолжит работу в течение 10 секунд после последнего события пользователя, но сбросит первую партию событий немедленно.

Continue-as-New

Созданное нами решение будет хорошо работать для коротких серий событий, вызванных пользователем, но когда количество событий может исчисляться сотнями и более, удержание Workflow активным слишком долго может вызвать сбои из-за размера истории. Мы можем избежать этой проблемы с помощью автоматического перезапуска Workflow, используя continueAsNew при достижении предела истории.

<?php

#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id, ?Queue $queue = null): \Generator
{
   if ($queue !== null) {
       // we want to make sure we captured previous and current events
       $this->queue = $queue->merge($this->queue);
   }

   while (true) {
       // processing our queue
       yield $this->process->handleEvents(
           $document_id,
           $this->queue->flush()
       );
       
       $ok = yield Workflow::awaitWithTimeout(10, fn() => $this->queue->count() !== 0);
       if (!$ok && $this->queue->count() === 0) {
           // workflow is stale
           break;
       }

       // our workflow is too large, let's continue as new
       if (Workflow::getInfo()->historyLength > 500) {
           break;
       }
   }

   if ($this->queue->count()) {
       // restart as new workflow
       yield Workflow::continueAsNew(
           'document.events',
           [$document_id, $this->queue],
           Workflow\ContinueAsNewOptions::new()
               ->withTaskQueue('demo_workflow'),
       );
   }
}

Мы изменили наш код, чтобы переносить необработанную очередь между запусками Workflow и останавливать выполнение, если наша история превышает лимит. И еще раз, преимущество Temporal в том, что система гарантирует, что никакие входящие события не будут потеряны во время перезапуска Workflow, что дает нам по-настоящему надежный и ограниченный по времени наблюдатель событий.

Хотя этот подход выглядит простым, представьте, что вы также можете переносить текущее состояние процесса между Workflow, фактически имея возможность продолжать работу бесконечно долго.

Таймеры, нужно больше таймеров!

Теперь, когда у нас есть базовая настройка, почему бы не попробовать что-то более интересное? Допустим, природа нашего приложения диктует необходимость ограничения и пакетирования пользовательских событий, а также их дедупликации. Хотя это требование сначала кажется темой для обсуждения, внутри Workflow Temporal мы фактически хотим внести небольшое изменение.

Прежде всего, мы можем решить проблему дедупликации на уровне очереди, поскольку мы сохраняем все буферизованные события перед их обработкой, мы можем использовать этот список для фильтрации. Добавление ограничения позже поможет нам накопить больше образцов для дедупликации.

Добавим этот метод в класс Queue:

<?php

public function mergeWithoutDuplicates(Queue $queue): Queue
{
   $this->events = array_merge(
       $this->events,
       array_udiff($queue->events, $this->events, fn($a, $b) => $a <=> $b),
   );

   return $this;
}

Мы вернемся к обновлению сигнального метода через мгновение. Теперь давайте создадим нашу собственную реализацию таймера под названием RollingTimer.

Этот класс запускает обработку только тогда, когда очередь заполняется до нужной емкости или срабатывает таймер, в отличие от awaitWithTimeout этот помощник попытается повторно использовать созданные таймеры, экономя нам несколько дополнительных событий в истории Workflow:

<?php

namespace App\Helpers;

use App\DTO\Queue;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Workflow;

use function React\Promise\resolve;

class RollingTimer
{
    private \DateTimeInterface $last;
    private ?PromiseInterface $timer = null;
    private ?Deferred $ready = null;

    public function __construct(
        readonly private int $waitSeconds,
    ) {
        $this->last = Workflow::now();
    }

    public function touch(): void
    {
        $this->last = Workflow::now();
    }

    public function wait(Queue $queue, int $size): PromiseInterface
    {
        $this->ready ??= new Deferred();
        if ($this->timer === null) {
            $this->timer = Workflow::timer($this->waitSeconds);
            $this->timer->then($this->tick(...)); // unlocks current $this->ready
        }

        return Workflow::await(
            fn() => $queue->count() >= $size,
            $this->ready->promise(),
        )->then($this->reset(...));
    }

    private function tick(): void
    {
        $this->timer = null; // old timer gone

        if ($this->ready === null) {
            return;
        }

        // how long time passed since last event
        $passed = Workflow::now()->getTimestamp() - $this->last->getTimestamp();

        if ($passed < $this->waitSeconds) { // we captured recent event so the pipe is still alive
            $this->timer = Workflow::timer($this->waitSeconds);
            $this->timer->then($this->tick(...));
            return;
        }

        $this->ready->resolve(true);
        $this->ready = null;
    }

    private function reset(): void
    {
        $this->ready?->reject();
        $this->ready = null;
    }
}

Теперь обновим наш метод Signal (не забудьте создать RollingTimer в конструкторе Workflow):

<?php

#[Workflow\SignalMethod]
public function capture(Queue $events): void
{
   $this->queue = $this->queue->mergeWithoutDuplicates($events);
   $this->waiter->touch(); // indicating fresh data
}

Теперь каждый новый набор данных будет продвигать внутреннюю временную отметку, хранящуюся внутри RollingTimer, сообщая ему, когда произошло последнее событие, чтобы мы могли ожидать нужную продолжительность после этого события.

Модификация основного метода Workflow также проста:

<?php

#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id, ?Queue $queue = null): \Generator
{
   if ($queue !== null) {
       // we want to make sure we captured previous and current events
       $this->queue = $queue->merge($this->queue);
   }

   while (true) {
       // wait for timer or queue to fill up for 8 items
       yield $this->waiter->wait($this->queue, size: 8);

       // no batches to wait for, exiting
       if ($this->queue->count() === 0) {
           break;
       }

       // processing our queue
       yield $this->process->queue($document_id, $this->queue->flush());

       // our workflow is too large, let's continue as new
       if (Workflow::getInfo()->historyLength > 500) {
           break;
       }
   }

   if ($this->queue->count()) {
       // restart as new workflow
       yield Workflow::continueAsNew(
           'document.events',
           [$document_id, $this->queue],
           Workflow\ContinueAsNewOptions::new()
               ->withTaskQueue('demo_workflow'),
       );
   }
}

Вы можете поэкспериментировать с этой демонстрацией, используя этот репозиторий Github; он включает два файла для запуска, один из которых может эмулировать случайную активность событий по нескольким документам.

Заключение

Используя возможности Temporal, такие как обработчики событий на основе таймеров, мы можем создавать сложные workflow, которые эффективно обрабатывают события пользователей. Добавьте машину состояний в этот Workflow и получите надежный инструмент для управления жизненным циклом документов в ~200 строках кода!

Следите за новыми статьями в этой серии, где мы будем углубляться в функциональность Temporal и исследовать его применение в различных областях, включая ИИ и не только.


P.s. У нас есть telegram канал, где можно задавать вопросы по temporal. А также есть youtube канал где мы публикуем видео по разработке.

Комментарии (0)