В какой-то момент на старте в нашем data-сервисе (известная в узких кругах аналитическая платформа для селлеров WB/Ozon «Таблички») стало возникать много фоновых работ: ETL‑сенсоры, сложные API‑запросы к маркетплейсам, пересчёты витрин, обслуживание «сервисных» задач. К тому же добавилась потребность сгладить пики нагрузки на БД, растянув поступающую нагрузку во времени. Хотелось:
без отдельного брокера (Rabbit/Kafka) на старте,
с горизонтальным масштабированием воркеров,
с понятным «забрал задачу → обработал → отметил результат»,
чтобы падение воркера не превращало задачи в «вечные зомби»,
и чтобы очередь не могла съесть всю память (backpressure).
Ниже — паттерн “очередь в Postgres”, который хорошо работает в проде для задач at-least-once: SKIP LOCKED для конкурентного забора, lease/heartbeat для возврата задач при падениях, и backpressure на стороне воркера. Такой подход быстр в реализации и не требует отдельной инфраструктуры. Производительность упирается в диск/WAL, но для фоновых задач обычно даёт сотни–тысячи операций очереди в секунду на одном Postgres-инстансе — с большим запасом для большинства MVP.
Важно: это не замена Kafka в high-load стриминге и не про “exactly-once”. Это pragmatic-решение для фоновых задач/джобов, когда Postgres уже есть и хочется минимальную инфраструктуру (необходимо в стартапах).
1) Модель данных: “таблица задач” + “таблица heartbeat”
Таблица задач (пример)
Пример набора полей, который нужен для такого паттерна:
CREATE TABLE tasks_queue (
task_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_name text NOT NULL,
task_parameters jsonb NOT NULL,
priority int NOT NULL DEFAULT 1,
is_occupied boolean NOT NULL DEFAULT false,
when_occupied timestamptz NULL,
worker_id uuid NULL,
error_occurred boolean NOT NULL DEFAULT false,
when_error timestamptz NULL,
error_text text NULL,
created_at timestamptz NOT NULL DEFAULT now()
);
Таблица heartbeat
CREATE TABLE workers_heartbeat (
worker_id uuid PRIMARY KEY,
last_heartbeat timestamptz NOT NULL
);
Идея простая: воркер раз в N секунд “пингует” себя в БД. Если heartbeat давно не обновлялся — значит воркер “мертв”, и его задачи можно перезабрать (отдать другим воркерам).
2) Атомарный “забор” задач: FOR UPDATE SKIP LOCKED + UPDATE … RETURNING
Классическая проблема: несколько воркеров одновременно читают “первые 100 задач” и все забирают одно и то же.
Решение — не делать SELECT отдельно, а делать атомарно:
выбираем кандидатов
FOR UPDATE SKIP LOCKED(чтобы другие транзакции их пропустили),тут же помечаем занятыми,
возвращаем список задач через
RETURNING.
Пример запроса:
WITH task_selection AS (
SELECT task_id, task_name, task_parameters
FROM tasks_queue t
WHERE error_occurred = FALSE
AND (
-- новые задачи
is_occupied = FALSE
OR t.worker_id IS NULL
-- или задачи от “мертвого” воркера (см. heartbeat)
OR NOT EXISTS (
SELECT 1
FROM workers_heartbeat w
WHERE w.worker_id = t.worker_id
AND w.last_heartbeat >= NOW() - INTERVAL '30 minutes'
)
)
ORDER BY priority DESC
LIMIT 100
FOR UPDATE SKIP LOCKED
)
UPDATE tasks_queue
SET is_occupied = TRUE,
when_occupied = NOW(),
worker_id = '…worker uuid…'
WHERE task_id IN (SELECT task_id FROM task_selection)
RETURNING task_id, task_name, task_parameters;
Почему это работает:
SKIP LOCKEDгарантирует, что параллельные воркеры не будут блокировать друг друга на одних и тех же строках — они просто “перепрыгнут” занятые строки.UPDATE … RETURNINGделает “claim” атомарным: задача либо перешла конкретному воркеру, либо нет.
3) Lease/heartbeat: как возвращать “зависшие” задачи
Если воркер упал после того, как пометил задачу is_occupied=true, задача зависнет навсегда… если не сделать механизм возврата.
Самый простой вариант lease:
воркер регулярно пишет heartbeat,
при выборе задач разрешаем забирать и “occupied”, если у их
worker_idheartbeat устарел.
Это даёт поведение:
нормально работающий воркер удерживает задачи, потому что heartbeat свежий,
упавший воркер через
DEAD_EXECUTOR_OCCUPATION_SEC“отпускает” задачи (их заберёт другой воркер).
Тонкости:
это at-least-once: если воркер упал в середине выполнения, задачу возьмут снова.
значит, обработчики должны быть идемпотентными или иметь дедупликацию эффектов.
4) Backpressure: чтобы не взорвать память и не забрать “лишнего” из БД
Очень частая ошибка самодельных очередей: воркер каждые 5 секунд забирает “LIMIT 1000”, складывает в память, а потом ещё “LIMIT 1000”… и так до OOM.
Решение: держать локальную очередь фиксированного размера (asyncio.Queue(maxsize=…)) и из БД забирать ровно столько, сколько есть свободного места.
Схема:
есть
task_queueв памяти (bounded),есть
active_counter(сколько задач сейчас реально выполняется),producerсчитаетfree_capacityи делаетLIMIT free_capacity,consumerисполняет задачи с семафором по параллелизму.
Псевдокод:
free_capacity = task_queue.maxsize - (running + queued)
if free_capacity > 0:
tasks = db_claim_tasks(limit=free_capacity)
for t in tasks:
await task_queue.put(t)
Это даёт:
стабильную память,
предсказуемый параллелизм,
и “естественное” давление на источник задач.
5) Graceful shutdown: корректная остановка воркеров
В проде воркеры живут в контейнерах/оркестраторах: прилетает SIGTERM, под ждёт terminationGracePeriodSeconds, потом убивает.
Правильное поведение:
перестать брать новые задачи из БД,
дождаться завершения уже взятых,
корректно выйти.
Практически:
общий
stop_event,producerприstop_eventне делаетdb_claim_tasks,consumerвыходит, когда очередь пуста и активных задач нет.
6) Обработка ошибок и ретраи
Минимальный вариант:
на исключении записываем
error_occurred=trueиerror_text,либо делаем
attempts++иnext_retry_at,сломанные задачи уводим в отдельную таблицу/статус.
Я предпочитаю:
хранить
attempts,делать экспоненциальный backoff,
и иметь простой SQL-дашборд “сколько задач в очереди / сколько error / сколько занято / сколько зависло”.
7) Когда этот паттерн НЕ стоит использовать
Не ваш вариант, если:
нужен высокий throughput (сотни тысяч/миллионы сообщений в минуту),
нужна строгая гарантия порядка,
нужно exactly-once,
нужны задержки/таймеры/сложные роутинги (хотя часть можно сделать полями в таблице).
Тогда лучше брокер + зрелый фреймворк (Kafka/Rabbit/SQS + consumer group и т.д.).
Итог
Очередь в Postgres на SKIP LOCKED — отличный “инфраструктурный компромисс”, когда:
Postgres уже есть,
задачи фоновые,
нужен простой масштабируемый воркер-пул,
важнее надёжность и прозрачность, чем максимальная производительность.
не хочется заводить 10 разных ВМ на старте проекта/MVP
Ключевые элементы, которые делают решение боевым:
атомарный claim через
FOR UPDATE SKIP LOCKED+UPDATE … RETURNING,возврат задач через lease/heartbeat,
backpressure через bounded-очередь и
LIMIT free_capacity,graceful shutdown.
Комментарии (5)

ptr128
11.01.2026 07:11воркер раз в N секунд “пингует” себя в БД
Если вместо (или кроме) абстрактного worker_id фиксировать в таблице pg_backend_pid(), то никакого heartbeat не требуется. Достаточно по pg_stat_activity проверять наличие pid и сравнивать backend_start с временем последней модификации задачи. Если backend_start больше времени модификации задачи или такого pid нет в pg_stat_activity, то процесс умер и блокировки нет.
AlekseyPeregudov
Вы изобретаете dbms_scheduler из Oracle. Не подумайте, ничего плохого в этом не вижу, даже наоборот. Если интересно - можете подсмотреть что там есть хорошего для вас.
Например - объединение заданий в классы: "эти 10 - закрытие дня", "эти 20 - отчетность" и так далее.
Выстаивание последовательности заданий: "эта задача стартует по завершению вот той" - выстаивается цепочка логически взаимосвязанных заданий и не нужно до минут высчитывать время выполнения каждой из них.
Приоритезация и окна выполнения: "с 23:00 до 6:00" должны быть выполнены эти 200 заданий, их приоритет такой-то.
Ну и так далее. Удачи.
pel_mrk Автор
Спасибо! Возьму на заметку