Очередь слонов - pixabay.com


Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.


Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.


Ограничения применимости


Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.


Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.


Суть метода в пяти словах


select ... for update skip locked

Базовая очередь


Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.


Таблица для простейшей очереди содержит саму задачу и её статус:


create table task
(
    id          bigint not null primary key,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__idx on task (status);

Добавление задачи:


insert into task (id) values ($1) on conflict (id) do nothing;

Получение следующей задачи:


with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи:


update task
set
    status = 2
where id = $1;

Очередь с приоритетами


В простом случае id задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id).


Либо для приоритета добавляется отдельный столбец:


create table task
(
    id          bigint not null primary key,
    priority    integer not null,
    status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена
);

create index task__status__priority__idx on task (status, priority);

Добавление задачи:
insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;

Получение следующей задачи:
with next_task as (
    select id from task
    where status = 0
    order by priority
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Выделенный столбец позволяет менять приоритет задачи "на лету".


Очередь с повтором "упавших" задач


В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.


create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    delayed_to  timestamp null,
    error_text  text null
);

create index task__status__delayed_to__idx on task (status, delayed_to);

Как видно, расширился список статусов и добавились новые столбцы:


  • attempt — номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на 10 * attempt минут);
  • delayed_to — время следующей попытки выполнения задачи;
  • error_text — текст ошибки.

Текст ошибки нужен для группировки по типам ошибки.


Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом "ошибка". Выполняем запрос:


select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;

За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.


Получение следующей новой задачи:
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:
with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    delayed_to = null,
    error_text = null
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:
update task
set
    status = 2,
    delayed_to = null,
    error_text = null
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:
update task
set
    status = 3,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:
update task
set
    status = 4,
    delayed_to = null,
    error_text = $2
where id = $1;

Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or может очень плохо сочетаться с сортировкой order by.


Сбор метрик


Добавляем такие атрибуты:


  • время создания задачи;
  • время изменения задачи;
  • время начала и завершения выполнения задачи.

create table task
(
    id          bigint not null primary key,
    status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    attempt     integer not null default 0,
    begin_time  timestamp null,
    end_time    timestamp null,
    delayed_to  timestamp null,
    error_text  text null,
    created     timestamp not null default localtimestamp,
    updated     timestamp not null default localtimestamp
);

create index task__status__delayed_to__idx on task (status, delayed_to);
create index task__updated__idx on task (updated);

Учитываем добавленные столбцы во всех запросах.


