(В оригинальном PostgreSQL есть ошибка в PL/pgSQL, из-за которой мой планировщик некорректно работает, когда в задаче, написанной на PL/pgSQL, возникает неперехваченное исключение. Я описал эту ошибку здесь и исправил у себя в форке тут.)
Для установки планировщика не требуется создавать расширение в (каждой) базе данных. Вместо этого просто добавляем его в конфигурационный файл
shared_preload_libraries = 'pg_task'
После перезапуска PostgreSQL планировщик создаст во всех базах данных таблицы task от имени пользователей баз данных и в схемах по-умолчанию для этих пользователей.
Если нужно, чтобы планировщик запускался только для определённых баз данных, то можно задать их в конфигурационном файле
pg_task.database = 'database1,database2'
Если же требуется запускать планировщик не от пользователей баз данных, то это тоже можно задать так
pg_task.database = 'database1:user1,database2:user2'
Если нужно создавать таблицы планировщика не в схеме по-умолчанию для пользователей, то можно задать это так
pg_task_schema.database1 = schema3
Если же требуется и таблицу планировщика назвать по-другому, то это можно сделать так
pg_task_table.database1 = table3
По умолчанию планировщик проверяет задачи каждые 1000 мс, но это можно изменить так
pg_task_period.database1 = 100
pg_task_period.database2 = 10
Итак, планировщик создает (если ещё не создано) (схему, если нужно, и) таблицу задач с такими колонками
id BIGSERIAL NOT NULL PRIMARY KEY, -- идентификатор, первичный ключ
dt TIMESTAMP NOT NULL DEFAULT NOW(), -- планируемое время запуска задачи (по-умолчанию - как можно быстрее)
start TIMESTAMP, -- фактическое время запуска задачи
stop TIMESTAMP, -- фактическое время окончания выполнения задачи
queue TEXT NOT NULL DEFAULT 'default', -- очередь выполнения задачи (нужна для ограничения количества одновременно выполняемых задач)
max INT, -- максимальное количество одновременно выполняемых задач в очереди (также, позволяет задавать приоритет выполнения конкретной задачи)
pid INT, -- идентификатор процесса, который выполняет задачу
request TEXT NOT NULL, -- собственно SQL текст задачи
response TEXT, -- результат выполнения задачи
state TEXT NOT NULL DEFAULT 'QUEUE', -- состояние задачи (по-умолчанию - запланирована, также может быть выполнена, остановлена, ...)
timeout INTERVAL, -- позволяет ограничивать время выполнения задачи
delete BOOLEAN NOT NULL DEFAULT false, -- удалять задачу автоматически после выполнения, если нет результата
repeat INTERVAL, -- интервал повторения задачи
drift BOOLEAN NOT NULL DEFAULT true -- предотвращать ли дрейф времени при повторе задачи
На самом деле, планировщик запускает несколько фоновых рабочих процессов: один для отслеживания изменений в конфигурационном файле и запуске/остановке при необходимости остальных фоновых процессов планировщика. И по одному фоновому рабочему процессу для каждой базы для проверки запланированных задач в каждой базе и запуске при необходимости выполнения задач.
Например, если мы хотим выполнить задачу как можно быстрее, то выполняем SQL команду
INSERT INTO task (request) VALUES ('SELECT now()')
Результат выполнения задачи планировщик запишет в колонку результата в текстовом виде. Если в результате выполнения задачи будет несколько колонок, то планировщик добавит их в шапку вместе с типами колонок. Также, в результате выполнения задачи может быть и несколько строк, все они добавятся в колонку результата.
Если мы хотим выполнить задачу, например, через 5 минут, то запишем планируемое время в соответствующую колонку
INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()')
а если мы хотим, чтобы задача выполнилась в конкретное время, то так и запишем
INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()')
Если нам требуется, чтобы задача выполнялась каждые 5 минут, то пишем так
INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()')
если же написать так
INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false)
то повторение задачи будет запускаться через 5 минут после окончания выполнения задачи (а не после запланированного времени, как по умолчанию).
При возникновении исключения при выполнении задачи, оно перехватывается и в текстовом виде записывается в колонку результата, а задаче присваивается соответствующее состояние.
Например
INSERT INTO task (request) VALUES ('SELECT 1/0')
Если нужно, чтобы для какой-то очереди задач одновременно выполнялось не более 2х задач, то создаём задачи командой
INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()')
Пусть у нас в этой очереди накопилось очень много задач и они одновременно выполняются по 2. Если создать задачу командой
INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()')
то она выполнится как можно раньше всех остальных задач в этой очереди, т.е. это ещё что-то типа приоритета
Комментарии (10)
ALepikhov
20.06.2019 05:47Полагаю, шансы этого патча близки к нулю, поскольку сообщество не любит добавлять в СУБД функциональность, которую можно реализовать другими способами или через внешние инструменты.
К тому же не очевидно, что планировщик задач должен быть реализован на уровне СУБД и потенциально снижать надежность оной.
Melkij
20.06.2019 21:34Как мило, что заявив «пока не удалось пропихнуть в оригинальный репозиторий» вы всё-таки отправили патч в pgsql-hackers. После публикации статьи.
Патч принят не будет. Потому что, процитирую здесь ответ Тома:
The complaint in bug #15738 is 100% bogus
Нельзя так просто в PG_CATCH сделать что-то и не сделать PG_RE_THROW. Чтож, мне за такую попытку тоже уже по голове настучали недавно. См. обсуждения например здесь, здесьRekGRpth Автор
21.06.2019 05:54Нельзя так просто в PG_CATCH сделать что-то и не сделать PG_RE_THROW.
Почему нельзя? И где об этом написано?Вот, например код из оригинального репозиторияvoid plperl_spi_commit(void) { MemoryContext oldcontext = CurrentMemoryContext; PG_TRY(); { SPI_commit(); SPI_start_transaction(); } PG_CATCH(); { ErrorData *edata; /* Save error info */ MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); FlushErrorState(); /* Punt the error to Perl */ croak_cstr(edata->message); } PG_END_TRY(); } void plperl_spi_rollback(void) { MemoryContext oldcontext = CurrentMemoryContext; PG_TRY(); { SPI_rollback(); SPI_start_transaction(); } PG_CATCH(); { ErrorData *edata; /* Save error info */ MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); FlushErrorState(); /* Punt the error to Perl */ croak_cstr(edata->message); } PG_END_TRY(); }
Melkij
21.06.2019 07:56Спросите ответным письмом в hackers. Мол, «я смотрел код в plperl и там сделано так. Это неверно?».
RekGRpth Автор
21.06.2019 06:06Как мило, что заявив «пока не удалось пропихнуть в оригинальный репозиторий» вы всё-таки отправили патч в pgsql-hackers. После публикации статьи.
Я сообщил об ошибке ещё в начале апреля, но т.к. никто даже ничего и не ответил, то я даже и не пытался предложить им патч, а сделал это после вопроса об этом.Melkij
21.06.2019 08:06Именно что «даже не пытался предложить патч» совершенно не вяжется с фразой «пока не удалось пропихнуть». Написали бы что описали баг здесь и локально возможно исправили так — претензии бы не было. А говорить что не удалось пропихнуть патч который даже не отправляли — явно некорректно.
Да, в -bugs могут не ответить. Спустя некоторое время можно вежливо переспросить, мол вопрос остался без внимания, был бы кто-нибудь столь любезен прокомментировать, может я делаю что-то не то и это ожидаемое поведение кода.
Tantrido
А патч будет принят в основную ветку?
TyVik
Зная сообщество Postgres, не раньше чем через пару лет. В лучшем случае.
Это не упрек комьюнити — просто все патчи долго проверяются, обкатываются и тестируются для надёжности.