image

В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.

Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки dispatch(), примерно так:

class MyFirstJob extends Job
{
    use DispatchesJobs;

    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle()
    {
        $this->doSomething($this->data);
        $this->dispatch(new MySecondJob($this->data));  // Second task
    }
}

А следующая стадия обработки инициировалась совершенно аналогично:

class MySecondJob extends Job
{
    use DispatchesJobs;

    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle()
    {
        $this->doSomething($this->data);
        if ($this->someCondition($this->data)) {
            $this->dispatch(new MyThirdJob($this->data));  // Third task
        }
    }
}

Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.

Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов dispatch()) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).

Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.

Управление очередями


У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.

<?php

namespace App\Jobs\Pipeline;

use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;

abstract class PipelineAbstract
{
    use DispatchesJobs;

    /**
     * @param array $params
     * @return PipelineAbstract
     */
    public function start(array $params)
    {
        $this->next(null, $params);
        return $this;
    }

    /**
     * @param Job $currentJob
     * @param array $params Set of parameters for starting new jobs
     */
    abstract public function next(Job $currentJob = null, array $params);

    /**
     * @param Job $job
     */
    protected function startJob(Job $job)
    {
        $this->dispatch($job);
    }

}

В методе next() у нас как раз и будет реализован бизнес процесс. startJob() — просто обертка над dispatch() на всякий случай. А start() будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).

Пример реализации бизнес логики:

<?php

namespace App\Jobs\Pipeline;

use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;

class ProcessDataPipeline extends PipelineAbstract
{

    /**
     * @inheritdoc
     */
    public function next(Job $currentJob = null, array $params)
    {
        // Start first job
        if ($currentJob === null)
        {
            $this->startJob(new MyFirstJob($params, $this));
        }

        if ($currentJob instanceof MyFirstJob)
        {
            $this->startJob(new MySecondJob($params, $this));
        }

        if ($currentJob instanceof MySecondJob)
        {
            if ($this->someCondition($params))
            {
                $this->startJob(new MyThirdJob($params, $this));
            }
        }
    }
}

Вот и все. Остается только заменить запуск MyFirstJob.

Было

$this->dispatch(new MyFirstJob($data));

Стало

(new ProcessDataPipeline())->start($data);

А вместо добавления заданий в остальные очереди вызовем метод next().

Было

$this->dispatch(new MySecondJob($data));

Стало

$this->next($data);

Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.

<?php

namespace App\Jobs;

use App\Jobs\Pipeline\PipelineAbstract;

abstract class Job
{
    /**
     * @param array $params
     */
    public function next(array $params)
    {
        if ($this->pipeline)
        {
            $this->pipeline->next($this, $params);
        }
    }
}

И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода next()) обрабатывались нужной реализацией пайплайна.

class MyFirstJob extends Job
{

    /**
     * @param mixed data
     * @param PipelineAbstract|null $pipeline
     */
    public function __construct($data, PipelineAbstract $pipeline = null)
    {
        $this->data = $data;
        $this->pipeline = $pipeline;
    }
}

Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:

Что хорошего


  • Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
  • Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
  • Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.

Кстати, в свежей версии Laravel появился похожий инструмент withChain(), он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье.

Комментарии (9)


  1. AMaxKaluga
    26.04.2018 11:46

    Идут годы, а ничего не меняется.
    Вспомнил Borland Pascal/C++, потоки сообщений Windows3.1 и самописную подсистему реализацию графической среды со своей системой потоков сообщений и «заимствованными» классами объктов из BP.


  1. iwex
    26.04.2018 16:34

    А не проще было бы просто описать порядок выполнения джобов и не плодить if() {...} ?


    1. mnv Автор
      26.04.2018 16:42

      Вы про метод next()? Не думал как тут можно обойтись без условий. Он вызывается из каждого джоба одинаковым способом. Внутри этого метода надо как-то понять, что делать дальше исходя из того, на какой стадии сейчас находимся. От этого и возникают условия. Тут получается этакая фабрика джобов. На мой взгляд наличие условий в этом методе — не страшно. Но если есть идеи как избавиться тут от if-ов, буду рад.


      1. iwex
        26.04.2018 16:53

        Для начала, какой смысл вызывать один джоб за другим? 1 раз отправили джоб чтобы запрос не висел, а внутри джоба сделать ту же цепочку.
        Если все же всё пихать в джобы- вместо if просто сделать карту что за чем идет:


        protected static $jobsSequence = [
           MyFirstJob::class => MySecondJob::class,
           ...
        ]

        или сделать просто массив с очередью и использовать поиск по массиву + брать след индекс.


        1. mnv Автор
          26.04.2018 17:19

          внутри джоба сделать ту же цепочку

          Я в статье объяснил, почему это оказалось не удобно.
          Насчет карты, можно. Это будет чуть ближе к варианту со штатным withChain(). Это уже дело техники и предпочтений, и зависит от ситуации. В моем случае, как я упоминал выше, есть элементы бизнес логики, иногда надо создавать 1 джоб, иногда сразу группу джобов, с разными входными данными, тут практичнее использовать условия.


  1. L0NGMAN
    27.04.2018 00:47

    Можете привести конкретный пример из жизни где вы используете чаининг джобов?


    1. mnv Автор
      27.04.2018 08:03

      Вот пример. Из внешнего сервиса прилетает новый документ на обработку. Нужно найти в нем ключевые слова, определить их тональность, сделать еще ряд манипуляций. Это делается через очереди. Каждая задача — в своей очереди. И эта цепочка как раз описана в отдельном потомке PipelineAbstract.


      Помимо этого иногда надо повторно обработать старые документы. В этом случае происходит почти то же самое, но немножко по другому. Чтобы не плодить условия внутри джобов или сами джобы, оказалось очень удобно сделать второй наследник PipelineAbstract и там описать процесс повторной обработки материала.


  1. Triazepin
    29.04.2018 13:28

    Вы используете жесткое связывание когда пытаетесь в задачу добавить добавить логику управлением задач. Задача должна делать свою работу, в завершении нужно бросить событие если результат интересует еще кого-то. А в листенерах уже сопоставлять эти события с другими действиями, если надо.


    1. mnv Автор
      29.04.2018 13:42

      Да, можно и так. Но у меня жесткого связывания нет, логику управления задачами я как раз вынес за пределы задач. С листенерами получится примерно так же — надо передавать данные и информацию о том, в рамках какого процесса сейчас идет обработка данных, если нужно, чтобы одну и ту же задачу можно было использовать в разных процессах.