Для тех, кому хочется сразу посмотреть код: репозиторий сервиса — в конце текста.


Откуда задача

Нужен сервис, который централизованно выполняет исходящие HTTP-запросы для экосистемы микросервисов и интеграций. Постановка на уровне требований:

  1. Два режима входа — и синхронный (ответ нужен вызывающей стороне), и асинхронный (достаточно принять задачу и отдать результат «куда-то ещё»).

  2. Два канала постановки — удобно и через HTTP API, и напрямую в Kafka (без лишнего hop через HTTP).

  3. Rate limit — защита квот и предсказуемое поведение при 429 со стороны внешних API.

  4. Кеш ответов — снижение нагрузки на внешние системы и наши же ресурсы.

  5. Строгий порядок там, где он важен — если указан ключ партиции в «ordered»-режиме, сообщения по этому ключу не должны перемешиваться.

  6. Масштаб по числу ключей — сотни тысяч и миллионы логических партиций при ограниченном числе воркеров; одна «тяжёлая» партиция не должна блокировать остальные.

  7. Ретраи и отложенные сообщения — экспоненциальные паузы, очередь «на завтра», планирование на произвольный горизонт.

  8. Единая точка наблюдаемости по действиям — желательно, чтобы «что произошло с задачей» оставалось в журнале событий (логе), а не только в памяти воркера.

Ни одна «одна технология» не закрывает это без слоёв. Сначала — почему в стеке именно Kafka, PostgreSQL и Redis; дальше — как мы спроектировали сервис Requester: контекст, контракты запроса/ответа, движение данных, внутренние воркеры, graceful shutdown, детали rate limit / retry / cache / отложенных задач, wake-up, тестирование и узкое место с большими payload в Redis.


Выбор стека

Асинхронный режим — это в первую очередь очередь: задачи копятся и обрабатываются не в момент вызова API, а позже, конкурируя за пул воркеров. Притом нам нужен строгий порядок там, где бизнес этого требует (по логическому ключу), и при этом журналируемый вход/выход для интеграций. Для такой модели Kafka — естественный выбор: распределённый append-only лог, топики, партиции, consumer groups, долговременное хранение, высокая пропускная способность. Входные и выходные события сервиса мы ведём через Kafka, чтобы единая цепочка «приняли задачу → обработали → отдали ответ» оставалась в брокере.

Отложенные сообщения и ретраи «на часы и дни» нельзя свести только к retention и «переливанию» в рамках одного брокера без надёжного учёта на стороне приложения. Нужна долговечная, транзакционно предсказуемая таблица фактов: когда задача должна снова стать готовой, какая у неё полезная нагрузка, как избежать дублей при гонке воркеров. Для этого оптимальна PostgreSQL: диск, SKIP LOCKED, индексы по scheduled, бэкапы, привычные операции для outbox-шаблона.

Остаётся зазор: ни Kafka, ни PostgreSQL сами по себе удобно не закрывают сценарий «миллионы логических партиций (ключей) + общий пул воркеров + справедливая конкуренция + строгий порядок внутри ключа + откладывание на секунды + ретраи с блокировкой партиции». В consumer group Kafka партиции привязаны к консьюмерам: это не тот же паттерн, что динамическая task-очередь, где воркер в следующий тик берёт следующую готовую задачу из любой логической партиции. Заводить отдельную физическую сущность на каждый бизнес-ключ в Kafka или в RabbitMQ — нереалистично. Outbox в PostgreSQL отлично хранит «запланировано на завтра», но как планировщик следующего тика для сотен тысяч ключей с разным scheduled и приоритетом он не заменяет лёгкий in-memory движок с Lua-атомарностью.

Оптимальный третий слой — Redis и очередь задач на нём: один инстанс (или кластер по мере роста) хранит состояние планировщика — ZSET, блокировки ordered-партиций, идемпотентность, heartbeat воркеров — без миллионов отдельных «очередей» в смысле брокера. Как устроена модель партиция = ordered-порядок / общий пул / reject и отложенные сообщения, rate limit как cooldown партиции, зачем Lua и какие компромиссы по памяти и durability — мы подробно разобрали в отдельной статье: «Очередь на Redis с Lua: порядок в партициях, общий пул воркеров и отложенные сообщения».

Роль smart-redis-queue

В Requester эту роль играет библиотека smart-redis-queue: ordered-партиции с префиксом !, приоритеты, prefetch, Reject/RejectWithDelay, heartbeat и возврат задач при смерти воркера — то есть всё, что в предыдущей статье описано на уровне структур ключей и Lua-скриптов. Requester не дублирует эту механику — он использует очередь как движок планирования: что лежит в payload задачи (полный JSON или ссылка на offset в Kafka) — слой контракта сервиса.


