Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.
Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.
Ограничения применимости
Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.
Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.
Суть метода в пяти словах
select ... for update skip locked
Базовая очередь
Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.
Таблица для простейшей очереди содержит саму задачу и её статус:
create table task
(
id bigint not null primary key,
status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена
);
create index task__status__idx on task (status);
Добавление задачи:
insert into task (id) values ($1) on conflict (id) do nothing;
Получение следующей задачи:
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи:
update task
set
status = 2
where id = $1;
Очередь с приоритетами
В простом случае id
задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id
с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id)
.
Либо для приоритета добавляется отдельный столбец:
create table task
(
id bigint not null primary key,
priority integer not null,
status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена
);
create index task__status__priority__idx on task (status, priority);
insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;
with next_task as (
select id from task
where status = 0
order by priority
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Выделенный столбец позволяет менять приоритет задачи "на лету".
Очередь с повтором "упавших" задач
В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.
create table task
(
id bigint not null primary key,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
attempt integer not null default 0,
delayed_to timestamp null,
error_text text null
);
create index task__status__delayed_to__idx on task (status, delayed_to);
Как видно, расширился список статусов и добавились новые столбцы:
attempt
— номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на10 * attempt
минут);delayed_to
— время следующей попытки выполнения задачи;error_text
— текст ошибки.
Текст ошибки нужен для группировки по типам ошибки.
Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом "ошибка". Выполняем запрос:
select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;
За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
delayed_to = null,
error_text = null
from next_task
where task.id = next_task.id
returning task.id;
with next_task as (
select id from task
where status = 3
and delayed_to < localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
delayed_to = null,
error_text = null
from next_task
where task.id = next_task.id
returning task.id;
update task
set
status = 2,
delayed_to = null,
error_text = null
where id = $1;
update task
set
status = 3,
delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
error_text = $2
where id = $1;
update task
set
status = 4,
delayed_to = null,
error_text = $2
where id = $1;
Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or
может очень плохо сочетаться с сортировкой order by
.
Сбор метрик
Добавляем такие атрибуты:
- время создания задачи;
- время изменения задачи;
- время начала и завершения выполнения задачи.
create table task
(
id bigint not null primary key,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
attempt integer not null default 0,
begin_time timestamp null,
end_time timestamp null,
delayed_to timestamp null,
error_text text null,
created timestamp not null default localtimestamp,
updated timestamp not null default localtimestamp
);
create index task__status__delayed_to__idx on task (status, delayed_to);
create index task__updated__idx on task (updated);
Учитываем добавленные столбцы во всех запросах.
with next_task as (
select id from task
where status = 0
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
begin_time = localtimestamp,
end_time = null,
delayed_to = null,
error_text = null,
updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;
with next_task as (
select id from task
where status = 3
and delayed_to < localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1,
attempt = attempt + 1,
begin_time = localtimestamp,
end_time = null,
delayed_to = null,
error_text = null,
updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;
update task
set
status = 2,
end_time = localtimestamp,
delayed_to = null,
error_text = null,
updated = localtimestamp
where id = $1;
update task
set
status = 3,
end_time = localtimestamp,
delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
error_text = $2,
updated = localtimestamp
where id = $1;
update task
set
status = 4,
end_time = localtimestamp,
delayed_to = null,
error_text = $2,
updated = localtimestamp
where id = $1;
Примеры, для чего это может быть нужно
Поиск и перезапуск повисших задач:
update task
set
status = 3,
end_time = localtimestamp,
delayed_to = localtimestamp,
error_text = 'hanged',
updated = localtimestamp
where status = 1
and updated < localtimestamp - interval '1 hour';
Удаление старых задач:
delete from task
where updated < localtimestamp - interval '30 days';
Статистика по выполнению задач:
select
date_trunc('hour', end_time),
count(*),
sum(end_time - begin_time),
avg(end_time - begin_time)
from task
where status = 2
and end_time >= '2019-12-16'
group by 1
order by 1;
Повторный запуск ранее выполненных задач
Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.
create table task
(
id bigint not null primary key,
task_updated_at timestamp not null default localtimstamp,
status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
begin_time timestamp null,
end_time timestamp null,
delayed_to timestamp null,
error_text text null,
created timestamp not null default localtimestamp,
updated timestamp not null default localtimestamp
);
Здесь для времени обновления задачи добавлен столбец task_updated_at
, но можно было бы использовать поле created
.
Добавление или обновление (перезапуск) задачи:
insert into task (id, task_updated_at) values ($1, $2)
on conflict (id) do update
set
task_updated_at = excluded.task_updated_at,
status = case when status = 1 then 1 else 0 end,
delayed_to = null,
error_text = null,
updated = localtimestamp
where task_updated_at < excluded.task_updated_at;
Что здесь происходит. Задача становится "новой", если она сейчас не исполняется.
В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.
Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.
Успешное завершение задачи:
update task
set
status = case when begin_time >= updated then 2 else 0 end,
end_time = localtimestamp,
delayed_to = null,
error_text = null,
updated = localtimestamp
where id = $1;
Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус "новая".
Pipeline
Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.
Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.
create table task
(
id bigint not null primary key,
stage integer not null default 0,
status integer not null default 0
);
create index task__stage__status__idx on task (stage, status);
Получение следующей задачи на заданной стадии:
with next_task as (
select id from task
where stage = $1
and status = 0
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи с переходом на указанную стадию:
update task
set
stage = $2,
status = 2
where id = $1;
Или переход на следующую по порядку стадию:
update task
set
stage = stage + 1,
status = 2
where id = $1;
Задачи по расписанию
Это вариация очереди с повтором.
У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).
create table task
(
id bigint not null primary key,
period integer not null, -- периодичность запуска в секундах
status integer not null default 0, -- 0 - новая, 1 - в работе
next_run_time timestamp not null default localtimestamp
);
create index task__status__next_run_time__idx on task (status, next_run_time);
Добавление задачи:
insert into task (id, period, next_run_time) values ($1, $2, $3);
Получение следующей задачи:
with next_task as (
select id from task
where status = 0
and next_run_time <= localtimestamp
limit 1
for update skip locked
)
update task
set
status = 1
from next_task
where task.id = next_task.id
returning task.id;
Завершение задачи и планирование следующего запуска:
update task
set
status = 0,
next_run_time = next_run_time + make_interval(secs => period)
where id = $1
Вместо заключения
В создании специализированной очереди задач средствами РСУБД нет ничего сложного.
"Самопальная" очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.
Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.
Комментарии (55)
BugM
23.12.2019 02:35Флап соединения с БД и приехали. Соседний воркер берет ту же задачу и начинает выполнять.
Нужно добавлять приличное количество костылей для защиты от таких ситуаций.Varim
23.12.2019 03:08не знаю что такое «флап», но что по вашему делает следующий запрос?
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Если поток упал, то задача так и будет в статусе «в работе».BugM
23.12.2019 10:20С такой реализацией при попадании соединения или при обычном падении воркера таск залипнет. И костыли нужны для разруливания этой ситуации.
А в ситуации select for update и ничего не апдейтим пока таск выполняется будут дубли при пропадании соединения. И тоже костыли нужны.
Проблема всех простых очередей именно в таких ситуациях. Разруливание всех ситуаций когда что-то идет не так.
Varim
23.12.2019 15:51будут дубли при пропадании соединения
А что значит при пропадании соединения? Ведь если не закоммитим и пропадет соединение то БД просто сделает ролбек, а воркер не сможет сделать коммит или я в чем то неправ?BugM
23.12.2019 16:00Таск перевели в статус запущен. Закомитили. А в комплит перевести не смогли. Ошибка сети, ошибка БД, ошибка воркера, любая другая ошибка. Теперь статус таска неизвестен (вечный работает) и что с ним делать непонятно.
Если не комитить а держать открытую транщакцию, то при падении сессии в БД и соотвественно транзакции другой воркер возьмет этот таск и будет выполнять. Что при это будет делать первый воркер непонятно.
alekciy
23.12.2019 17:25Если все происходит в рамках одного сервера, то можно в базу писать PID процесса и по нему отлавливать грохнувшийся задачи и их перезапускать. Главно при реализации не забыть, что PID является переиспользуемым и чисто проверка на PID может вызывать проблемы.
apapacy
23.12.2019 13:35Обычно для задач устанавливается и таймаут для отслеживания аварийно завершившихся тасков (которые не смогли перевести себя в статус=3 в статусах автора статьи). При этом начинаются проблемы когда таск может быть равно как и долгоиграющим так и зависшим.
pin2t
23.12.2019 09:11Не нужно никаких костылей. Нужно просто выучить как работают транзакции в СУБД. Все там нормально будет, соседний воркер не получит ту же задачу
gleb_l
23.12.2019 10:42Нужно просто понимать, что данный набор методов *не может* идти под сквозной транзакцией от получения задачи до отметки о ее выполнении — иначе все механизмы, связанные с очередью, станут однозадачными.
Вся идея такой реализации очереди — в том, что используются не внешние транзакции, а встроенная атомарность SQL-движка, и в результате реализуется механизм двухфазных транзакций:
1) задание атомарно достается из очереди и отмечается, как взятое без всяких внешних транзакций
2) в течение времени, не превышающем время таймаута поток-обработчик задания его выполняет. Обработчик может использовать транзакции, а может и нет — это зависит от его логики обработки. Главное, чтобы действия, связанные с манипуляциями с очередью, либо не использовали внешних транзакций, либо совершались в транзакции с минимальным количеством затронутых объектов БД
3) поток-обработчик атомарно отмечает задание, как выполненное. Здесь тоже должна использоваться встроенная атомарность, чтобы не было необходимости во внешней транзакции. Как-то так
apapacy
23.12.2019 03:01Я недавно создал пост https://habr.com/ru/post/458608/ о нескольких реализациях job queue.
Вцелом хорошо что Вы рассматриваете разные типы задач, в том числе и задачи выполняемые по расписанию. А также хорошо что предусматриваете повторное выполнение задач в случае ошибки.
По опыту тестирования некоторых готовых библиотек, в частности agenda, которая основана также на базе данных (mongodb) могу сказать что решение с тяжелой базой данных существенно медленнее чем решение основанные например на redis. Но даже не это главное.Та же agenda, которая по идее должна работать медленно но стабильно в реальности после какого-то предела просто останавливается и перестает выбирать задачи из очереди. Я это говорю к тому, что в боевых условиях умозрительные построения могут начать работать не так как задумывал их автор.
Так же у очередей заданий желательно иметь масштабирующий и "сглаживающий" функционал, то есть возможность запускать несколько воркеров на выполнение одной задачи. А также на сглаживание "пиков" задерживая выполнение очередной задачи пока не завершаться текущие задачи.
resetme
23.12.2019 05:54Проблема в том что постоянно надо пуллить и смотреть появились ли задачи в очереди. Вот бы как-нибудь сделать так чтобы запускать задачу только при наличии задачи в очереди без постоянного опроса ее. Такое можно как-нибудь сделать?
DmitryKoterov
23.12.2019 07:11Поллинг обычно не проблема, если задач в очереди много. Т.е. за время обработки предыдущих выбранных задач данным воркером накапливаются новые, которые он счастливо и поллит.
edo1h
23.12.2019 06:04у постгреса есть механизм LISTEN/NOTIFY, очень удобен для малоактивных очередей (обработчик не должен постоянно опрашивать сервер в ожидании задания).
create index task__status__priority__idx on task (status, priority);
иногда бывает полезно делать вот так:
create index task__status0__priority__idx on task (priority) where status=0;
alekciy
23.12.2019 08:53Есть, но как PG запустит приложение?
edo1h
23.12.2019 12:41речь не про это.
вот у вас есть воркер, которые ждёт задачу. а очередь пуста. что остаётся делать воркеру? периодически слать
select bla-bla-bla
в ожидании задания.
будешь слать часто — ненужная нагрузка на сервер. редко — время реакции на новое задание неприемлемо высоко.
LISTEN/NOTIFY как раз и решает эту проблему — воркер делает LISTEN и ждёт.
resetme
23.12.2019 17:52А есть ли вариант послать NOTIFY из триггера на вставку строки и насколько это рабочий вариант?
alekciy
23.12.2019 18:09Нормальный это вариант. Работает. Но только для закомиченых транзакций (т.е. не сработает пока не будет COMMIT). Если слушателя при этом нет (отвалился), то ошибки не будет, но событие потеряется, т.е. PG в себе их не хранит.
DmitryKoterov
23.12.2019 07:15Вы, кстати, зря про то, что этот способ плохо масштабируется. Нормально он масштабируется — вернее, упирается только в одновременное кол-во коннектов к бд (которых можно поставить и 5000 штук для таких простых запросов). Выгребать очередь в 5000 воркеров — это очень и очень неплохо.
Громадный плюс данного подхода (вернее, не в точности его, а когда блокировка строки не отпускается воркером при захвате задачи — так тоже можно сделать) в том, что при смерти процесса-воркера (kill -9) его коннекшены автоматом закрываются, и все блокировки отпускаются. Т.е. у данного решения нет проблемы, когда одну и ту же задачу могут взять два воркера — а значит, не нужно никакого геморроя с heartbeat-ами и т.д. (Естественно, при условии, что воркер существенно использует тот же самый коннекшн в своей работе.)
И еще одно. Можно же иметь не одну таблицу, а 10. Хоть бы и партицированную. И натравливать N воркеров не на одну таблицу, а на 10. Это практически линейно все отмасштабирует (потому что для субд все эти запросы очень просты, она их как орешки щелкает, они все попадают в индекс).DmitryKoterov
23.12.2019 07:25Да, еще вот https://cadenceworkflow.io — на нем весь Uber держится, и там очереди именно так реализованы (правда, на MySQL, но сути не меняет).
IvanVakhrushev
23.12.2019 10:24Извините, но когда вы пишете про 5000 коннектов к БД, вы понимаете, как PG устроен под капотом?
Держать блокировку воркером на протяжении всего времени выполнения задачи — это совсем не масштабируемое решение. Для использования в продакшене оно мало подходит.
А если отпускать блокировку после захвата (как правильно делать), то нужно очень аккуратно реализовывать перезапуск\смену статуса у тасок, что, собственно, обсуждается в треде выше.DmitryKoterov
23.12.2019 11:12+15000 коннектов — это, например, дефолтный лимит в AWS Aurora Postgres. 5000 практически ничего не делающих процессов — не так и много: если каждый съест по 10М, то получится 50Г в памяти — по современным меркам приемлемо. (Но, опять же, 5000 — очень большая цифра, если блокировки отпускать, то коннектов потребуется на 1-2 порядка меньше.)
Насчет перезапуска тасков — это очень тонкий момент, нужно 10 раз подумать, действительно ли так уж много заданий и нельзя обойтись долгими сессионными блокировками (pg_try_advisory_lock). Работает воркер — захватил блокировку, умер — его коннект тоже умер, блокировка снялась, новый воркер может подхватить таску. (Это, правда, выходит за рамки данной статьи.) Реально очень сильно облегчает жизнь, особенно если данные, с которыми работает воркер, находятся в той же бд, где захватывается блокировка (тогда блип коннекта не страшен).
TyVik
23.12.2019 10:03+1Прочитал статью и проглядел комменты, но так и не увидел упоминания PGQ. Почему бы его не взять? Более того, Алексей Лесовский в своём докладе «Топ ошибок со стороны разработки при работе с PostgreSQL» явно жалуется на самописные очереди.
Melkij
23.12.2019 10:45Ночь потому что, хорошие dba спят ;-)
Самописная реализация очереди привлекает разработчиков, «что тут делать, фигак фигак и в продакшен». А что случается с продакшеном даже всего под сотней запросов к очереди в секунду — узнают потом. И то бывает игнорируют. Ну и что что 99% времени база лопатит эту недоочередь, ну и что что любая транзакция свыше десятка минут кладёт проект — но работает же!
Да вот не просто. Хотите очередь в базе (а это действительно удобно из-за помещения события в очередь в той же самой транзакции, rollback транзакции и consumer не увидит событие как будто ничего и не было) — то вам нужен pgq. У него большие проблемы с документацией — но это та реализация очереди-в-базе которая под нагрузкой живёт нормально потому что учитывает как именно работает эта СУБД.
По крайней мере пока не появилась реализация table access method с UNDO. Или вовсе что-то ещё более специфичное именно под очереди. Появившиеся в pg12 api tableam это уже позволяют сделать.
IvanVakhrushev
23.12.2019 10:131.
create index task__status__idx on task (status);
Бессмысленный индекс в таком виде. Значение 2 (выполнена) из него нужно выкинуть сразу. В штатном режиме все задачи будут иметь финальный статус 2, их будет много, индекс не будет использоваться.
2.error_text text null
Лучше так не делать в production'е.
Люди начинают писать в поле длинные стек-трейсы, таблица тостируется, вы теряете в перфомансе.
Ограничивайте длину поля явно.
3. Что касается очереди с повторами «упавших» задач, то не увидел самого главного: как отличить упавший воркер от долго работающего воркера?
Если воркер упал во время работы, кто перезапустит таску на выполнение? Кто проставит ошибку?ggo
23.12.2019 10:301. По финальному статусу обычно не ищут. Для нефинальных — cardinality обычно хороший.
3. Это «беда» любых воркеров. Если он не сообщает свой статус, долгоиграющий не отличить от зависшего. Вариант — контролировать таймауты и принудительно прерывать, с соответствующими статусами.DmitryKoterov
23.12.2019 11:18В общем случае это называется heartbeats. Раз в N секунд воркер с номером W пытается отрепортить, что он еще живой. Если во время такого репорта ему говорят, что появился какой-то другой, более молодой, воркер с тем же номером W, то старый воркер совершает сепукку.
Правда, можно представить, какую нагрузку это все создает, когда воркеров много, и что все равно есть промежуток времени, когда оба воркера могут работать одновременно. Это-то как раз и можно решить advisory-блокировкой на значение W в основной бд, с которой работает воркер W. В этом случае сам коннект к бд выступает таким своеобразным каналом heartbeat-ов.
yakov-bakhmatov Автор
23.12.2019 12:583. Супервизор.
Супервизор запускает и поддерживает заданное количество работающих исполнителей.
Супервизор следит за исполнителями. При аварийном завершении исполнителя перезапускает задачу. При превышении заданного времени работы исполнитель убивается, задача перезапускается.
talbot
23.12.2019 17:25Реализация подобной очереди давно используется в Яндекс.Деньгах и выложена на Гитхабе: github.com/yandex-money-tech/db-queue
Её тестировали на нагрузке до 1000 транзакций (авторизаций) в секунду, и очередь вполне справляется.
gleb_l
Если вы проектируете очередь, то извольте либо сделать ее thread-safe, либо явным образом объявите, что, скажем, метод получения очередной задачи thread-safe, а отметки об ее завершении — thread-unsafe.
Без этого никогда нельзя делать вот так: , так как другой поток мог уже модифицировать статус, и оба потока отметят задачу дважды. Правильно делать так: и далее проверять по числу records affected, удалось это текущему потоку, или нет.
Та же ситуация с переходом задач на следующую стадию — оно все требует либо serializable транзакции поверх всех экзерсисов (что делает систему фактически однотредовой), либо просит явного указания на однопоточное использование. С повтором же возможна ситуация, когда долго работающая задача отмечает свое успешное выполнение уже после того, как внешний скрипт отметил задачу, как подлежащую повтору — и она будет выполнена дважды
Varim
gleb_l
Получит, естественно, единственная коннекция — поэтому в плане получения задачи все потокобезопасно — я и пишу в исходном комментарии, что метод получения задачи — thread-safe.
Однако, нигде нет утверждения, что а) задача *обрабатывается* в той же коннекции, в которой была получена, и б) что обработка задачи — однопоточная.
Допустим даже, что прикладной уровень, получив от бакенда задачу, распараллелил ее на несколько тредов, грамотно подождал их окончания, и уже в единственном треде вызвал SQL-код, отмечающий ее, как выполненную (это весьма реальный сценарий) — но отсутствие потокобезопасности между фоновым SQL-процессом, который перезапускает задачи и кодом, который их отмечает, как выполненные — это явный фейл. В этом случае *никаким* построением прикладного уровня проблему повторного перезапуска успешно выполненной на грани таймаута задачи не решить — соответственно, бакенд кривой.
Поэтому я утверждаю, что если данное решение очереди рекомендуется в качестве образоцовой реализации очереди средствами SQL-бакенда, в нем должно быть явно указано отсутствие потокобезопасности в перечисленных случаях — а именно: 1) завершение задачи, 2) продвижение задачи в следующую стадию, 3) перезапуск по таймауту (последнее — нерешаемо на уровне архитектуры приложения). Либо… все эти места доработаны так, чтобы быть thread-safe. Тогда этот код можно рекомендовать другим без оговорок частных случаев использования — а это уже гораздо лучше — при том, что доработки нужны минимальные.
PS. автору статьи — тактика «минус-в-коммент, минус-в-карму» — ассоциируется с цитатой из М. Жванецкого — «зачем спорить с хромым об искусстве, если можно сразу ему сказать, что он хромой» :)
Varim
Но вот же запрос который атомарно делает недоступной задачу для других конекшинов/потоков:
gleb_l
Он делает атомарным только *взятие* задачи из очереди — и я с этим не спорю, т.к. for update skip locked гарантирует, что ID одной и той же задачи получит строго один поток — остальные либо не получат ничего (если в очереди больше нет задач на взятие), либо получат ID следующих незалоченных невзятых задач. Взятие сделано грамотно и оно — потокобезопасно.
Я же говорю о дальнейших действиях — по ID возвращенной нам задачи слой приложения выбирает из нее payload, начинает с ним работать, и по завершению работы либо обновляет номер фазы, либо отмечает задачу как сделанную или зафейленную. И проблемы конкурентности возникают как раз здесь.
Например, при взятии многофазной задачи из очереди нужно атомарно возвращать не только task.id, но и номер фазы, и при продвижении на следующую фазу подставлять в условие update не только ее ID, но и полученный номер фазы + все условия, которые говорят о валидности задачи на момент ее продвижения (то есть то, что она не стоит в перезапуске, не снята по таймауту, не поставлена в статус «выполнена» или «зафейлена» итд) — и далее смотреть, сколько строк саффекчено этим update. Если 0 — значит, нас опередили, и тред нашего приложения должен выкинуть ошибку и выбросить результат своей работы в помойку. Вот тогда будет потокобезопасно.
pin2t
Нормально все у автора с реализацией захватов и обновлений состояний задач. Просто он рассматривает только СУБД часть реализации, но с ней все в порядке. Неважно в сколько потоков будет обрабатываться задача, когда закоммитится транзакция БД в которой было обновление статуса, оно произойдет. Ну важно только чтобы обработка задачи была в рамках одной транзакции БД, не делать коммит транзакции БД до завершения обработки задачи.
Вы подучите как работают транзакции в БД. У автора с этим все в порядке. И не надо там никаких дополнительных проверок статуса при апдейте.
Потокобезопасность это вопрос уровня приложения, и если в приложение неправильно работает с потоками, то
вам не поможет
gleb_l
по поводу where… status = 0 — это тоже очень наивное заблуждение, что просто «транзакции» нам здесь помогут. Помогут только serializable транзакции — а это сразу прощай параллельность работы очереди. Кроме того, почитайте мой коммент снизу, зачем нужны все эти ухищрения с атомарным взятием, сборщиком подвисших заданий, перезапуском итд
apapacy
Я конечно никогда не работал с этим, но судя по документации select for update skip locked должен работать с блокировками на уровне строки именно так как это и предполагает автор статьи.
pin2t
В БД совершенно нормальный код. Вопрос потокобезопасности приложения он не относится к рассматриваемогу автором коду в БД. Да он должен быть потокобезопасным. Но к коду в БД это не относится.
gleb_l
Еще как относится. Вот, например, такой код будет работать до тех пор, пока из двух коннекций одновременно не попытаются вставить одно и то же значение @SomeUniqueField:
Первый же студенческий возглас — так надо транзакцию! — все равно не поможет, т.к. оба потока прочитают из таблицы через shared lock, убедятся, что такой записи нет, и затем попытаются ее добавить. И кто-то один по-прежнему обломается.
Следующий шаг (уровень джуна) — давайте сделаем serializable-транзакцию. Она, действительно, помогает, но одновременно с этим ведет себя, как средневековая проказа — любой объект, до которого вы дотронулись под serializable-транзакцией, оказывается заразным (получает exclusive lock). Именно поэтому такой уровень изоляции помогает — другие потоки даже прочитать ничего не могут. Но serializable — это СИЛЬНЕЙШИЙ антибиотик, который лечит нашу болезнь, но одновременно напрочь лишает базу возможности параллельной работы (см выше).
А всего-то надо было написать:
и мы, используя встроеннум атомарность, решим задачу без внешних транзакций.
Если же нам надо узнать, наш ли именно поток фактически вставил запись в таблицу, или это был кто-то другой, а наш обломался, то сразу после стейтмента вставки достаточно проверить:
alekciy
Указанные проблемы конечно имеют место быть, но к указанной теме (неблокируемый забор задачи из очереди) они отношения не имеют.
gleb_l
Указанная тема не
— он-то как раз сделан правильно, а — в реализации которой есть несколько методов, часть из которых thread-unsafe. Я даю иллюстрацию к тому, что использовать встроенную атомарность можно и нужно для того, чтобы простыми средствами сделать эти методы *тоже* потокобезопасными. Ведь ни у кого не вызывает вопросов способ, которым взятие задания сделано потокобезопасным, хотя он сделан без всяких транзакций :)Тот факт, что в конкретном случае мой пример для PostgreSQL решается через ON CONFLICT, не имеет значения, т.к. суть метода в том, чтобы проверить входное условие перед модификацией записи, чтобы убедиться, что запись не была изменена конкурентно. Я пишу под MS SQL, там нет ON CONFLICT ;)
Методика сходна с проверкой на модификацию записи перед обновлением добавлением значения rowversion к значению ключа
в предикат условия выборки/обновления — что гарантирует отсуствие изменений в период между взятием записи и ее обновлением
pin2t
Во-первых, как правильно уже отметили, ваш пример никак не относится к задаче в статье.
Во-вторых, ваш пример правильно решается кодом
а не тем что вы наизобретали.
Ну и в-третьих, без транзакций не получится, БД работают только с транзакциями, это обязательная часть.
Идите и учите как работают базы данных, и не будет для вас открытий и странных изобретений ugly кода
kai3341
Краткое содержание:
Статья "как плохо решить задачу неподходящими инструментами"
Комментарий "Оно просто не работает как очередь задач (MQ), потому как не соответствует требованиям MQ"
Ответ: "а тот факт, что оно не выполняет возложенных задач, никак не относится к статье"
От себя добавлю, что выбор постгреса в качестве очереди задач (помимо того, что решение попросту неэффективно) крайне неудачен ввиду особенностей реализации MVCC: постгрес на каждый update создаёт новую запись. Режим работы — минимум 1 update для каждой записи.
Прекратите собирать троллейбус из буханки хлеба, и после с умным видом рассказывать о корректности использования ржаного хлеба. Умные люди уже придумали, как правильно решать такие задачи
Изюминка ситуации в том, что в моём текущем проекте заказчик настоял на использовании БД (постгрес) в качестве резервной очереди задач. И я реализовал нечто подобное.
Разница в том, что с самого начала я знал, что творю маразм
yakov-bakhmatov Автор
Работа инженера — выбрать подходящий инструмент. В некоторых случаях нет готового хорошего инструмента, приходится выбирать из плохих, как-то адаптировать и как-то с этим жить. Статья как раз о тех случаях, в которых MQ не удовлетворяет требованиям, и приходится делать самописную очередь.
В самом начале, как раз, указан пример требований (очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач) и отмечены ограничения применимости.
Вот давайте попробуем выполнить эти требования при помощи MQ. Давайте даже ослабим требования, исключив приоритет задач.
Возьмём MQ, которая умеет отложенную доставку сообщений. Например, ActiveMQ.
Шлём упавшие задачи в ту же очередь с заданной задержкой доставки. Всё ok, но однажды возникает вопрос: сколько в этой очереди новых задач и сколько повторных?
Тогда давайте разделим очередь на две: в одну пишем новые, в другую — упавшие с задержкой. Отлично. Как теперь балансировать нагрузку на воркеры? 50% на новые, 50% на упавшие? А если упавших мало, то 50% мощностей будут простаивать? Брать задачи из каждой очереди по очереди?
Допустим, как-то решили этот вопрос. Далее. Как узнать, какие основные причины, по которым задачи попадают в очередь упавших? Ну, наверное, надо логи погрепать…
Ладно. А как быть уверенным, что в очереди находятся действительно все упавшие задачи, и какая-то не потерялась? Ну, например, воркер может писать в какую-нибудь БД, когда он взял в работу задачу. И поискать в этой БД невыполненные задачи, которые давно не обновлялись (не брались в работу).
А зачем нам тогда очередь, если у нас уже есть таблица с задачами, их статусом и временем последней попытки выполнения?
По поводу update-ов в БД. Для одной задачи в лучшем случае выполняется 1 insert и 2 update (взять задачу, завершить задачу). Во время выполнения задачи в БД могут совершаться десятки и сотни запросов на вставку и изменение строк.
Эти программисты так любят что-то менять в БД, они что, не понимают, что при этом каждый раз создаётся новая запись?! Давайте им запретим.
Изюминка ситуации в том, что в одном из моих текущих проектов очень активно используется ActiveMQ. И её использование в некоторых случаях не соответствует эксплуатационным требованиям. И я с самого начала знал, что она не будет им соответствовать. И вот сейчас, когда самописная очередь прошла проверку на менее критичных задачах в менее критичных проектах, придёт, наконец, время заменить один не очень хороший инструмент на другой, тоже не очень хороший, но получше.
apapacy
Такие транзакции являются https://en.wikipedia.org/wiki/Long-lived_transaction и это скорее плохо чем хорошо. Такие транзакции очень часто используются в сообществе программистов Interbase/Firebird судя по информации с некоторых форумов.
Также такие транзакции нередко реализованы в некоторых ORM (например sequelize/node.js)
Вцелом, "хорошим", более обычным применением транзакций является их выполнение в одном запросе с клиента к серверу.
qw1
Но вы же согласились, что взятие задачи тут выполнено корректно и потокобезопасно. А это значит, никакой другой поток/клиент не может работать с этой задачей (в том числе, менять её статус или фазу), если следует соглашениям и взял задачу корректно.
gleb_l
Конечно взятие реализовано корректно. Но нигде не оговорено, что *обработка* задачи всегда должна идти в том же потоке, который осуществил взятие. Поэтому я и утверждаю, что в данном случае желательно делать все методы работы с очередью потокобезопасными уже на уровне БД, тем более, что а) это практически ничего не стоит, и б) очередь — довольно стандартный широкоиспользуемый компонент, и вы никогда не знаете, насколько корректно код приложения будет работать с вами.
alekciy
Если вы в приложении взяли задачу одним потоком, а обрабатывайте его в другом, то это не проблема РСУБД и не зона его ответсвенности, т. к. ни чего она про ваше приложение не знает и знать не должна. Да и в принципе не может. На уровне соединений при конкурентной доступе она безопасность обеспечивает.
qw1
А этого не будет, если только программист специально не напишет такой сценарий: взяли элемент, стартовали два потока, которые поставят этому элементу признак «завершён».
Но зачем такое писать в здравом уме? Можно просто написать «delete from tablename» без «where», и сказать «ваша система небезопасна, раз позволяет такое»?
yakov-bakhmatov Автор
Очень дельное замечание (как и другие ваши комментарии в ветке), спасибо.
Я не утверждаю эталонность подхода. Считаю, что нельзя просто копировать код из этих ваших интернетов в свой проект. Всё нужно обдумывать, критически оценивать и адаптировать к своим условиям.
При подготовке статьи код берётся из работающей системы, обобщается и упрощается. Здесь легко потерять некоторые нюансы или не донести контекст.
Например, отмеченные вами проблемы завершения задач в контексте процессов моей системы проблемами не являются, поскольку
1) перезапуск уже завершённой задачи допустим и к катастрофе не приведёт;
2) за исполнителями наблюдает супервизор и принудительно завершает их по таймауту, перезапуская взятые исполнителями задачи. Пример перезапуска задач в статье нужен для ситуации, когда аварийно завершился супервизор (и при этом были убиты все подчинённые ему исполнители).
gleb_l
Я искренне рад, что вы понимаете критичные для построения очередей в БД моменты (и вообще, критичные для эффективно работы с БД принципы), и упрощения в конкретной реализации очереди являются не результатом технического просчета, а частным минимально-необходимым решением задачи.
Count_s
Поведение нормальной СУБД от потоков клиента не зависит (в общем то она и не знает есть ли они), для изоляции есть a — транзакции и b — явные блокировки, что и описано в статье.