Зачем это понадобилось?

Есть приложение A , которое крутится где-то там, и к которому у меня нет доступа. Это приложение использует API реализованное с помощью хранимых процедур. Есть приложение B, которое настраивает данные для приложения А, а также строит отчёты по работе приложения А. К приложению B и хранимым процедурам у меня доступ есть.

Со временем приложение B обрасло телеграм-ботом. Получать оповещения в режиме реального времени всем понравилось, и тут поступила хотелка: когда в приложении А происходит событие X, оповещать пользователей приложения B через телеграм-бота.

Задача была бы несложной, если можно было бы договориться о том, чтобы приложение A уведомляло приложение B о наступлении события X.

Но договориться не удалось.

Анализ системы показал, что при наступлении события X, приложение A вызывает хранимую процедуру Y. Немного подумав, я решил запустить задание (Job) в приложении B, банально вставив строчку в таблицу jobs. А уж это задание отправит оповещение через телеграм-бота.

Что нужно для успеха?

Драйвер очереди должен быть выбран, как database. Должны быть настроены и запущены воркеры.

Реализация

Лучше один раз увидеть и пощупать, чем сто раз прочитать теорию, поэтому реализацию описываю в виде туториала для тестового приложения.

Кто считает это излишним, переходите сразу к этому разделу. Вернуться к началу никогда не поздно.

К сожалению, элегантного решения для общего случая я не нашёл, задача решается «в лоб» и для каждого задания решение будет уникальным.

Но общую идею я опишу.

Наше тестовое приложение будет отправлять письмо и эта отправка будет инициирована хранимой процедурой.

Создание тестового приложения

composer create-project laravel/laravel pgsql_job
cd pgsql_job
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache

Настройте соединение с базой данных. Если базы нет, можно запустить

docker-контейнер
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5432:5432 --name pgsql postgres

После этого обращаться к базе. Настройки в .env

DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5432
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test

В качестве драйвера очереди укажите database (QUEUE_CONNECTION=database).

Опыты будем ставить на отправке писем. Чтобы не настраивать соединение, поставьте MAIL_MAILER=log. Пусть письма падают в логи.

Создание уведомления

make:php artisan make:notification TestNotify

Заходим в app/Notifications/TestNotify.php и изменяем конструктор

    public function __construct(public $subject, public $id)
    {
        //
    }

и метод toMail()

    public function toMail(object $notifiable): MailMessage
    {
        return (new MailMessage)
            ->subject($this->subject)
            ->line('ID: ' . $this->id);
    }
В результате должно получиться
<?php

namespace App\Notifications;

use Illuminate\Bus\Queueable;
use Illuminate\Notifications\Messages\MailMessage;
use Illuminate\Notifications\Notification;

class TestNotify extends Notification
{
    use Queueable;

    public function __construct(public $subject, public $id)
    {
        //
    }

    public function via(object $notifiable): array
    {
        return ['mail'];
    }

    public function toMail(object $notifiable): MailMessage
    {
        return (new MailMessage)
            ->subject($this->subject)
            ->line('ID: ' . $this->id);
    }

    public function toArray(object $notifiable): array
    {
        return [
            //
        ];
    }
}

Давайте проверим, работает ли наше уведомление.

Заходим в tinker

php artisan tinker

После появления приглашения вводим код

 use App\Notifications\TestNotify;
 Notification::route('mail', 'test@mail.ru')->notify(new TestNotify('Test Subject', 123456));
Смотрим в лог

Отлично! Всё на месте. И тема письма, и ID и почтовый адрес. Лог можно удалить, чтобы не искать факт повторной отправки письма.

Создание задания

php artisan make:job TestJob

Заходим в app/Jobs/TestJob.php и изменяем конструктор

    public function __construct(public $email, public $subject, public $id)
    {
        //
    }

и метод handle()

    public function handle(): void
    {
        Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
    }
В результате должно получиться
<?php

namespace App\Jobs;

use App\Notifications\TestNotify;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Notification;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(public $email, public $subject, public $id)
    {
        //
    }

    public function handle(): void
    {
        Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
    }
}

Временно настроим драйвер очереди на sync (QUEUE_CONNECTION=sync) и проверим работу нашего задания в tinker. Если вы ещё из него не вышли — выйдите. Запущенный tinker ничего не знает о нашем новом задании. Заходим в tinker и после появления приглашения вводим

use App\Jobs\TestJob;
\Bus::dispatch(new TestJob('tset@liam.ur', 'Subject Test', 654321));

Почему так странно отправили задание? Ответ находится здесь.

Смотрим в лог

Опять всё на месте. Удаляем лог, меняем драйвер очереди на database.

На этом этапе у нас есть уведомление и задание, которое это уведомление отправляет. Настала пора «засунуть» отправку задания в хранимую функцию. Но для начала поймём, как должна выглядеть запись о постановке задания в очередь.