Контекст системы (уровень «ящиков»)

Внешние сервисы могут:

  • вызывать HTTP API Requester;

  • публиковать задачи в топик Kafka in.

Requester выполняет HTTP к целевым URI (или специальный режим без реального HTTP — см. wake up), учитывает лимиты, кеш, порядок и ретраи. Все ответы попадают в топик Kafka out. Дальше потребители забирают результаты сами, либо стоит Redpanda Connect (или аналог), который читает out и по ключам / заголовкам / полям proxyData раскидывает сообщения в нужные топики или очереди целевых сервисов.

Зачем такая «лестница» из компонентов. Kafka даёт append-only журнал: что задача вошла в систему и что из неё вышло — остаётся в топиках. Redis-очередь решает партиционирование и планирование с общим пулом воркеров без заведения миллиона физических очередей. PostgreSQL — долговременный outbox для задач «далеко в будущем» и длинных бэкоффов, чтобы не держать гигантские ZSET и не упираться в модель отложенных сообщений только в Redis.


C4: контейнеры (кратко)

Контейнер

Роль

Requester (процесс)

HTTP API, consumer in, пул воркеров Redis, пул воркеров PostgreSQL→Kafka, consumer out для sync, Kafka producer.

Kafka

Топик in — вход; топик out — выход; единый лог и точка интеграции для Connect.

Redis

Rate limiter, кеш ответов, очередь задач (smart-redis-queue).

PostgreSQL

Таблица outbox: задачи с scheduled в будущем, ретраи с задержкой > 1 с (для не-ordered сценариев).


Контракты: запрос и ответ

Единая форма задачи в HTTP (POST /request) и в Kafka (топик in, value — JSON) совпадает по смыслу: тело = объект с полями request и meta. Ответ, который забирают из топика out (и в режиме sync: true — ещё и в HTTP), — объект response + meta.

request — что выполнить

Поле

Тип / формат

За что отвечает

uri

строка

Целевой URL исходящего HTTP-вызова. Пустая строка — режим wake up: реальный HTTP не делаем, body/headers уходят в ответ «как есть» (см. ниже).

method

строка

HTTP-метод (GET, POST, …).

body

строка

Тело запроса (как правило JSON в виде строки; сервис не парсит схему — это контракт с целевым API).

headers

объект строк

Заголовки к целевому запросу.

meta — политика обработки и идентификаторы

Поле делает сервис гибким, позволяя настраивать стратегии работы сервиса на клиенте который шлет запросы.

Поле

Тип / формат

За что отвечает

requestId

строка, обязателен

Сквозной идентификатор задачи: попадает в out, в ключ Kafka, в meta ответа; по нему маршрутизируют downstream и в sync ждут результат в Hub.

traceparent

строка, опционально

W3C Trace Context: сервис принимает от клиента и продлевает в трейсинг; при пустом значении может заполняться на входе HTTP.

partition

строка, опционально

Логическая партиция очереди в Redis. Если имя начинается с !, включён ordered-режим: строгий порядок внутри ключа, Reject/rate limit ведут себя иначе, чем у обычных партиций.

sync

bool, опционально

true — дождаться результата в HTTP (пока воркер не положит ответ в out, out-consumer доставит в Hub). false или отсутствует — 202 Accepted, результат только в out.

scheduled

время (RFC3339), опционально

Когда первый раз считать задачу готовой к исполнению. Влияет на маршрут Redis (близкое будущее) vs PostgreSQL (дальше ~1 с).

proxyData

произвольный JSON

Прозрачный конверт: Requester не интерпретирует — копирует в meta ответа, чтобы Connect / потребители могли маршрутизировать по своим полям (целевой сервис, тенант, тип события и т.д.).

limiter

объект, опционально

Rate limit до HTTP (и до кеша). См. вложенные поля.

retry

объект, опционально

Политика повторов при 429/5xx/сети для не-ordered; для ordered — используется иначе (reject с задержкой, см. раздел про retry).

cache

объект, опционально

Кеш ответов: ключ и TTL; кладём только успешные 2xx.

meta.limiter:

Поле

За что отвечает

key

Ключ в Redis для счётчиков/окон (общий квотный «ведро»-идентификатор).

algorithm

Имя стратегии: token-bucket, leaky-bucket, fixed-window-counter, sliding-window-log, sliding-window-counter.

rates

Список пар окно + лимит: каждый элемент — duration (строка длительности, по правилам Go time.ParseDuration: 1h, 3s, 500ms, …) и value (целое число разрешённых срабатываний за окно).

meta.retry:

Поле

За что отвечает

max

Максимальное число попыток (с учётом ретраев после первого запуска).

delay / maxDelay

Стартовая и максимальная пауза между попытками (строка длительности).

