В какой-то момент на старте в нашем data-сервисе (известная в узких кругах аналитическая платформа для селлеров WB/Ozon «Таблички») стало возникать много фоновых работ: ETL‑сенсоры, сложные API‑запросы к маркетплейсам, пересчёты витрин, обслуживание «сервисных» задач. К тому же добавилась потребность сгладить пики нагрузки на БД, растянув поступающую нагрузку во времени. Хотелось:

  • без отдельного брокера (Rabbit/Kafka) на старте,

  • с горизонтальным масштабированием воркеров,

  • с понятным «забрал задачу → обработал → отметил результат»,

  • чтобы падение воркера не превращало задачи в «вечные зомби»,

  • и чтобы очередь не могла съесть всю память (backpressure).

Ниже — паттерн “очередь в Postgres”, который хорошо работает в проде для задач at-least-once: SKIP LOCKED для конкурентного забора, lease/heartbeat для возврата задач при падениях, и backpressure на стороне воркера. Такой подход быстр в реализации и не требует отдельной инфраструктуры. Производительность упирается в диск/WAL, но для фоновых задач обычно даёт сотни–тысячи операций очереди в секунду на одном Postgres-инстансе — с большим запасом для большинства MVP.

Важно: это не замена Kafka в high-load стриминге и не про “exactly-once”. Это pragmatic-решение для фоновых задач/джобов, когда Postgres уже есть и хочется минимальную инфраструктуру (необходимо в стартапах).


1) Модель данных: “таблица задач” + “таблица heartbeat”

Таблица задач (пример)

Пример набора полей, который нужен для такого паттерна:

CREATE TABLE tasks_queue (
  task_id           uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  task_name         text NOT NULL,
  task_parameters   jsonb NOT NULL,

  priority          int  NOT NULL DEFAULT 1,

  is_occupied       boolean NOT NULL DEFAULT false,
  when_occupied     timestamptz NULL,
  worker_id         uuid NULL,

  error_occurred     boolean NOT NULL DEFAULT false,
  when_error        timestamptz NULL,
  error_text        text NULL,

  created_at        timestamptz NOT NULL DEFAULT now()
);

Таблица heartbeat

CREATE TABLE workers_heartbeat (
  worker_id       uuid PRIMARY KEY,
  last_heartbeat  timestamptz NOT NULL
);

Идея простая: воркер раз в N секунд “пингует” себя в БД. Если heartbeat давно не обновлялся — значит воркер “мертв”, и его задачи можно перезабрать (отдать другим воркерам).

2) Атомарный “забор” задач: FOR UPDATE SKIP LOCKED + UPDATE … RETURNING

Классическая проблема: несколько воркеров одновременно читают “первые 100 задач” и все забирают одно и то же.

Решение — не делать SELECT отдельно, а делать атомарно:

  • выбираем кандидатов FOR UPDATE SKIP LOCKED (чтобы другие транзакции их пропустили),

  • тут же помечаем занятыми,

  • возвращаем список задач через RETURNING.

Пример запроса:

WITH task_selection AS (
  SELECT task_id, task_name, task_parameters
  FROM tasks_queue t
  WHERE error_occurred = FALSE
    AND (
      -- новые задачи
      is_occupied = FALSE
      OR t.worker_id IS NULL

      -- или задачи от “мертвого” воркера (см. heartbeat)
      OR NOT EXISTS (
        SELECT 1
        FROM workers_heartbeat w
        WHERE w.worker_id = t.worker_id
          AND w.last_heartbeat >= NOW() - INTERVAL '30 minutes'
      )
    )
  ORDER BY priority DESC
  LIMIT 100
  FOR UPDATE SKIP LOCKED
)
UPDATE tasks_queue
SET is_occupied = TRUE,
    when_occupied = NOW(),
    worker_id = '…worker uuid…'
WHERE task_id IN (SELECT task_id FROM task_selection)
RETURNING task_id, task_name, task_parameters;

Почему это работает:

  • SKIP LOCKED гарантирует, что параллельные воркеры не будут блокировать друг друга на одних и тех же строках — они просто “перепрыгнут” занятые строки.

  • UPDATE … RETURNING делает “claim” атомарным: задача либо перешла конкретному воркеру, либо нет.

3) Lease/heartbeat: как возвращать “зависшие” задачи

Если воркер упал после того, как пометил задачу is_occupied=true, задача зависнет навсегда… если не сделать механизм возврата.

Самый простой вариант lease:

  • воркер регулярно пишет heartbeat,

  • при выборе задач разрешаем забирать и “occupied”, если у их worker_id heartbeat устарел.

Это даёт поведение:

  • нормально работающий воркер удерживает задачи, потому что heartbeat свежий,

  • упавший воркер через DEAD_EXECUTOR_OCCUPATION_SEC “отпускает” задачи (их заберёт другой воркер).