Декодирование записи о задании из таблицы jobs

Драйвер очереди переключили? Выйдите и войдите в тинкер. Иначе тинкер ничего не будет знать об этом переключении. Давайте ещё раз с помощью тинкер отправим задание в очередь. В таблице jobs должна появиться запись.

Запись из таблицы jobs
Запись из таблицы jobs

Поля available_at и created_at равны результату выполнения функции time() на момент вставки записи в таблицу. Это количество секунд, прошедших с начала эпохи Unix (1 января 1970 00:00:00 GMT) до текущего времени.

Поле payload содержит в себе json, с которым мы сейчас разберёмся.

Заходим в тинкер, выполняем

json_decode(\DB::table('jobs')->first()->payload);

Результат выполнения

> json_decode(\DB::table('jobs')->first()->payload);
= {#6275
    +"uuid": "0c2c7167-804a-4bed-9991-8827571f6be8",
    +"displayName": "App\Jobs\TestJob",
    +"job": "Illuminate\Queue\CallQueuedHandler@call",
    +"maxTries": null,
    +"maxExceptions": null,
    +"failOnTimeout": false,
    +"backoff": null,
    +"timeout": null,
    +"retryUntil": null,
    +"data": {#6298
      +"commandName": "App\Jobs\TestJob",
      +"command": "O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}",
    },
  }

Собственно, создать такой json в PostgreSQL почти не проблема. Единственное затруднение — это содержимое data->command, в котором хранится сериализованный объект App\Jobs\TestJob

Подаём команду в tinker

unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);

Получаем

> unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
= App\Jobs\TestJob {#6318  
    +email: "tset@liam.ur",
    +subject: "Subject Test",
    +id: 654321,
    +job: null,
    +connection: null,
    +queue: null,
    +chainConnection: null,
    +chainQueue: null,
    +chainCatchCallbacks: null,
    +delay: null,
    +afterCommit: null,
    +middleware: [],
    +chained: [],
  }

Проблема в том, что PostgreSQL ничего не знает про объекты PHP и об алгоритме сериализации. Поэтому я принял решение, работать с этим значением, как со строкой.

Теперь можно приступать к работе с PostgreSQL

Вставка записи о задании в таблицу jobs средствами PostgreSQL

Чтобы вставить запись о задании в таблицу jobs нам нужно:

  • сгенерировать uuid

  • сосчитать количество секунд, прошедших с начала эпохи Unix (1 января 1970 00:00:00 GMT) до текущего времени

  • сформировать json, в который следует вставить необходимые для выполнения задания данные

Учим PostgreSQL генерировать uuid

Чтобы сгенерировать uuid в PostgreSQL нужны функции, содержащиеся в модуле uuid-ossp. Зачастую этот модуль отключён и его нужно включить. Я покажу, как проверить, подключены ли нужные функции на примере докер-контейнера, описанного выше.

Подключаюсь к запущенному контейнеру

docker container exec -it pgsql bash

В контейнере подключаюсь к терминальному клиенту для работы с PostgreSQL — psql

# psql -U test

Проверяю, какие функции подключены

test=# \df
                       List of functions
 Schema | Name | Result data type | Argument data types | Type
--------+------+------------------+---------------------+------
(0 rows)

Видно, что функций для работы с uuid нет. Пробуем их добавить.

test=# CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION
test-# \df
                                 List of functions
 Schema |        Name        | Result data type |    Argument data types    | Type
--------+--------------------+------------------+---------------------------+------
 public | uuid_generate_v1   | uuid             |                           | func
 public | uuid_generate_v1mc | uuid             |                           | func
 public | uuid_generate_v3   | uuid             | namespace uuid, name text | func
 public | uuid_generate_v4   | uuid             |                           | func
 public | uuid_generate_v5   | uuid             | namespace uuid, name text | func
 public | uuid_nil           | uuid             |                           | func
 public | uuid_ns_dns        | uuid             |                           | func
 public | uuid_ns_oid        | uuid             |                           | func
 public | uuid_ns_url        | uuid             |                           | func
 public | uuid_ns_x500       | uuid             |                           | func
(10 rows)

Нам нужна функция uuid_generate_v4. Пробуем ею воспользоваться.

test=# select uuid_generate_v4();
           uuid_generate_v4
--------------------------------------
 8a7a8be0-e4cd-4c0a-8275-dcf2ed58c5a9
(1 row)

Отлично, работает!

Если в вашем случае функции не добавились, обратитесь к документации по PostgreSQL.