multiplier

Множитель экспоненциального бэкоффа.

jitter

Доля случайного разброса вокруг задержки (снижает «столбики» нагрузки).

meta.cache:

Поле

За что отвечает

key

Ключ записи в Redis (префикс кеша добавляется на стороне сервиса).

ttl

Время жизни кешированного ответа.

В Kafka доработанная копия задачи (после consumer/processor) также содержит служебные поля верхнего уровня: tryCount (число уже выполненных/запланированных попыток), createdAt (время постановки) — в HTTP при первой отправке клиент их не передаёт, сервис при необходимости проставляет при сериализации Task.

response + meta (топик out и тело sync-ответа)

Поле

Раздел

За что отвечает

response.status

response

HTTP-статус целевого ответа (или синтетический 200 при wake up, 5xx при внутренней ошибке).

response.body / response.headers

response

Тело и заголовки ответа.

meta.requestId

meta

Тот же requestId, что во входе — связка «запрос–ответ».

meta.tryCount

meta

Сколько раз по сути «доходили» до исхода (учёт ретраев).

meta.time

meta

Время обработки на стороне Requester (длительность).

meta.cached

meta

true, если ответ взят из кеша, а не сходил в сеть.

meta.waitTime

meta

Ожидание, связанное с лимитером/паузой (для sync при 429 — в т.ч. для заголовка Retry-After).

meta.proxyData

meta

Эхо из входа: тот же произвольный JSON для downstream.

Сериализация длительностей: в моделях ответа поля meta.time и meta.waitTime имеют тип time.Duration; стандартный encoding/json в Go сериализует их целым числом наносекунд. Если понадобится строка вида 150ms на wire — это уже смена контракта (кастомный MarshalJSON).


Движение данных: от сообщения до ответа

Ключевая идея: в Kafka уже лежит полное тело задачи (JSON). Consumer in не обязан таскать этот JSON в Redis целиком.

Ветвление у consumer in

  • Если задача готова или наступит в пределах ~1 секунды — в Redis уходит лёгкая запись: ссылка на позицию в логе Kafka (topic, partition, offset) — десятки байт JSON.

  • Если отложена дальше — полный payload пишется в PostgreSQL (outbox). Когда наступает время, воркеры PostgreSQL снова отправляют задачу в Kafka in, откуда она обрабатывается обычным путём.

Зачем из PostgreSQL снова в Kafka, а не «сразу в Redis»

  1. Единый путь — вся жизнь задачи снова проходит через in: проще рассуждать, проще трассировка, один формат сообщения.

  2. Журнал — в in остаётся запись о том, что отложенная задача «проснулась»; это не теряется внутри закрытого цикла БД→приложение.

  3. Порядок относительно других событий — новая запись в in упорядочивается так же, как и остальной поток (при одной партиции топика in — глобально FIFO на уровне топика; детальная упорядоченность по бизнес-ключу обеспечивается уже Redis-очередью для ordered-партиций).

Порядок и шаринг воркеров между логическими партициями на этом пути обеспечивает слой Redis; зачем он нужен в связке с Kafka и PostgreSQL — в разделе «Выбор стека» выше.


Логика приложения: из чего состоит процесс

Компоненты

  1. Один consumer группы на топик in
    Достаточно для «перекладывания» сообщений в Redis или PostgreSQL. Топик in в эксплуатации разумно держать с одной партицией, чтобы не плодить параллельные ветки на этом раннем этапе: партиционирование по бизнес-ключу целиком отдаём Redis, чтобы не усложнять систему вторым уровнем партиций в Kafka.

  2. Пул воркеров PostgreSQL (SKIP LOCKED, несколько горутин)
    Забирают готовые строки, отправляют задачу обратно в in.

  3. Пул воркеров Redis (число = MAX_WORKERS)
    Берут задачи из очереди, разрешают payload (чтение из Kafka по offset при необходимости), вызывают processor.

  4. Пул Kafka Fetcher’ов
    По одному соединению на воркер: иначе один mutex на чтение по offset стал бы узким местом при сотнях воркеров.

  5. HTTP-сервер
    POST /request — постановка задачи в in; режим sync: true ждёт результат через локальный Hub, куда попадает ответ из consumer out.

  6. Out consumer
    Отдельная уникальная consumer group на инстанс (hostname + pid), старт с OffsetNewest, чтобы не читать историю при рестарте — нужен только мост в Hub для синхронных клиентов.

Запуск и graceful shutdown

Порядок остановки осмысленный для операторов:

  1. SIGINT / SIGTERM — начинаем shutdown.

  2. Фаза 1: HTTPServer.Shutdown: новые запросы не принимаем, висящие соединения могут дописать ответ.

  3. Фаза 2: воркеры — отмена контекста, WaitGroup дожидается завершения consumer in, PG movers, пула Redis, consumer out.

  4. Фаза 3: закрытие ресурсов — defer на producer’ы, Redis, пул fetcher’ов, БД.

