В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.
Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки
dispatch()
, примерно так:class MyFirstJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
$this->dispatch(new MySecondJob($this->data)); // Second task
}
}
А следующая стадия обработки инициировалась совершенно аналогично:
class MySecondJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
if ($this->someCondition($this->data)) {
$this->dispatch(new MyThirdJob($this->data)); // Third task
}
}
}
Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.
Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов
dispatch()
) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.
Управление очередями
У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;
abstract class PipelineAbstract
{
use DispatchesJobs;
/**
* @param array $params
* @return PipelineAbstract
*/
public function start(array $params)
{
$this->next(null, $params);
return $this;
}
/**
* @param Job $currentJob
* @param array $params Set of parameters for starting new jobs
*/
abstract public function next(Job $currentJob = null, array $params);
/**
* @param Job $job
*/
protected function startJob(Job $job)
{
$this->dispatch($job);
}
}
В методе
next()
у нас как раз и будет реализован бизнес процесс. startJob()
— просто обертка над dispatch()
на всякий случай. А start()
будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).Пример реализации бизнес логики:
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;
class ProcessDataPipeline extends PipelineAbstract
{
/**
* @inheritdoc
*/
public function next(Job $currentJob = null, array $params)
{
// Start first job
if ($currentJob === null)
{
$this->startJob(new MyFirstJob($params, $this));
}
if ($currentJob instanceof MyFirstJob)
{
$this->startJob(new MySecondJob($params, $this));
}
if ($currentJob instanceof MySecondJob)
{
if ($this->someCondition($params))
{
$this->startJob(new MyThirdJob($params, $this));
}
}
}
}
Вот и все. Остается только заменить запуск
MyFirstJob
.Было
$this->dispatch(new MyFirstJob($data));
Стало
(new ProcessDataPipeline())->start($data);
А вместо добавления заданий в остальные очереди вызовем метод
next()
.Было
$this->dispatch(new MySecondJob($data));
Стало
$this->next($data);
Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.
<?php
namespace App\Jobs;
use App\Jobs\Pipeline\PipelineAbstract;
abstract class Job
{
/**
* @param array $params
*/
public function next(array $params)
{
if ($this->pipeline)
{
$this->pipeline->next($this, $params);
}
}
}
И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода
next()
) обрабатывались нужной реализацией пайплайна.class MyFirstJob extends Job
{
/**
* @param mixed data
* @param PipelineAbstract|null $pipeline
*/
public function __construct($data, PipelineAbstract $pipeline = null)
{
$this->data = $data;
$this->pipeline = $pipeline;
}
}
Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:
Что хорошего
- Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
- Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
- Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.
Кстати, в свежей версии Laravel появился похожий инструмент
withChain()
, он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье. Комментарии (9)
iwex
26.04.2018 16:34А не проще было бы просто описать порядок выполнения джобов и не плодить if() {...} ?
mnv Автор
26.04.2018 16:42Вы про метод
next()
? Не думал как тут можно обойтись без условий. Он вызывается из каждого джоба одинаковым способом. Внутри этого метода надо как-то понять, что делать дальше исходя из того, на какой стадии сейчас находимся. От этого и возникают условия. Тут получается этакая фабрика джобов. На мой взгляд наличие условий в этом методе — не страшно. Но если есть идеи как избавиться тут от if-ов, буду рад.iwex
26.04.2018 16:53Для начала, какой смысл вызывать один джоб за другим? 1 раз отправили джоб чтобы запрос не висел, а внутри джоба сделать ту же цепочку.
Если все же всё пихать в джобы- вместо if просто сделать карту что за чем идет:
protected static $jobsSequence = [ MyFirstJob::class => MySecondJob::class, ... ]
или сделать просто массив с очередью и использовать поиск по массиву + брать след индекс.
mnv Автор
26.04.2018 17:19внутри джоба сделать ту же цепочку
Я в статье объяснил, почему это оказалось не удобно.
Насчет карты, можно. Это будет чуть ближе к варианту со штатнымwithChain()
. Это уже дело техники и предпочтений, и зависит от ситуации. В моем случае, как я упоминал выше, есть элементы бизнес логики, иногда надо создавать 1 джоб, иногда сразу группу джобов, с разными входными данными, тут практичнее использовать условия.
L0NGMAN
27.04.2018 00:47Можете привести конкретный пример из жизни где вы используете чаининг джобов?
mnv Автор
27.04.2018 08:03Вот пример. Из внешнего сервиса прилетает новый документ на обработку. Нужно найти в нем ключевые слова, определить их тональность, сделать еще ряд манипуляций. Это делается через очереди. Каждая задача — в своей очереди. И эта цепочка как раз описана в отдельном потомке
PipelineAbstract
.
Помимо этого иногда надо повторно обработать старые документы. В этом случае происходит почти то же самое, но немножко по другому. Чтобы не плодить условия внутри джобов или сами джобы, оказалось очень удобно сделать второй наследник
PipelineAbstract
и там описать процесс повторной обработки материала.
Triazepin
29.04.2018 13:28Вы используете жесткое связывание когда пытаетесь в задачу добавить добавить логику управлением задач. Задача должна делать свою работу, в завершении нужно бросить событие если результат интересует еще кого-то. А в листенерах уже сопоставлять эти события с другими действиями, если надо.
mnv Автор
29.04.2018 13:42Да, можно и так. Но у меня жесткого связывания нет, логику управления задачами я как раз вынес за пределы задач. С листенерами получится примерно так же — надо передавать данные и информацию о том, в рамках какого процесса сейчас идет обработка данных, если нужно, чтобы одну и ту же задачу можно было использовать в разных процессах.
AMaxKaluga
Идут годы, а ничего не меняется.
Вспомнил Borland Pascal/C++, потоки сообщений Windows3.1 и самописную подсистему реализацию графической среды со своей системой потоков сообщений и «заимствованными» классами объктов из BP.