Получение следующей новой задачи:
with next_task as (
    select id from task
    where status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Получение следующей отложенной из-за ошибки задачи:
with next_task as (
    select id from task
    where status = 3
      and delayed_to < localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1,
    attempt = attempt + 1,
    begin_time = localtimestamp,
    end_time = null,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
from next_task
where task.id = next_task.id
returning task.id;

Успешное завершение задачи:
update task
set
    status = 2,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:
update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp + make_interval(mins => 5 * attempt),
    error_text = $2,
    updated = localtimestamp
where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:
update task
set
    status = 4,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = $2,
    updated = localtimestamp
where id = $1;

Примеры, для чего это может быть нужно


Поиск и перезапуск повисших задач:


update task
set
    status = 3,
    end_time = localtimestamp,
    delayed_to = localtimestamp,
    error_text = 'hanged',
    updated = localtimestamp
where status = 1
  and updated < localtimestamp - interval '1 hour';

Удаление старых задач:


delete from task
where updated < localtimestamp - interval '30 days';

Статистика по выполнению задач:


select
    date_trunc('hour', end_time),
    count(*),
    sum(end_time - begin_time),
    avg(end_time - begin_time)
from task
where status = 2
  and end_time >= '2019-12-16'
group by 1
order by 1;

Повторный запуск ранее выполненных задач


Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.


create table task
(
    id              bigint not null primary key,
    task_updated_at timestamp not null default localtimstamp,
    status          integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)
    begin_time      timestamp null,
    end_time        timestamp null,
    delayed_to      timestamp null,
    error_text      text null,
    created         timestamp not null default localtimestamp,
    updated         timestamp not null default localtimestamp
);

Здесь для времени обновления задачи добавлен столбец task_updated_at, но можно было бы использовать поле created.


Добавление или обновление (перезапуск) задачи:


insert into task (id, task_updated_at) values ($1, $2)
on conflict (id) do update
set
    task_updated_at = excluded.task_updated_at,
    status = case when status = 1 then 1 else 0 end,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where task_updated_at < excluded.task_updated_at;

Что здесь происходит. Задача становится "новой", если она сейчас не исполняется.


В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.


Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.


Успешное завершение задачи:


update task
set
    status = case when begin_time >= updated then 2 else 0 end,
    end_time = localtimestamp,
    delayed_to = null,
    error_text = null,
    updated = localtimestamp
where id = $1;

Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус "новая".


Pipeline


Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.


Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.


create table task
(
    id      bigint not null primary key,
    stage   integer not null default 0,
    status  integer not null default 0
);

create index task__stage__status__idx on task (stage, status);

Получение следующей задачи на заданной стадии:


with next_task as (
    select id from task
    where stage = $1
      and status = 0
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи с переходом на указанную стадию:


update task
set
    stage = $2,
    status = 2
where id = $1;

Или переход на следующую по порядку стадию:


update task
set
    stage = stage + 1,
    status = 2
where id = $1;

Задачи по расписанию


Это вариация очереди с повтором.


У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).


create table task
(
    id              bigint not null primary key,
    period          integer not null,               -- периодичность запуска в секундах
    status          integer not null default 0,     -- 0 - новая, 1 - в работе
    next_run_time   timestamp not null default localtimestamp
);

create index task__status__next_run_time__idx on task (status, next_run_time);

Добавление задачи:


insert into task (id, period, next_run_time) values ($1, $2, $3);

Получение следующей задачи:


with next_task as (
    select id from task
    where status = 0
      and next_run_time <= localtimestamp
    limit 1
    for update skip locked
)
update task
set
    status = 1
from next_task
where task.id = next_task.id
returning task.id;

Завершение задачи и планирование следующего запуска:


update task
set
    status = 0,
    next_run_time = next_run_time + make_interval(secs => period)
where id = $1

Вместо заключения


В создании специализированной очереди задач средствами РСУБД нет ничего сложного.


"Самопальная" очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.


Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.

Комментарии (55)


  1. gleb_l
    23.12.2019 00:41
    -1

    В создании специализированной очереди задач средствами РСУБД нет ничего сложного
    … кроме возможных проблем с конкурентным доступом и модификацией данных.

    Если вы проектируете очередь, то извольте либо сделать ее thread-safe, либо явным образом объявите, что, скажем, метод получения очередной задачи thread-safe, а отметки об ее завершении — thread-unsafe.
    Без этого никогда нельзя делать вот так:
    update X set status = 1 where id = @id
    , так как другой поток мог уже модифицировать статус, и оба потока отметят задачу дважды. Правильно делать так:
    update X set status = 1 where id = @id and status = 0
    и далее проверять по числу records affected, удалось это текущему потоку, или нет.

    Та же ситуация с переходом задач на следующую стадию — оно все требует либо serializable транзакции поверх всех экзерсисов (что делает систему фактически однотредовой), либо просит явного указания на однопоточное использование. С повтором же возможна ситуация, когда долго работающая задача отмечает свое успешное выполнение уже после того, как внешний скрипт отметил задачу, как подлежащую повтору — и она будет выполнена дважды


    1. Varim
      23.12.2019 01:36

      select… for update skip locked
      Из за этого запроса не может быть ситуации когда задачу получат разные потоки или я не прав?


      1. gleb_l
        23.12.2019 02:21

        Получит, естественно, единственная коннекция — поэтому в плане получения задачи все потокобезопасно — я и пишу в исходном комментарии, что метод получения задачи — thread-safe.
        Однако, нигде нет утверждения, что а) задача *обрабатывается* в той же коннекции, в которой была получена, и б) что обработка задачи — однопоточная.
        Допустим даже, что прикладной уровень, получив от бакенда задачу, распараллелил ее на несколько тредов, грамотно подождал их окончания, и уже в единственном треде вызвал SQL-код, отмечающий ее, как выполненную (это весьма реальный сценарий) — но отсутствие потокобезопасности между фоновым SQL-процессом, который перезапускает задачи и кодом, который их отмечает, как выполненные — это явный фейл. В этом случае *никаким* построением прикладного уровня проблему повторного перезапуска успешно выполненной на грани таймаута задачи не решить — соответственно, бакенд кривой.
        Поэтому я утверждаю, что если данное решение очереди рекомендуется в качестве образоцовой реализации очереди средствами SQL-бакенда, в нем должно быть явно указано отсутствие потокобезопасности в перечисленных случаях — а именно: 1) завершение задачи, 2) продвижение задачи в следующую стадию, 3) перезапуск по таймауту (последнее — нерешаемо на уровне архитектуры приложения). Либо… все эти места доработаны так, чтобы быть thread-safe. Тогда этот код можно рекомендовать другим без оговорок частных случаев использования — а это уже гораздо лучше — при том, что доработки нужны минимальные.

        PS. автору статьи — тактика «минус-в-коммент, минус-в-карму» — ассоциируется с цитатой из М. Жванецкого — «зачем спорить с хромым об искусстве, если можно сразу ему сказать, что он хромой» :)


        1. Varim
          23.12.2019 03:03

          Но вот же запрос который атомарно делает недоступной задачу для других конекшинов/потоков:

          with next_task as (
              select id from task
              where status = 0
              limit 1
              for update skip locked
          )
          update task
          set
              status = 1
          from next_task
          where task.id = next_task.id
          returning task.id;


          1. gleb_l
            23.12.2019 03:10

            Он делает атомарным только *взятие* задачи из очереди — и я с этим не спорю, т.к. for update skip locked гарантирует, что ID одной и той же задачи получит строго один поток — остальные либо не получат ничего (если в очереди больше нет задач на взятие), либо получат ID следующих незалоченных невзятых задач. Взятие сделано грамотно и оно — потокобезопасно.

            Я же говорю о дальнейших действиях — по ID возвращенной нам задачи слой приложения выбирает из нее payload, начинает с ним работать, и по завершению работы либо обновляет номер фазы, либо отмечает задачу как сделанную или зафейленную. И проблемы конкурентности возникают как раз здесь.

            Например, при взятии многофазной задачи из очереди нужно атомарно возвращать не только task.id, но и номер фазы, и при продвижении на следующую фазу подставлять в условие update не только ее ID, но и полученный номер фазы + все условия, которые говорят о валидности задачи на момент ее продвижения (то есть то, что она не стоит в перезапуске, не снята по таймауту, не поставлена в статус «выполнена» или «зафейлена» итд) — и далее смотреть, сколько строк саффекчено этим update. Если 0 — значит, нас опередили, и тред нашего приложения должен выкинуть ошибку и выбросить результат своей работы в помойку. Вот тогда будет потокобезопасно.


            1. pin2t
              23.12.2019 09:00

              Нормально все у автора с реализацией захватов и обновлений состояний задач. Просто он рассматривает только СУБД часть реализации, но с ней все в порядке. Неважно в сколько потоков будет обрабатываться задача, когда закоммитится транзакция БД в которой было обновление статуса, оно произойдет. Ну важно только чтобы обработка задачи была в рамках одной транзакции БД, не делать коммит транзакции БД до завершения обработки задачи.

              Вы подучите как работают транзакции в БД. У автора с этим все в порядке. И не надо там никаких дополнительных проверок статуса при апдейте.

              Потокобезопасность это вопрос уровня приложения, и если в приложение неправильно работает с потоками, то

              where ... status = 0

              вам не поможет


              1. gleb_l
                23.12.2019 11:01

                Потокобезопасность это вопрос уровня приложения,
                — это пять! Кривой код в БД, который потенциально способен привести к тому, что одна и та же задача может быть выполнена дважды, отмечена и как ошибочная, и как успешная, а для многофазных заданий — еще и проскочить несколько фаз — это вопрос уровня приложения?! Я построил новый дом, но вы заходите туда строго по-одному, а то у него пол провалится :)

                по поводу where… status = 0 — это тоже очень наивное заблуждение, что просто «транзакции» нам здесь помогут. Помогут только serializable транзакции — а это сразу прощай параллельность работы очереди. Кроме того, почитайте мой коммент снизу, зачем нужны все эти ухищрения с атомарным взятием, сборщиком подвисших заданий, перезапуском итд


                1. apapacy
                  23.12.2019 13:30
                  +1

                  Я конечно никогда не работал с этим, но судя по документации select for update skip locked должен работать с блокировками на уровне строки именно так как это и предполагает автор статьи.


                1. pin2t
                  23.12.2019 15:44
                  +1

                  Кривой код в БД

                  В БД совершенно нормальный код. Вопрос потокобезопасности приложения он не относится к рассматриваемогу автором коду в БД. Да он должен быть потокобезопасным. Но к коду в БД это не относится.


                  1. gleb_l
                    23.12.2019 18:02
                    +1

                    Еще как относится. Вот, например, такой код будет работать до тех пор, пока из двух коннекций одновременно не попытаются вставить одно и то же значение @SomeUniqueField:

                    IF NOT EXISTS(
                       SELECT 1 FROM SomeTable
                       WHERE SomeUniqueField = @SomeUniqueField
                     )
                    	INSERT INTO SomeTable (SomeUniqueField)
                    


                    Первый же студенческий возглас — так надо транзакцию! — все равно не поможет, т.к. оба потока прочитают из таблицы через shared lock, убедятся, что такой записи нет, и затем попытаются ее добавить. И кто-то один по-прежнему обломается.

                    Следующий шаг (уровень джуна) — давайте сделаем serializable-транзакцию. Она, действительно, помогает, но одновременно с этим ведет себя, как средневековая проказа — любой объект, до которого вы дотронулись под serializable-транзакцией, оказывается заразным (получает exclusive lock). Именно поэтому такой уровень изоляции помогает — другие потоки даже прочитать ничего не могут. Но serializable — это СИЛЬНЕЙШИЙ антибиотик, который лечит нашу болезнь, но одновременно напрочь лишает базу возможности параллельной работы (см выше).

                    А всего-то надо было написать:
                    	INSERT INTO SomeTable (SomeUniqueField)
                    	SELECT Q.SF
                    	FROM (
                    		SELECT @SomeUniqueField AS SF
                    	) Q
                    	WHERE NOT EXISTS(
                    		SELECT 1 FROM SomeTable S
                    		WHERE S.SomeUniqueField = Q.SF
                    	)
                    


                    и мы, используя встроеннум атомарность, решим задачу без внешних транзакций.
                    Если же нам надо узнать, наш ли именно поток фактически вставил запись в таблицу, или это был кто-то другой, а наш обломался, то сразу после стейтмента вставки достаточно проверить:
                    	IF @@ROWCOUNT > 0 ...		-- наш
                    
                    	ELSE ...			-- чужой


                    1. alekciy
                      23.12.2019 18:15
                      +1

                      Указанные проблемы конечно имеют место быть, но к указанной теме (неблокируемый забор задачи из очереди) они отношения не имеют.


                      1. gleb_l
                        23.12.2019 20:25
                        -1

                        Указанная тема не

                        неблокируемый забор задачи из очереди
                        — он-то как раз сделан правильно, а
                        Очередь задач в PostgreSQL
                        — в реализации которой есть несколько методов, часть из которых thread-unsafe. Я даю иллюстрацию к тому, что использовать встроенную атомарность можно и нужно для того, чтобы простыми средствами сделать эти методы *тоже* потокобезопасными. Ведь ни у кого не вызывает вопросов способ, которым взятие задания сделано потокобезопасным, хотя он сделан без всяких транзакций :)

                        Тот факт, что в конкретном случае мой пример для PostgreSQL решается через ON CONFLICT, не имеет значения, т.к. суть метода в том, чтобы проверить входное условие перед модификацией записи, чтобы убедиться, что запись не была изменена конкурентно. Я пишу под MS SQL, там нет ON CONFLICT ;)

                        Методика сходна с проверкой на модификацию записи перед обновлением добавлением значения rowversion к значению ключа
                        в предикат условия выборки/обновления — что гарантирует отсуствие изменений в период между взятием записи и ее обновлением


                    1. pin2t
                      23.12.2019 19:06
                      +2

                      Во-первых, как правильно уже отметили, ваш пример никак не относится к задаче в статье.
                      Во-вторых, ваш пример правильно решается кодом

                      INSERT INTO SomeTable (SomeUniqueField) VALUES (@SomeUniqueField)
                       ON CONFLICT (SomeUniqueField) DO NOTHING;

                      а не тем что вы наизобретали.
                      Ну и в-третьих, без транзакций не получится, БД работают только с транзакциями, это обязательная часть.

                      Идите и учите как работают базы данных, и не будет для вас открытий и странных изобретений ugly кода


                      1. kai3341
                        24.12.2019 02:17

                        Краткое содержание:
                        Статья "как плохо решить задачу неподходящими инструментами"
                        Комментарий "Оно просто не работает как очередь задач (MQ), потому как не соответствует требованиям MQ"
                        Ответ: "а тот факт, что оно не выполняет возложенных задач, никак не относится к статье"


                        От себя добавлю, что выбор постгреса в качестве очереди задач (помимо того, что решение попросту неэффективно) крайне неудачен ввиду особенностей реализации MVCC: постгрес на каждый update создаёт новую запись. Режим работы — минимум 1 update для каждой записи.


                        Прекратите собирать троллейбус из буханки хлеба, и после с умным видом рассказывать о корректности использования ржаного хлеба. Умные люди уже придумали, как правильно решать такие задачи


                        Изюминка ситуации в том, что в моём текущем проекте заказчик настоял на использовании БД (постгрес) в качестве резервной очереди задач. И я реализовал нечто подобное.
                        Разница в том, что с самого начала я знал, что творю маразм


                        1. yakov-bakhmatov Автор
                          26.12.2019 11:13
                          +2

                          Работа инженера — выбрать подходящий инструмент. В некоторых случаях нет готового хорошего инструмента, приходится выбирать из плохих, как-то адаптировать и как-то с этим жить. Статья как раз о тех случаях, в которых MQ не удовлетворяет требованиям, и приходится делать самописную очередь.

                          В самом начале, как раз, указан пример требований (очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач) и отмечены ограничения применимости.

                          Вот давайте попробуем выполнить эти требования при помощи MQ. Давайте даже ослабим требования, исключив приоритет задач.

                          Возьмём MQ, которая умеет отложенную доставку сообщений. Например, ActiveMQ.

                          Шлём упавшие задачи в ту же очередь с заданной задержкой доставки. Всё ok, но однажды возникает вопрос: сколько в этой очереди новых задач и сколько повторных?

                          Тогда давайте разделим очередь на две: в одну пишем новые, в другую — упавшие с задержкой. Отлично. Как теперь балансировать нагрузку на воркеры? 50% на новые, 50% на упавшие? А если упавших мало, то 50% мощностей будут простаивать? Брать задачи из каждой очереди по очереди?

                          Допустим, как-то решили этот вопрос. Далее. Как узнать, какие основные причины, по которым задачи попадают в очередь упавших? Ну, наверное, надо логи погрепать…

                          Ладно. А как быть уверенным, что в очереди находятся действительно все упавшие задачи, и какая-то не потерялась? Ну, например, воркер может писать в какую-нибудь БД, когда он взял в работу задачу. И поискать в этой БД невыполненные задачи, которые давно не обновлялись (не брались в работу).

                          А зачем нам тогда очередь, если у нас уже есть таблица с задачами, их статусом и временем последней попытки выполнения?

                          По поводу update-ов в БД. Для одной задачи в лучшем случае выполняется 1 insert и 2 update (взять задачу, завершить задачу). Во время выполнения задачи в БД могут совершаться десятки и сотни запросов на вставку и изменение строк.

                          Эти программисты так любят что-то менять в БД, они что, не понимают, что при этом каждый раз создаётся новая запись?! Давайте им запретим.

                          Изюминка ситуации в том, что в одном из моих текущих проектов очень активно используется ActiveMQ. И её использование в некоторых случаях не соответствует эксплуатационным требованиям. И я с самого начала знал, что она не будет им соответствовать. И вот сейчас, когда самописная очередь прошла проверку на менее критичных задачах в менее критичных проектах, придёт, наконец, время заменить один не очень хороший инструмент на другой, тоже не очень хороший, но получше.


              1. apapacy
                23.12.2019 13:28

                , не делать коммит транзакции БД до завершения обработки задачи.

                Такие транзакции являются https://en.wikipedia.org/wiki/Long-lived_transaction и это скорее плохо чем хорошо. Такие транзакции очень часто используются в сообществе программистов Interbase/Firebird судя по информации с некоторых форумов.


                Также такие транзакции нередко реализованы в некоторых ORM (например sequelize/node.js)


                Вцелом, "хорошим", более обычным применением транзакций является их выполнение в одном запросе с клиента к серверу.


            1. qw1
              23.12.2019 19:12
              +1

              Например, при взятии многофазной задачи из очереди нужно атомарно возвращать не только task.id, но и номер фазы,
              id задачи — это первичный ключ, поэтому не может существовать задачи с тем же id, но другой фазой, поэтому возвращать фазу нет необходимости.
              и при продвижении на следующую фазу подставлять в условие update не только ее ID, но и полученный номер фазы + все условия, которые говорят о валидности задачи на момент ее продвижения (то есть то, что она не стоит в перезапуске, не снята по таймауту, не поставлена в статус «выполнена» или «зафейлена» итд) — и далее смотреть, сколько строк саффекчено этим update. Если 0 — значит, нас опередили
              Но вы же согласились, что взятие задачи тут выполнено корректно и потокобезопасно. А это значит, никакой другой поток/клиент не может работать с этой задачей (в том числе, менять её статус или фазу), если следует соглашениям и взял задачу корректно.


              1. gleb_l
                23.12.2019 20:12
                -2

                Конечно взятие реализовано корректно. Но нигде не оговорено, что *обработка* задачи всегда должна идти в том же потоке, который осуществил взятие. Поэтому я и утверждаю, что в данном случае желательно делать все методы работы с очередью потокобезопасными уже на уровне БД, тем более, что а) это практически ничего не стоит, и б) очередь — довольно стандартный широкоиспользуемый компонент, и вы никогда не знаете, насколько корректно код приложения будет работать с вами.


                1. alekciy
                  23.12.2019 22:06

                  Если вы в приложении взяли задачу одним потоком, а обрабатывайте его в другом, то это не проблема РСУБД и не зона его ответсвенности, т. к. ни чего она про ваше приложение не знает и знать не должна. Да и в принципе не может. На уровне соединений при конкурентной доступе она безопасность обеспечивает.


                1. qw1
                  23.12.2019 23:51
                  +1

                  нигде не оговорено, что *обработка* задачи всегда должна идти в том же потоке, который осуществил взятие
                  Проблема возникает не тогда, когда обработка выполнится не в том потоке, который осуществил взятие (это как раз нормально и проблем не вызовет), а когда два потока пытаются обновить одновременно одну строку таблицы.

                  А этого не будет, если только программист специально не напишет такой сценарий: взяли элемент, стартовали два потока, которые поставят этому элементу признак «завершён».

                  Но зачем такое писать в здравом уме? Можно просто написать «delete from tablename» без «where», и сказать «ваша система небезопасна, раз позволяет такое»?


    1. yakov-bakhmatov Автор
      23.12.2019 12:48

      Очень дельное замечание (как и другие ваши комментарии в ветке), спасибо.

      Я не утверждаю эталонность подхода. Считаю, что нельзя просто копировать код из этих ваших интернетов в свой проект. Всё нужно обдумывать, критически оценивать и адаптировать к своим условиям.

      При подготовке статьи код берётся из работающей системы, обобщается и упрощается. Здесь легко потерять некоторые нюансы или не донести контекст.

      Например, отмеченные вами проблемы завершения задач в контексте процессов моей системы проблемами не являются, поскольку

      1) перезапуск уже завершённой задачи допустим и к катастрофе не приведёт;

      2) за исполнителями наблюдает супервизор и принудительно завершает их по таймауту, перезапуская взятые исполнителями задачи. Пример перезапуска задач в статье нужен для ситуации, когда аварийно завершился супервизор (и при этом были убиты все подчинённые ему исполнители).


      1. gleb_l
        23.12.2019 15:52

        Я искренне рад, что вы понимаете критичные для построения очередей в БД моменты (и вообще, критичные для эффективно работы с БД принципы), и упрощения в конкретной реализации очереди являются не результатом технического просчета, а частным минимально-необходимым решением задачи.


    1. Count_s
      23.12.2019 13:01

      Поведение нормальной СУБД от потоков клиента не зависит (в общем то она и не знает есть ли они), для изоляции есть a — транзакции и b — явные блокировки, что и описано в статье.


  1. BugM
    23.12.2019 02:35

    Флап соединения с БД и приехали. Соседний воркер берет ту же задачу и начинает выполнять.
    Нужно добавлять приличное количество костылей для защиты от таких ситуаций.


    1. Varim
      23.12.2019 03:08

      не знаю что такое «флап», но что по вашему делает следующий запрос?

      with next_task as (
          select id from task
          where status = 0
          limit 1
          for update skip locked
      )
      update task
      set
          status = 1
      from next_task
      where task.id = next_task.id
      returning task.id;

      Если поток упал, то задача так и будет в статусе «в работе».


      1. BugM
        23.12.2019 10:20

        С такой реализацией при попадании соединения или при обычном падении воркера таск залипнет. И костыли нужны для разруливания этой ситуации.


        А в ситуации select for update и ничего не апдейтим пока таск выполняется будут дубли при пропадании соединения. И тоже костыли нужны.


        Проблема всех простых очередей именно в таких ситуациях. Разруливание всех ситуаций когда что-то идет не так.


        1. Varim
          23.12.2019 15:51

          будут дубли при пропадании соединения
          А что значит при пропадании соединения? Ведь если не закоммитим и пропадет соединение то БД просто сделает ролбек, а воркер не сможет сделать коммит или я в чем то неправ?


          1. BugM
            23.12.2019 16:00

            Таск перевели в статус запущен. Закомитили. А в комплит перевести не смогли. Ошибка сети, ошибка БД, ошибка воркера, любая другая ошибка. Теперь статус таска неизвестен (вечный работает) и что с ним делать непонятно.


            Если не комитить а держать открытую транщакцию, то при падении сессии в БД и соотвественно транзакции другой воркер возьмет этот таск и будет выполнять. Что при это будет делать первый воркер непонятно.


            1. Varim
              23.12.2019 16:07

              Так нет идеальных очередей, например с RabbitMQ тоже есть нюансы.


            1. alekciy
              23.12.2019 17:25

              Если все происходит в рамках одного сервера, то можно в базу писать PID процесса и по нему отлавливать грохнувшийся задачи и их перезапускать. Главно при реализации не забыть, что PID является переиспользуемым и чисто проверка на PID может вызывать проблемы.


      1. apapacy
        23.12.2019 13:35

        Обычно для задач устанавливается и таймаут для отслеживания аварийно завершившихся тасков (которые не смогли перевести себя в статус=3 в статусах автора статьи). При этом начинаются проблемы когда таск может быть равно как и долгоиграющим так и зависшим.


        1. BugM
          23.12.2019 16:01

          И по закону Мерфи такие проблемы будут точно.


    1. pin2t
      23.12.2019 09:11

      Не нужно никаких костылей. Нужно просто выучить как работают транзакции в СУБД. Все там нормально будет, соседний воркер не получит ту же задачу


      1. gleb_l
        23.12.2019 10:42

        Нужно просто понимать, что данный набор методов *не может* идти под сквозной транзакцией от получения задачи до отметки о ее выполнении — иначе все механизмы, связанные с очередью, станут однозадачными.
        Вся идея такой реализации очереди — в том, что используются не внешние транзакции, а встроенная атомарность SQL-движка, и в результате реализуется механизм двухфазных транзакций:
        1) задание атомарно достается из очереди и отмечается, как взятое без всяких внешних транзакций
        2) в течение времени, не превышающем время таймаута поток-обработчик задания его выполняет. Обработчик может использовать транзакции, а может и нет — это зависит от его логики обработки. Главное, чтобы действия, связанные с манипуляциями с очередью, либо не использовали внешних транзакций, либо совершались в транзакции с минимальным количеством затронутых объектов БД
        3) поток-обработчик атомарно отмечает задание, как выполненное. Здесь тоже должна использоваться встроенная атомарность, чтобы не было необходимости во внешней транзакции. Как-то так


  1. apapacy
    23.12.2019 03:01

    Я недавно создал пост https://habr.com/ru/post/458608/ о нескольких реализациях job queue.
    Вцелом хорошо что Вы рассматриваете разные типы задач, в том числе и задачи выполняемые по расписанию. А также хорошо что предусматриваете повторное выполнение задач в случае ошибки.


    По опыту тестирования некоторых готовых библиотек, в частности agenda, которая основана также на базе данных (mongodb) могу сказать что решение с тяжелой базой данных существенно медленнее чем решение основанные например на redis. Но даже не это главное.Та же agenda, которая по идее должна работать медленно но стабильно в реальности после какого-то предела просто останавливается и перестает выбирать задачи из очереди. Я это говорю к тому, что в боевых условиях умозрительные построения могут начать работать не так как задумывал их автор.


    Так же у очередей заданий желательно иметь масштабирующий и "сглаживающий" функционал, то есть возможность запускать несколько воркеров на выполнение одной задачи. А также на сглаживание "пиков" задерживая выполнение очередной задачи пока не завершаться текущие задачи.


  1. resetme
    23.12.2019 05:54

    Проблема в том что постоянно надо пуллить и смотреть появились ли задачи в очереди. Вот бы как-нибудь сделать так чтобы запускать задачу только при наличии задачи в очереди без постоянного опроса ее. Такое можно как-нибудь сделать?


    1. DmitryKoterov
      23.12.2019 07:11

      Поллинг обычно не проблема, если задач в очереди много. Т.е. за время обработки предыдущих выбранных задач данным воркером накапливаются новые, которые он счастливо и поллит.


    1. edo1h
      23.12.2019 12:37

      мой комментарий прямо под вашим )


  1. edo1h
    23.12.2019 06:04

    у постгреса есть механизм LISTEN/NOTIFY, очень удобен для малоактивных очередей (обработчик не должен постоянно опрашивать сервер в ожидании задания).


    create index task__status__priority__idx on task (status, priority);

    иногда бывает полезно делать вот так:


    create index task__status0__priority__idx on task (priority) where status=0;


    1. alekciy
      23.12.2019 08:53

      Есть, но как PG запустит приложение?


      1. edo1h
        23.12.2019 12:41

        речь не про это.


        вот у вас есть воркер, которые ждёт задачу. а очередь пуста. что остаётся делать воркеру? периодически слать select bla-bla-bla в ожидании задания.
        будешь слать часто — ненужная нагрузка на сервер. редко — время реакции на новое задание неприемлемо высоко.


        LISTEN/NOTIFY как раз и решает эту проблему — воркер делает LISTEN и ждёт.


      1. resetme
        23.12.2019 17:52

        А есть ли вариант послать NOTIFY из триггера на вставку строки и насколько это рабочий вариант?


        1. alekciy
          23.12.2019 18:09

          Нормальный это вариант. Работает. Но только для закомиченых транзакций (т.е. не сработает пока не будет COMMIT). Если слушателя при этом нет (отвалился), то ошибки не будет, но событие потеряется, т.е. PG в себе их не хранит.


  1. DmitryKoterov
    23.12.2019 07:15

    Вы, кстати, зря про то, что этот способ плохо масштабируется. Нормально он масштабируется — вернее, упирается только в одновременное кол-во коннектов к бд (которых можно поставить и 5000 штук для таких простых запросов). Выгребать очередь в 5000 воркеров — это очень и очень неплохо.

    Громадный плюс данного подхода (вернее, не в точности его, а когда блокировка строки не отпускается воркером при захвате задачи — так тоже можно сделать) в том, что при смерти процесса-воркера (kill -9) его коннекшены автоматом закрываются, и все блокировки отпускаются. Т.е. у данного решения нет проблемы, когда одну и ту же задачу могут взять два воркера — а значит, не нужно никакого геморроя с heartbeat-ами и т.д. (Естественно, при условии, что воркер существенно использует тот же самый коннекшн в своей работе.)

    И еще одно. Можно же иметь не одну таблицу, а 10. Хоть бы и партицированную. И натравливать N воркеров не на одну таблицу, а на 10. Это практически линейно все отмасштабирует (потому что для субд все эти запросы очень просты, она их как орешки щелкает, они все попадают в индекс).


    1. DmitryKoterov
      23.12.2019 07:25

      Да, еще вот https://cadenceworkflow.io — на нем весь Uber держится, и там очереди именно так реализованы (правда, на MySQL, но сути не меняет).


    1. IvanVakhrushev
      23.12.2019 10:24

      Извините, но когда вы пишете про 5000 коннектов к БД, вы понимаете, как PG устроен под капотом?
      Держать блокировку воркером на протяжении всего времени выполнения задачи — это совсем не масштабируемое решение. Для использования в продакшене оно мало подходит.
      А если отпускать блокировку после захвата (как правильно делать), то нужно очень аккуратно реализовывать перезапуск\смену статуса у тасок, что, собственно, обсуждается в треде выше.


      1. DmitryKoterov
        23.12.2019 11:12
        +1

        5000 коннектов — это, например, дефолтный лимит в AWS Aurora Postgres. 5000 практически ничего не делающих процессов — не так и много: если каждый съест по 10М, то получится 50Г в памяти — по современным меркам приемлемо. (Но, опять же, 5000 — очень большая цифра, если блокировки отпускать, то коннектов потребуется на 1-2 порядка меньше.)


        Насчет перезапуска тасков — это очень тонкий момент, нужно 10 раз подумать, действительно ли так уж много заданий и нельзя обойтись долгими сессионными блокировками (pg_try_advisory_lock). Работает воркер — захватил блокировку, умер — его коннект тоже умер, блокировка снялась, новый воркер может подхватить таску. (Это, правда, выходит за рамки данной статьи.) Реально очень сильно облегчает жизнь, особенно если данные, с которыми работает воркер, находятся в той же бд, где захватывается блокировка (тогда блип коннекта не страшен).


  1. TyVik
    23.12.2019 10:03
    +1

    Прочитал статью и проглядел комменты, но так и не увидел упоминания PGQ. Почему бы его не взять? Более того, Алексей Лесовский в своём докладе «Топ ошибок со стороны разработки при работе с PostgreSQL» явно жалуется на самописные очереди.


    1. Melkij
      23.12.2019 10:45

      Ночь потому что, хорошие dba спят ;-)

      Самописная реализация очереди привлекает разработчиков, «что тут делать, фигак фигак и в продакшен». А что случается с продакшеном даже всего под сотней запросов к очереди в секунду — узнают потом. И то бывает игнорируют. Ну и что что 99% времени база лопатит эту недоочередь, ну и что что любая транзакция свыше десятка минут кладёт проект — но работает же!
      Да вот не просто. Хотите очередь в базе (а это действительно удобно из-за помещения события в очередь в той же самой транзакции, rollback транзакции и consumer не увидит событие как будто ничего и не было) — то вам нужен pgq. У него большие проблемы с документацией — но это та реализация очереди-в-базе которая под нагрузкой живёт нормально потому что учитывает как именно работает эта СУБД.

      По крайней мере пока не появилась реализация table access method с UNDO. Или вовсе что-то ещё более специфичное именно под очереди. Появившиеся в pg12 api tableam это уже позволяют сделать.


  1. IvanVakhrushev
    23.12.2019 10:13

    1.

    create index task__status__idx on task (status);

    Бессмысленный индекс в таком виде. Значение 2 (выполнена) из него нужно выкинуть сразу. В штатном режиме все задачи будут иметь финальный статус 2, их будет много, индекс не будет использоваться.

    2.
    error_text  text null

    Лучше так не делать в production'е.
    Люди начинают писать в поле длинные стек-трейсы, таблица тостируется, вы теряете в перфомансе.
    Ограничивайте длину поля явно.

    3. Что касается очереди с повторами «упавших» задач, то не увидел самого главного: как отличить упавший воркер от долго работающего воркера?
    Если воркер упал во время работы, кто перезапустит таску на выполнение? Кто проставит ошибку?


    1. ggo
      23.12.2019 10:30

      1. По финальному статусу обычно не ищут. Для нефинальных — cardinality обычно хороший.
      3. Это «беда» любых воркеров. Если он не сообщает свой статус, долгоиграющий не отличить от зависшего. Вариант — контролировать таймауты и принудительно прерывать, с соответствующими статусами.


      1. DmitryKoterov
        23.12.2019 11:18

        В общем случае это называется heartbeats. Раз в N секунд воркер с номером W пытается отрепортить, что он еще живой. Если во время такого репорта ему говорят, что появился какой-то другой, более молодой, воркер с тем же номером W, то старый воркер совершает сепукку.


        Правда, можно представить, какую нагрузку это все создает, когда воркеров много, и что все равно есть промежуток времени, когда оба воркера могут работать одновременно. Это-то как раз и можно решить advisory-блокировкой на значение W в основной бд, с которой работает воркер W. В этом случае сам коннект к бд выступает таким своеобразным каналом heartbeat-ов.


    1. yakov-bakhmatov Автор
      23.12.2019 12:58

      3. Супервизор.
      Супервизор запускает и поддерживает заданное количество работающих исполнителей.
      Супервизор следит за исполнителями. При аварийном завершении исполнителя перезапускает задачу. При превышении заданного времени работы исполнитель убивается, задача перезапускается.


  1. Senyaak
    23.12.2019 12:33

    Спасибо за статью. Сначало из-за слоников подумал что тут про php :D


  1. talbot
    23.12.2019 17:25

    Реализация подобной очереди давно используется в Яндекс.Деньгах и выложена на Гитхабе: github.com/yandex-money-tech/db-queue
    Её тестировали на нагрузке до 1000 транзакций (авторизаций) в секунду, и очередь вполне справляется.