Таймаут shutdown — до порядка минуты, чтобы не обрывать задачи в полёте без шанса на корректное завершение.


Rate limit, retry, cache, отложенные сообщения

Rate limit

Перед кешом и HTTP выполняется проверка лимитера (Redis + стратегии из семейства token/leaky/sliding/fixed window). При отказе:

  • для ordered-партиций (meta.partition начинается с !) — возвращается reject с задержкой (RejectWithDelay): очередь сама блокирует партицию на TTL, не крутим tight loop;

  • для обычных партиций — задача перепланируется на RetryAfter в Redis (если задержка короткая) или в PostgreSQL (если длинная).

Retry

На 429 и 5xx (и на ошибки сети) при наличии meta.retry и неисчерпанном max:

  • ordered — снова через reject + backoff в Redis (порядок и блокировка партиции сохраняются);

  • не ordered — задача сериализуется и уходит в Redis (≤ 1 с до следующей попытки) или в PostgreSQL (> 1 с), processor делает Ack текущей Redis-задачи, потому что retry уже «новая» постановка.

Бэкофф — экспоненциальный с потолком maxDelay и джиттером.

Кеш

При meta.cache ответы 2xx кладутся в Redis (обёртка go-redis/cache). Параллельные промахи по одному ключу схлопываются через singleflight: один реальный HTTP, остальные ждут.

Отложенные сообщения

Поле meta.scheduled: consumer in сам решает — сейчас в Redis (как правило, с offset-ref) или в PostgreSQL для дальнего горизонта.


Wake up (инициация без HTTP)

Если uri пустой, HTTP-вызов не выполняется: тело и заголовки из заявки попадают в ответ как «успешный» 200. В связке с scheduled и proxyData это удобный будильник для других сервисов: в нужный момент в out уходит сообщение, которое downstream может трактовать как событие или команду. Redpanda Connect дальше маршрутизирует по ключу или по полям payload.


Как тестировали

  • WireMock — контролируемый mock HTTP: 500, 429, задержки, детерминированные ответы для сценариев retry и rate limit.

  • k6 — нагрузочные скрипты в репозитории: отдельно limiter, retry, cache, ordered-партиции, max RPS; есть сценарий прямой записи в Kafka in и замера обработки через out (custom k6 с xk6-kafka).

  • Prometheus + Grafana — метрики инфраструктуры и сервиса (в docker-compose заготовки есть).

  • OpenTelemetry — опционально (OTEL_ENABLED=1): полный трейс от http.requestkafka.in.consumekafka.fetchprocessor.handlehttp.clientkafka.outkafka.out.consume, визуализация в Jaeger. Удобно ловить цикл PG → Kafka in → Redis → out.


Узкое место: большие сообщения в Redis

На практике всплыло узкое место: при больших телах задач пропускная способность заметно проседала, потому что крупный JSON гонялся через Redis (память, сеть, сериализация, время Lua/round-trip). Redis в этой архитектуре — координатор очереди, а не хранилище «толстых» документов.

Решение: в Redis кладём только ссылку на offset в Kafka (OffsetRef: topic, partition, offset). Воркер перед обработкой достаёт оригинальное сообщение из Kafka через выделенный fetcher. Пропускная способность и стабильность выросли; память Redis перестала быть линейной от размера body запросов.

Компромисс: пока сообщение не закоммичено и не вычитано, нужна согласованность retention в Kafka с горизонтом отложенных задач в Redis; для «дальних» отложенных задач по-прежнему используется PostgreSQL с полным payload.


Оптимизация через Kafka fetcher: компромиссы

Ссылка на offset вынуждает воркер перечитать запись в Kafka по конкретной позиции. Это не тот же паттерн, что последовательное чтение «хвоста» consumer group-ом: с точки зрения диска возможны скачкообразные (random) обращения к сегментам, выше доля промахов по page cache брокера по сравнению с идеальным стриминговым read.

Для порядка величины ~2500 RPS на нашем контуре это не стало практическим узким местом: у современных SSD (NVMe) произвольный ввод-вывод по-прежнему выдерживает такой профиль лучше, чем механическим дискам прошлого поколения; узким местом ранее оставались именно объём данных через Redis, а не IOPS брокера.

Retention. Если retention в топике in короткий, а воркеры отстают, теоретически сообщение по offset может уже исчезнуть из лога, пока фасад Redis ещё ссылается на него. В нашем продуктовом допущении: отправлять наружу то, что «пролежало» в очереди дольше ~5 часов, не требуется — срок retention и эксплуатационные лимиты можно согласовать с этим горизонтом. Дляё отложенных на длительные сроки задач полный payload по-прежнему живёт в PostgreSQL, а не в цепочке offset-ref.