В прошлом этот модуль зависел от библиотеки OSSP UUID, что отразилось в его имени. Хотя библиотеку OSSP UUID всё ещё можно найти по адресу http://www.ossp.org/pkg/lib/uuid/, она плохо поддерживается и её становится всё сложнее портировать на новые платформы. Поэтому модуль uuid-ossp теперь на некоторых платформах можно собирать без библиотеки OSSP. Во FreeBSD и некоторых других ОС на базе BSD подходящие функции формирования UUID включены в системную библиотеку libc. В Linux, macOS и некоторых других платформах подходящие функции предоставляются библиотекой libuuid, которая изначально пришла из проекта e2fsprogs (хотя в современных дистрибутивах Linux она является частью пакета util-linux-ng). Вызывая configure, передайте ключ --with-uuid=bsd, чтобы использовать функции BSD, либо --with-uuid=e2fs, чтобы использовать libuuid из e2fsprogs, либо ключ --with-uuid=ossp, чтобы использовать библиотеку OSSP UUID. В конкретной системе может быть установлено сразу несколько библиотек, поэтому configure не выбирает библиотеку автоматически.

Учим PostgreSQL считать секунды

В PostgreSQL отсутствует функция, аналогичная функции time() в PHP. Поэтому получить нужное количество секунд можно с помощью такой конструкции

test=# select extract(epoch from current_timestamp)::integer;
  extract
------------
 1697631498
(1 row)

Собираем json

Самое простое, взять за основу уже готовую запись валидного задания. Берём за основу результат поданной в тинкер команды: json_decode(\DB::table('jobs')->first()->payload) и на основе полученного результата собираем json.

select json_build_object(
               'uuid', uuid_generate_v4(),
               'displayName', 'App\Jobs\TestJob',
               'job', 'Illuminate\Queue\CallQueuedHandler@call',
               'maxTries', null,
               'maxExceptions', null,
               'failOnTimeout', false,
               'backoff', null,
               'timeout', null,
               'retryUntil', null,
               'data', (json_build_object('commandName', 'App\Jobs\TestJob',
                                          'command', 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}'))
       );

Обратите внимание, что вместо передачи известного uuid генерируется новый.

Данный запрос возвращает валидный payload для задачи, но нам нужно не просто поставить в очередь задачу, а передать ей данные.

Разберём поле command. Я передаю этот параметр строкой. В сериализованной строке передаётся тип данных и их размерность, если это требуется. Мы передаём в задачу три параметра: два строковых — email и subject и один целочисленный. Который отображается в теле письма. Найдём эти параметры в строке.

email: s:12:"tset@liam.ur". s — строка, 12 — длина строки, "tset@liam.ur" — содержимое строки, экранированное двойными кавычками.

Аналогично для subject.

В случае id: i:654321 i — целое число, 654321 — его значение.

Таким образом, надо подменить следующие параметры: размерность и содержимое.

Следует помнить, что в PostgreSQL с вычислением длины строк, закодированных в utf-8, ровно такая же беда, как и в PHP. Поэтому длину строковых переменных следует вычислять с помощью функции octet_length();

Часть функции, подменяющей параметры

_command = 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:' ||
               octet_length(_email) || ':"' ||
               _email ||'";s:7:"subject";s:'||
               octet_length(_subject) ||':"' ||
               _subject || '";s:2:"id";i:' || _id::varchar || ';}';

Функция PostgreSQL

Создаём миграцию

php artisan make:migration create_set_job_function
Содержимое миграции
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        $sql = <<<HERE
create function set_job(_email character varying, _subject character varying, _id integer) returns void
    language plpgsql
as
$$
declare
    _time int;
    _payload json;
    _command varchar;

begin
    _time := extract(epoch from current_timestamp)::integer;
    _command = 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:' ||
               octet_length(_email) || ':"' ||
               _email ||'";s:7:"subject";s:'||
               octet_length(_subject) ||':"' ||
               _subject || '";s:2:"id";i:' || _id::varchar || ';}';
    _payload = json_build_object(
            'uuid', uuid_generate_v4(),
            'displayName', 'App\Jobs\TestJob',
            'job', 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries', null,
            'maxExceptions', null,
            'failOnTimeout', false,
            'backoff', null,
            'timeout', null,
            'retryUntil', null,
            'data', (json_build_object('commandName', 'App\Jobs\TestJob',
                                       'command', _command)));
    insert into jobs(queue, payload, attempts, reserved_at, available_at, created_at)
        values ('default', _payload, 0, null, _time, _time);
end;
$$;

alter function set_job(varchar, varchar, integer) owner to test;
HERE;
        DB::unprepared($sql);
    }

    public function down(): void
    {
        DB::unprepared('drop function if exists set_job');
    }
};

Выполняем миграцию

php artisan migrate 

Переходим в терминал контейнера и пробуем вызвать эту функцию

test=# select set_job('e@mail.ru', 'Кирилица в заголовке', 9876);
 set_job
---------

(1 row)

В терминале запускаем воркер

php artisan queue:work --once
И смотрим в лог

Всё на месте.

Эпилог

Вот таким нехитрым способом можно заставить Laravel реагировать на события, происходящие в базе данных: хранимых функциях и триггерах.

Комментарии (0)