Тонкости:

  • это at-least-once: если воркер упал в середине выполнения, задачу возьмут снова.

  • значит, обработчики должны быть идемпотентными или иметь дедупликацию эффектов.

4) Backpressure: чтобы не взорвать память и не забрать “лишнего” из БД

Очень частая ошибка самодельных очередей: воркер каждые 5 секунд забирает “LIMIT 1000”, складывает в память, а потом ещё “LIMIT 1000”… и так до OOM.

Решение: держать локальную очередь фиксированного размера (asyncio.Queue(maxsize=…)) и из БД забирать ровно столько, сколько есть свободного места.

Схема:

  • есть task_queue в памяти (bounded),

  • есть active_counter (сколько задач сейчас реально выполняется),

  • producer считает free_capacity и делает LIMIT free_capacity,

  • consumer исполняет задачи с семафором по параллелизму.

Псевдокод:

free_capacity = task_queue.maxsize - (running + queued)
if free_capacity > 0:
    tasks = db_claim_tasks(limit=free_capacity)
    for t in tasks:
        await task_queue.put(t)

Это даёт:

  • стабильную память,

  • предсказуемый параллелизм,

  • и “естественное” давление на источник задач.

5) Graceful shutdown: корректная остановка воркеров

В проде воркеры живут в контейнерах/оркестраторах: прилетает SIGTERM, под ждёт terminationGracePeriodSeconds, потом убивает.

Правильное поведение:

  1. перестать брать новые задачи из БД,

  2. дождаться завершения уже взятых,

  3. корректно выйти.

Практически:

  • общий stop_event,

  • producer при stop_event не делает db_claim_tasks,

  • consumer выходит, когда очередь пуста и активных задач нет.

6) Обработка ошибок и ретраи

Минимальный вариант:

  • на исключении записываем error_occurred=true и error_text,

  • либо делаем attempts++ и next_retry_at,

  • сломанные задачи уводим в отдельную таблицу/статус.

Я предпочитаю:

  • хранить attempts,

  • делать экспоненциальный backoff,

  • и иметь простой SQL-дашборд “сколько задач в очереди / сколько error / сколько занято / сколько зависло”.

7) Когда этот паттерн НЕ стоит использовать

Не ваш вариант, если:

  • нужен высокий throughput (сотни тысяч/миллионы сообщений в минуту),

  • нужна строгая гарантия порядка,

  • нужно exactly-once,

  • нужны задержки/таймеры/сложные роутинги (хотя часть можно сделать полями в таблице).

Тогда лучше брокер + зрелый фреймворк (Kafka/Rabbit/SQS + consumer group и т.д.).

Итог

Очередь в Postgres на SKIP LOCKED — отличный “инфраструктурный компромисс”, когда:

  • Postgres уже есть,

  • задачи фоновые,

  • нужен простой масштабируемый воркер-пул,

  • важнее надёжность и прозрачность, чем максимальная производительность.

  • не хочется заводить 10 разных ВМ на старте проекта/MVP

Ключевые элементы, которые делают решение боевым:

  • атомарный claim через FOR UPDATE SKIP LOCKED + UPDATE … RETURNING,

  • возврат задач через lease/heartbeat,

  • backpressure через bounded-очередь и LIMIT free_capacity,

  • graceful shutdown.

Комментарии (5)


  1. AlekseyPeregudov
    11.01.2026 07:11

    Вы изобретаете dbms_scheduler из Oracle. Не подумайте, ничего плохого в этом не вижу, даже наоборот. Если интересно - можете подсмотреть что там есть хорошего для вас.

    Например - объединение заданий в классы: "эти 10 - закрытие дня", "эти 20 - отчетность" и так далее.

    Выстаивание последовательности заданий: "эта задача стартует по завершению вот той" - выстаивается цепочка логически взаимосвязанных заданий и не нужно до минут высчитывать время выполнения каждой из них.

    Приоритезация и окна выполнения: "с 23:00 до 6:00" должны быть выполнены эти 200 заданий, их приоритет такой-то.

    Ну и так далее. Удачи.


    1. pel_mrk Автор
      11.01.2026 07:11

      Спасибо! Возьму на заметку


  1. GerrAlt
    11.01.2026 07:11

    кажется тут advisory lock был бы более производительным вариантом по сравнению с таблицей heartbeat


    1. pel_mrk Автор
      11.01.2026 07:11

      Спасибо за идею!


  1. ptr128
    11.01.2026 07:11

    воркер раз в N секунд “пингует” себя в БД

    Если вместо (или кроме) абстрактного worker_id фиксировать в таблице pg_backend_pid(), то никакого heartbeat не требуется. Достаточно по pg_stat_activity проверять наличие pid и сравнивать backend_start с временем последней модификации задачи. Если backend_start больше времени модификации задачи или такого pid нет в pg_stat_activity, то процесс умер и блокировки нет.