Куда развиваться, если профиль изменится: хранить тело в отдельном объектном хранилище (S3-совместимое, MinIO и т.д.) с ключом в Redis; либо снова грузить сжатый payload в Redis (snappy/zstd), пожертвовав частью выигрыша по RAM. Как вариант, комбинировать подходы - меньше N kb в redis, все остальное в хранилище. Или даже сделать ttl store в самом приложении и если воркеры успели, то брать из памяти. Нужно экспериментировать и подбирать оптимальный вариант. Это запасной фронт работ.

На текущем этапе offset + fetcher — осмысленный компромисс: Redis остаётся лёгким, RPS отправки (producer) и общая устойчивость сценария для нас достаточны; вместо преждевременной усложнёнки отдельным store мы фиксируем цифры в нагрузочных тестах и при росте нагрузки возвращаемся к вариантам выше.


Итог

  • Спроектирован Requester как сервис исходящих HTTP-запросов с API и Kafka, с единым выходом в топик out для интеграций и Connect.

  • Kafka — журнал и точка входа/выхода; PostgreSQL — outbox для длинных задержек; Redis + smart-redis-queue — партиции, порядок, пул воркеров, rate limit и отложенность «вблизи».

  • Offset-ref и Kafka fetcher сняли давление с Redis; цена — random I/O и зависимость от retention; при ~2500 RPS и согласованных SLA по «свежести» задач это приемлемо; при необходимости — отдельный store или сжатие в Redis.

  • Graceful shutdown, k6, WireMock, Grafana и OpenTelemetry закрывают эксплуатационный цикл: от нагрузочного теста до трассировки полного пути сообщения.

Репозиторий: https://github.com/Rinsvent/requester.


Приложение: ссылки

Комментарии (19)


  1. omigo777
    26.04.2026 17:45

    Какую прикладную задачу решали? Или это статья ради статьи: LLM спроектировала что-то, LLM это что-то описала?


    1. rinsvent Автор
      26.04.2026 17:45

      Прикладная задача - рассылка сообщений. Сервис был написан как заммена действующей системе. И ключевой фактор чтобы 1 клиент не блочил другого - именно для этого нужен редис и разделение по партициям! Сейчас почему-то у многих пунктик на LLM - сервис проектировали инженеры)) Но статью да - LLM помогала написать, что скрывать. Без нее сильно дольше бы писал, но суть не в буквах а в идеи и вызовах с которыми столкнулись, и хотелось поделиться этим. В целом если формат зайдет есть идея пройтись по архитектуре и написать ряд уже не прикладных статей. Не часто на хабре видел статьи по проектированию где разбирались бы моменты выбора стека, движенеи данных - считаю, что может быть интересным)


  1. pkokoshnikov
    26.04.2026 17:45

    А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет? А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя? Зачем вообще скорость ретрая если задача асинхронная? Вопросов очень много


    1. rinsvent Автор
      26.04.2026 17:45

      А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет? 

      Вот тут хороший вопрос! Сервис проектирвоался как замена текущей системы - сейчас уже есть хранение в редис и в случае если упадет, то - да сообщения будут потеряны. На практике у нас редис кластер и такого еще ни разу не было, поэтому на первом этапе этот момент опустил, но действительно собирался этим заняться.
      Ключевая идея, что у на есть лог событий в кафка и хотел добавить команду в приложение которая в случае сбоя заново перечитает кафку (топик in) с первого offset который был не обработан (тут нужно будет слдеить за хвостом)
      Преечитав кафку (топик out) можно гарантирвоать что сообщения не будут обработаны джажды + потребуется вероятно на этот момент не стартовать воркеры.

      А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя?

      Как выше написал, пока не реализовано, но если делать, то будет зависеть от того как быстро среагируем на инцидент. Сейчас средняя скорость 300 с / сек (в пиках 1700)
      Допустим среагировали через 10 часов. Т очто было ранее 5 часов отсеиваем как устаревшие - получается не смогли отправить. Сейчас у бизнеса такие условия. получается за 5 часов надо вкачать

      при средней скорости 300 с / сек это 5 3600 * 300 = 5_400_000 сообщений которые нужно заново загрузить в систему.
      Скорость перекидывания в базу согласно бенчмаркам очереди https://habr.com/ru/articles/1018194/ могут достигать 100_000 c / сек, то есть в базу все зайдет за минуту, а разгребаться уже будет со скоростью 2500 rps примерно минут 40.
      Это худший случай, когда все сообщения надо переотпрравить, если часть будет дедублицирована из-за того что ранее обрабоатлась, то должно быть быстрее. Но это пока в теории, реальные цифра можно будет увидеть только после реализации

      Зачем вообще скорость ретрая если задача асинхронная?

      Не совсем понял вопрос, но если суть что можно было sleep поставить, то тогда воркеры будут тормозиться и перфоманс будет сильно проседать. Так что с возвратом в очередь получается намного эффективнее


      1. pkokoshnikov
        26.04.2026 17:45

        Ну ладно главный вопрос, почему не взяли postgres? 2500 rps не выглядит слишком большой нагрузкой. В придачу бы получили персистентность и надёжность.

        Асинхронный контракт обычно делают когда результат нужен не обязательно сразу. Иначе зачем вообщем вам асинхронный контракт?


        1. rinsvent Автор
          26.04.2026 17:45

          Все просто) postgres плохо подходит для реализации требований описанных в ТЗ к задаче.
          Для нас критически важно - не блокирующие партиции / гарантированный порядок отправки в рамках 1 партиции. В этом случае на постгрес пришлось бы делать логику по типу. Храним мапинг воркер - партиция. При запросах джойним их и фильтруем. Затем получить 1 сообщение / залочить / обработать / акнуть или реджекнуть. После того как воркер разгреб одну партицию перекидываем его на другую, причем это пришлось бы делать не только когда разгреб а при любых ошибках с лиимитами или ретраями. Пришлось бы писать координатор, который бы слал еще доп запросы))
          2500 rps это выполненных задач. При получении по 1 сообщению если все будет лежать в базе, то на 1 сообщение это как минимум 3 запроса, то есть уже 7500 обращений к базе в секунду. То есть это уже почти потолок. С редис есть простор для оптимизаций и увеличению пропускной способности. Но не важно. Выше написал что за 5 часов у нас где-то 5_400_000 сообщений, если горизон хранения несколько дней, то в базе будет условно 54 миллиона сообщений и запросы будут явно работать сильно медленнее чем в редисе и более чем уверен что пропускная способнасть была бы меньше в разы. причем 54 миллиона это только срочных, а отложенные могут лежать годами.
          Ну и самое важное что решение с редис уже давно на проде и показывает себя отлично и надежно)
          То что редис не дает таких же гарантий согласен, но это можно поправить через логику описанную выше - с перечитываним кафки. Только прийдется немного посидеть)

          Асинхронный контракт обычно делают когда результат нужен не обязательно сразу. Иначе зачем вообщем вам асинхронный контракт?

          Все верно. есть кейсы когда ответ не нужен вообще, а есть когда нужен. и обязательно все сообщения должны проходить через одну систему rate limit


          1. pkokoshnikov
            26.04.2026 17:45

            Есть достаточно не сложное решение

            Делаем таблицу очереди. У каждого сообщения есть ключ, который задаёт логическую партицию.

            Открываем транзакцию

            Сначала через FOR UPDATE SKIP LOCKED выбираем кандидатов на обработку из таблицы очереди. После этого пытаемся взять pg_try_advisory_xact_lock для 1 ключа.

            Этим мы обеспечиваем эксклюзивность обработки данного ключа. Далее отдельным запросом выбираем сообщения этого ключа для обработки.

            После успешной обработки удаляем сообщения из очереди и коммитим транзакцию.

            Тут можно довольно многое оптимизировать например разделить на 2 таблицы саму очередь и потребление, либо заменить pg_try_advisory_xact_lock на таблицу уникальных ключей которые лочатся вместе с выборкой.

            Для меня просто ваш подход кажется опасным, сбой может быть очень болезненным

            По поводу скорости работы, оно конечно понятно, только непонятно откуда такие требования? Вам уже отгрузили запросы через кафка, вы уже потеряли в latency, на приёме и отправке обратно. Как тут такой подход может быть связан с требованиями low latency? А если latency не важно, то в чём смысл тогда делать это на redis?


            1. rinsvent Автор
              26.04.2026 17:45

              Это только звучит хорошо) Вы предлагаете открыть транзакцию и делать внутри нее http запросы, которые могут работать вплоть до минуты. Длинные транзакции будут убивать базу - чистка мусора, bloating, autovacuum, локи.
              постгрес не масштабируется горизонтально и с ней точно упремся в cpu / io на одном сревере.
              Вы предполагаете только успешный сценарий когда всегда акаем и удаляем данные, но на практике много reject и блокировок ключей в ожидание.
              как следствие при вашем подходе 1 и тоже сообщение будет бесконечно крутиться воркерами - взяли, словили 429 вернули, взяли словили 429 вернули, а таких ключей много. в итоге воркеры начнут крутить только их и по факту вся система встанет, не сможет обрабатывать те ключи которые реально сейчас доступны. Нужно будет городить более сложную систему блокировок.

              Судя по предложению удалять записи вы предполагаете что таблица будет стремиться к нулю, но нет. очень часто сейчас бывает что есть большой пулл собщений (700k) по 1 ключу, и они обрабатываются часами, есть отложенные сообщения которые лежат годами.
              База никогда не будет пуста и в ней будут миллионы сообщений, что сделает запросы медленными.

              Частые удаления под нагрузкой тоже будут приводить к деградации. Тут выгоднее было бы менять статус на done и потом чистить периодически.

              Я согласен что на постгресе можно сделать что-то подобное, но повторюсь - пропускная способность такой системы будет значительно ниже)

              Такое решение, что предложили, хорошо подходит для быстрых задач, когда сразу же в транзакции что-то обновить, маленькой нагрузке, для http запросов, считаю, плохая идея)

              Насчет latency. Тут зависит от того какие будут задержки. пара секунд - нормально, пара минут - плохо. Постгрес - не очередь и не сможет стабильно держать высокую пропускную способность для таких задач. в итоге система будет деградировать и все ключи будут отрабатывать медленнее. В случае редиса, остается большой запас прочности. 1 ключ никак не афектит другой, объем базы никак не влиет на перфоманс.


              1. pkokoshnikov
                26.04.2026 17:45

                Ну у вас же есть политика ретрая, через сколько вы хотите запустить обработку, задаёте поле execute_after которое как раз и решает эту проблему, при запросе фильтруете по нему. Это не проблема. Можете обрабатывать в транзакции не все ключи, а по одному это решит проблему транзакций. В этом кейсе даже лучше подходит.

                Проблема autovacuum не так критична, я уже привёл что можно разделить таблицу самой очереди где по партициям можно хранить сами сообщения и на таблицу статусов сообщений, тогда vacuum будет не такой большой. А очередь будет очищаться дропом партиций.

                Постгрес - не очередь и не сможет стабильно держать высокую пропускную способность для таких задач.

                Тут голословно.


                1. rinsvent Автор
                  26.04.2026 17:45

                  Введение execute_after не решает задачу.
                  если предлагаете фильтровать по нему, то тогда ломается порядок, первую строку отфильтруете и возьмете вторую.
                  если предлагаете проверять на клиенте, то возвращаемся к кейсу когда гоняем одно сообщение туда сюда.

                  Система блокировок будет далеко не тривиальна.
                  На редисе это делается просто. В postgresql - нет. И из-за доп таблиц, полей и т.д. чтобы заставить постгрес удовлетворять условиям его пропускная способность станет еще меньше.

                  я уже привёл что можно разделить таблицу самой очереди где по партициям можно хранить сами сообщения

                  Не совсем понял про отдельную таблицу статусов сообщений. Партицировать таблицу можно - согласен, но все равно это будет 1 сревер и его ресрусы.

                  А очередь будет очищаться дропом партиций.

                  Тоже не понятно как очередь будет очищаться дропом партиции если там помимо обработанных есть и живые сообщения которые только будут взяты в работу....

                  Тут голословно.

                  Нет. Тут я имел в виду ваше предложение с длинными транзакциями. Длинные транзакции всегда зло и однозначно будут негативные последствия стрелять. Причем в этой схеме проблемы будут когда внешняя система начнет тормозить, а это мы не можем контролировать. Они тормозят, у нас дольше транзакции, мы падаем.


                  1. pkokoshnikov
                    26.04.2026 17:45

                    Ну сделайте execute after на ключ. Он же не обязательно должен быть привязан к сообщению.

                    Идею с двумя таблицами можете глянуть тут https://github.com/pkokoshnikov/dbq

                    Для меня просто защита от сбоев и восстановление имеют высокий приоритет и я не могу ими пренебречь. Не знаю задач где можно спокойно пол дня не работать, причём закладывать это в архитектуру.


                    1. rinsvent Автор
                      26.04.2026 17:45

                      Вы сделали ровно тоже самое)
                      Восстановление в вашем решение будет работать хорошо. А защиты от сбоев нет...
                      Проблема долгих транзакций никуда не делась. Вы предлагаете обрабатывать по одному сообщению, но это не решит ошибку. Если один сервис начнет тормозить то все запросы к нему будут долгими, все воркеры рано или поздно встанут на запросах именно к этому сервису. И вот тут мы уже никак проблему не решим и можем так неделю просидеть пока сервису не полегчает (ну я конечно преувеличиваю, что-нибудь можно придумать будет).
                      То есть в вашем решении заложена бомба которая тригернет в рандомный момент.

                      На счет редис мы наверное друг друга не до конца поняли. При настройке AOF/RDB будут потеряны не все сообщения, а только те что не успели зафиксироваться
                      В целом придумал легкое решение... Доработаю чтобы ключ idempotency жил указанное окно. Тогда в случае сбоя достаточно будет сдвинуть offset в кафке за тот период который хотим перечитать.
                      Плюс добавлю команду корторая перечитает out и заполнит ключ idempotency если его нет за указанное окно времени. в таком решение. команда заполнения блокировки должна отработать за пару минут, перечитывания с offset тоже. Если реакция на инцидент быстрая, то у воркеров не будет болого лага.
                      Решение на час. На выходных добавлю и можно закрывать тему))


                      1. pkokoshnikov
                        26.04.2026 17:45

                        Так я в начале спросил какой объём данных в редис? И насколько я понял у вас не хранится в редис большая часть данных так как и их достаточно много, поэтому при падении редиса придётся перечитать данные полностью из кафка, но не факт что там будут хранится все данные.


                        Меня этот диалог тоже натолкнул на одну мысль как из схемы с постгрес сделать ещё более оптимальную схему которая обойдёт проблему с хот ключами. Скину чуть позже нужно время проверить.


                      1. rinsvent Автор
                        26.04.2026 17:45

                        На самом деле в редисе миллионы ключей и все горячие сообщения - тут в зависимости от нагрузки могут миллионы лежать, так как новые сообщения зашли, но пропускная способность внешнего сервиса допустим 30 запросов в секунду по rate limit. в итоге все будет по порядку постепенно обрабатываться. постоянно туда летит и воркеры после обработки вычищают.

                        Насчет AOF/RDB. Его можно настроить чтобы без потерь работал. Цена - пропускная способность. Каждый выбирает под себя. Можно настроить выгрузку раз в секунду, тогда перечитывать нужно будет не все, а только с момента инцидента - должно быть быстро.

                        Плюс еще замечание к Вашему решению.
                        У нас порядка 200 млн+ ключей, какие-то могут постепенно пропадать, какие-то новые появляются.

                        тут 2 вариант
                        1. добавляем и чистим
                        2. держим всегда в базе

                        1. тогда колличество запросов при добавлении сообщения. добавить сообщение / добавить ключ upsert всегда / прочитать ключи / залочить ключ / прочитать сообщение с локом / удалить сообщеие / если сообщения нет удалить ключ
                        2 добавить сообщение / добавить ключ upsert всегда / прочитать ключи (причем с джойном сообщений и фильтром по наличию иначе будет получать один и тот же ключ в котором нет сообщений) / залочить ключ / прочитать сообщение с локом / удалить сообщеие

                        В 1 будет просадка из-за увеличения колличества запросов
                        В 2 будет просадка из-за большой таблицы с ключами и джойном с фильтром.


                        В целом я точно не могу согласиться делать http запросы внутри транзакции. Если честно это вредный совет. Ну согласитесь же? Всем всегда советую этого избегать. Если предложите решение без транзакций и которое будет держать хотя бы 300 rps, то буду рад обсудить ))


          1. pkokoshnikov
            26.04.2026 17:45

            Ещё один странный момент, вам так важен порядок обработки, но вы можете потерять часть сообщений, тоже странно выглядит.


            1. rinsvent Автор
              26.04.2026 17:45

              Можем, но не теряем) Редис хорошо справляется со своей задачей. Если будут прициденты - это станет приоритетом. Я согласен что это важный вопрос, просто пока есть более приоритетные задачи.


  1. KON88
    26.04.2026 17:45

    Прочёл статью. Вопросов действительно много.

    Главная цель использования такого количества инструментов это 2500 rps? Или возможность контроля событий как в saga?


    1. rinsvent Автор
      26.04.2026 17:45

      Главная цель использования такого количества инструментов это 2500 rps?

      В приложении используется очередь / база / in memory хранилище. Это обычно минимум для чуть менее сложной системы) Без postgresql невозможно было бы реализовать сообщения отсылаемые через год. Без редиса невозможно было бы реализвоать партицирование и гарантию порядка для ограниченного числа воркеров. Очередь must have для такой системы.
      Все инструменты обоснованы и описаны в статье, что за что отвечает. rps был как маркер - выдержим ли мы текущие нагрузки. Ключевые факторы - неблокирующие партиции и порядок доставки в рамках портиции. Сами требования к сервису описаны в начале статьи)


  1. titan_pc
    26.04.2026 17:45

    О, давненько не было видно настоящих инжинерных статей.

    Затейный такой гигантский уведомлятор/прокси получился. Сразу напрашивается не только http выход, но и интеграция с почтой и мессенджерами всякими на развитие. Ещё прикольный сценарий - между двумя закрытыми контурами данные гонять, когда контур Б может в порт А. А А не может в Б.