Привет, Habr!
Меня зовут Магомед, я руководитель команды «Платформа платежей и коммуникаций» в Lenta tech («Группа Лента»). В статье хочу разобрать задачу приоритезации уведомлений в Kafka на высоких нагрузках. Речь пойдет о платформе коммуникаций, которая обрабатывает SMS, push, email и мессенджеры и за месяц отправляет более 301 млн сообщений.
Проблема, с которой пришлось столкнуться, типовая для таких систем: как гарантировать быструю доставку критичных сообщений, если основной объем трафика — это массовые рассылки.
Бизнес-контекст и постановка задачи
Единая платформа коммуникаций обрабатывает все исходящие уведомления: SMS, push, email, мессенджеры. За последний месяц отправлено 301 млн сообщений. Требования делятся на три класса:

Ключевое бизнес-требование: маркетинговый трафик не должен создавать «пробки» для OTP и транзакционных уведомлений. Приоритет должен соблюдаться для всей платформы.
Проблема «шумных соседей»: миллионные очереди маркетинговых рассылок не должны влиять на задержки критических OTP.
Ограничения Kafka и архитектурный подход
Kafka не поддерживает приоритезацию «из коробки». Внутри одного топика и партиции сообщения читаются строго последовательно (FIFO). Это означает, что если в одном потоке окажутся OTP-коды и маркетинговые рассылки, критичные сообщения неизбежно будут стоять в очереди за менее приоритетными. Это поведение нельзя изменить настройками, поэтому приоритезация решается только на уровне архитектуры.
В системе потоки были физически разделены сразу по нескольким осям:
по приоритету (P1, P2, P3);
по каналу доставки (SMS, push, email, мессенджеры);
по бренду.
Такой подход позволил изолировать нагрузку и задать разные параметры обработки для разных типов сообщений.
Для критичных уведомлений (P1) важна минимальная задержка, поэтому чтение и запись идут по одному сообщению. Для сервисных уведомлений (P2) используется умеренный батчинг. Для маркетинга (P3) — большой batch.size, где ключевым становится пропускная способность.
Почему не подошли альтернативные решения
Логичный вопрос: почему не использовать один топик и разнести приоритеты по партициям, например: P1 → партиция 0, P2 → партиция 1, P3 → партиция 2, а затем читать их параллельно?
У такого подхода действительно есть плюсы. Он проще с точки зрения структуры, не требует большого количества топиков и, например, не сталкивается с проблемой «мерцания», которая возникает при батчевой обработке. Но на практике проявляются ограничения:
Невозможно задать разные параметры потребления для разных партиций в рамках одной consumer group. Для P1 нужна минимальная задержка и маленькие батчи, для P3 — наоборот, крупные батчи и высокая пропускная способность. В одной группе это не разделить.
Появляется проблема управления ограниченным каналом отправки. Даже если сообщения читаются параллельно, маркетинг может занять все доступные ресурсы, например, лимит SMS-провайдера, и начать вытеснять OTP. В Kafka нет встроенного механизма, который гарантирует приоритет на уровне внешнего канала.
Возникают сложности с масштабированием. Увеличение числа партиций для P3 приводит к ребалансировке consumer group, что затрагивает и P1, и P2. В результате критичные потоки начинают зависеть от изменений в низкоприоритетных.
При равномерной нагрузке и отсутствии ограничений на стороне каналов такой подход мог бы работать лучше. Но в условиях, где маркетинг генерирует основной объем трафика, а каналы доставки имеют жесткие лимиты, он не решает ключевую задачу — гарантированную приоритезацию.
В качестве альтернативы также рассматривался RabbitMQ с приоритетными очередями, где поддержка приоритетов есть на уровне брокера.
Но и этот вариант не подошел. При миллионных нагрузках возникают ограничения по пропускной способности, а добавление RabbitMQ означало бы введение дополнительной технологии в стек с дублированием уже существующего функционала.
При этом Kafka уже использовалась как основа инфраструктуры и обеспечивала необходимую пропускную способность и репликацию, поэтому переход на отдельный инструмент ради приоритезации не выглядел оправданным.
Почему выбрана текущая архитектура
Гибкость конфигурации — возможность оптимизировать настройки Kafka под уникальный профиль нагрузки каждого приоритета (батчинг, количество партиций, таймауты).
Управление ресурсами канала — мьютекс создает backpressure на чтении, освобождая лимитированный канал отправки (SMS-агрегатор, email-провайдер) для критических OTP.
Операционная изоляция — проблемы с маркетинговыми рассылками (лаги, сбои провайдера) не влияют на доставку OTP.
Многоподовая координация — mutex на redis обеспечивает синхронизацию между несколькими подами.
Модель с отдельными топиками и иерархическими мьютексами выбрана как оптимальный баланс между:
Бизнес-требованиями — OTP доставляются максимально быстро независимо от загрузки маркетинга.
Нагрузочными характеристиками — миллионы P3 и единицы P1 требуют принципиально разных настроек Kafka.
Инфраструктурными ограничениями — несколько подов с распределением партиций и лимитированный канал отправки.
Альтернативные подходы либо не обеспечивали нужной гибкости настройки, либо не решали проблему приоритезации ограниченного канала отправки.
Единая логика потребителя: «Мьютекс на вычитку» для всех каналов
Унифицированный подход: один и тот же механизм приоритезации работает для SMS, push, email и любых других каналов.
Принцип работы диспетчера:
Для каждого канала (а для push — и для каждого бренда) выстраивается своя цепочка мьютексов (P1 → P2 → P3).
Потребитель сначала пытается читать из P1-топика конкретного канала.
Формат топика: commservice.<канал>.messages.[<бренд>.]priority<уровень>.
Если в P1 есть сообщения — обрабатывает их и не переходит к P2/P3 этого же канала.
Только когда P1-топик канала пуст, потребитель переключается на P2, затем на P3.
Зачем это нужно: критичные сообщения (OTP) не должны ждать маркетинговых рассылок. Для push-канала дополнительно обеспечивается изоляция трафика разных брендов.
Реализация: пакет podmutex (Go) — универсальный координатор
Для координации между обработчиками разных приоритетов в многоподовой среде был разработан пакет podmutex. Он реализует иерархическую блокировку с поддержкой отложенной разблокировки для решения проблемы «мерцания».
Базовый иерархический Mutex
В системе для каждого канала связи (SMS, push, email, call) выстраивается своя цепочка мьютексов, отражающая иерархию приоритетов: P1 → P2 → P3.
Структура Mutex:
go type Mutex struct { lockCount atomic.Int32 // 0 — разблокирован, >0 — заблокирован ch chan any // Канал для получения сигнала пробуждения от родителя childs chan chan any // Очередь дочерних каналов, ожидающих разблокировки parent *Mutex // Ссылка на родительский мьютекс (высший приоритет) }
Создание иерархии в коде (функция newMutexex):
go func newMutexex(ctx context.Context, cont *app.Container) (priorityMutex1, priorityMutex2, priorityMutex3 priorityMutex) { // Опциональное выключение мьютексов через флаг UsePodMutex if !cont.Conf.App.UsePodMutex { return podmutex.NewNoop(), podmutex.NewNoop(), podmutex.NewNoop() } // Создаем корневой мьютекс для P1 mutex1 := podmutex.NewMutex(nil) // Оборачиваем его в DelayedMutex для решения проблемы мерцания delayedMutex := podmutex.NewDelayedMutex(mutex1) cont.HandlerWaitGroup.Add(1) go delayedMutex.Run(ctx, cont.HandlerWaitGroup) // Создаем дочерние мьютексы для P2 и P3 mutex2 := podmutex.NewMutex(mutex1) // P2 зависит от P1 mutex3 := podmutex.NewMutex(mutex2) // P3 зависит от P2 return delayedMutex, mutex2, mutex3 }
Логика работы Lock():
go func (m *Mutex) Lock(ctx context.Context) { // Пытаемся захватить блокировку (CAS: 0 → 1) if m.lockCount.CompareAndSwap(0, 1) && m.parent != nil && m.parent.Locked() { // Условие выполняется ТОЛЬКО если: // 1. Мы успешно захватили блокировку // 2. У нас есть родитель // 3. Родитель УЖЕ заблокирован (кем-то выше по иерархии) // В этом случае мы встаем в очередь ожидания родителя m.parent.childs <- m.ch select { case <-ctx.Done(): // По таймауту или отмене контекста case <-m.ch: // Получили сигнал пробуждения от родителя } } }
Ключевые моменты:
Если родитель не заблокирован, дочерний мьютекс захватывает блокировку и не встает в очередь.
Блокировка захватывается до проверки родителя, а не после.
Ожидание происходит только в случае, если вышестоящий приоритет уже активен.
Пробуждение ожидающих обработчиков (Unlock):
go func (m *Mutex) Unlock() { if m.lockCount.CompareAndSwap(1, 0) { // Снимаем блокировку (1 → 0) // Проходим по всем дочерним очередям и пробуждаем их for { select { case ch := <-m.childs: select { case ch <- nil: // Отправляем сигнал пробуждения default: // Канал уже закрыт или не готов } default: return // Очередь пуста } } } }
Проверка состояния блокировки:
go func (m *Mutex) Locked() bool { return m.lockCount.Load() > 0 }
Эта проверка используется обработчиками P2 и P3 перед началом обработки батча: если Locked() возвращает true (блокировка захвачена P1), низкоприоритетный обработчик приостанавливается.
Проблема «мерцания» (flickering) для всех каналов
В нашей системе P1-топики (OTP) настроены на чтение по одному сообщению (BatchSize = 1), чтобы минимизировать задержку. P3-топики (маркетинг) читаются батчами (например, по 1000 сообщений) для обеспечения пропускной способности. Несколько подов работают параллельно.
Симптом: P1 обрабатывается и вызывает Unlock() за микросекунды. P3, читающий батч из 1000 сообщений, проверяет Locked() только между батчами, а не между отдельными сообщениями. Короткая блокировка P1 просто не попадает в окно проверки P3.
Итог: приоритет нарушается — маркетинговые сообщения продолжают обрабатываться, забивая канал отправки, хотя в очереди есть критичные OTP. Та же проблема актуальна для push, email и call-каналов.
Решение: DelayedMutex — отложенная разблокировка
DelayedMutex — это обертка над обычным мьютексом, которая разблокирует его не сразу, а с небольшой задержкой. Это дает «окно», в течение которого блокировка остается захваченной, даже если P1 уже завершил обработку. За это время все поды успевают заметить блокировку.
go const DelayDuration = 50 * time.Microsecond type DelayedMutex struct { lockCount atomic.Int32 // 0 — нейтрально, 1 — ожидает разблокировки, 2 — подтверждение parent *Mutex } func (m *DelayedMutex) Lock(ctx context.Context) { m.lockCount.Store(0) // Сбрасываем счетчик при новой блокировке m.parent.Lock(ctx) // Блокируем родительский мьютекс } func (m *DelayedMutex) Unlock() { m.lockCount.Store(1) // Не разблокируем сразу, только выставляем флаг } func (m *DelayedMutex) Locked() bool { return m.parent.Locked() // Делегируем родителю } func (m DelayedMutex) Run(ctx context.Context, wg sync.WaitGroup) { defer wg.Done() ticker := time.NewTicker(DelayDuration) for { select { case <-ctx.Done(): ticker.Stop() return case <-ticker.C: switch m.lockCount.Load() { case 1: m.lockCount.Store(2) // Первый тик — потенциал на разблокировку case 2: m.lockCount.Store(0) // Второй тик — точно разблокируем m.parent.Unlock() } } } }
Инициализация в реальном коде (пример для бренда lo):
go // Создаем иерархию мьютексов для бренда lo loMutexPriority1, loMutexPriority2, loMutexPriority3 := newMutexex(ctx, cont) // Запускаем ResponseWriter (горутины для записи ответов) cont.LoKafkaPushServicePriority1.ResponseWriter().Run(cont.AppCtx, cont.AppWaitGroup) // Запускаем потребителей с разными мьютексами cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP1, batchSizeP1, kafkaService, loMutexPriority1, // DelayedMutex для P1 serviceP1, responseWriterP1, log, )) cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP2, batchSizeP2, kafkaService, loMutexPriority2, // Обычный Mutex для P2 serviceP2, responseWriterP2, log, )) cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP3, batchSizeP3, kafkaService, loMutexPriority3, // Обычный Mutex для P3 serviceP3, responseWriterP3, log, ))

Эффект: создается искусственное «окно» удержания блокировки длительностью 50–100 мкс. За это время:
Все поды, обрабатывающие P3, успевают заметить блокировку (через вызов Locked()) и приостановиться.
Если приходит новое OTP, блокировка не снимается вовсе — она плавно переходит к следующему сообщению без разрыва.
Интеграция с Kafka-консюмерами
В реальной системе каждый потребитель (SMS, call, email, push) инициализируется со своим мьютексом. Мьютекс передается в хендлер и используется внутри него для координации.
Как хендлер использует мьютекс (внутренняя логика):
Перед началом обработки батча хендлер проверяет mutex.Locked().
Если Locked() == true (мьютекс заблокирован более высоким приоритетом), хендлер приостанавливает чтение и встает в ожидание.
Если Locked() == false, хендлер вызывает mutex.Lock() (через DelayedMutex для P1 или напрямую для P2/P3), захватывает блокировку и начинает обработку батча.
После обработки батча вызывается mutex.Unlock().
Важное уточнение: для P1 используется DelayedMutex, поэтому Unlock() не снимает блокировку мгновенно, а создает «окно» в 50–100 мкс. Это эмпирически подобранное значение. Будет слишком мало — не поймаем мерцание. Слишком много — искусственно затормозим P3, даже когда P1 реально простаивает. Для P2 и P3 используется обычный Mutex, и их Unlock() снимает блокировку сразу.
Нагрузочные нюансы и оптимизации для многоканальности
Изоляция каналов:
Проблемы с email-провайдером (замедление отправки) влияют только на email-очереди, но не затрагивают SMS и push.
Для push-канала дополнительно обеспечивается изоляция по брендам.
Размер батчей по каналам:
Р1: max.poll.records = 1 — минимальная задержка для OTP.
P2: max.poll.records = 50 — баланс задержки и пропускной способности.
P3: max.poll.records = 1000 — максимальная пропускная способность.
Мониторинг по каналам и приоритетам (отслеживаем через Grafana, настроены алерты на рост лагов):
Lag P1 — всегда около 0 (красный индикатор).
Lag P3 — колеблется, показывает загрузку конвейера (зеленый индикатор).
Управление жизненным циклом:
ResponseWriter (горутины для записи ответов) запускаются с контекстом AppCtx (долгоживущим).
Kafka-консюмеры запускаются с контекстом команды ctx.
Это гарантирует, что ResponseWriter завершат работу после консюмеров.
Подводные камни и ограничения подхода
Гранулярность блокировок — текущая реализация блокирует ВСЕ низкоприоритетные обработчики конкретного канала при активности P1 этого же канала. Например, пришедшее OTP по SMS блокирует всю маркетинговую SMS-рассылку, но не трогает email-маркетинг. Для push-канала блокировка изолирована внутри бренда.
Риск голодания P2 — при постоянном потоке P1, P2 может не получать обработку (в реальности OTP не создает непрерывного потока).
Сложность отладки — распределенные мьютексы сложнее отлаживать, чем локальные.
Зависимость от времени — DelayDuration = 50 мкс может потребовать перенастройки при изменении нагрузки или инфраструктуры.
Итог
Наша система выдерживает 301 млн сообщений в месяц (пик до 1500 msg/сек), сохраняя мгновенную доставку критических OTP и транзакционных уведомлений по всем каналам связи. Для push-канала дополнительно обеспечена изоляция трафика разных брендов.
Выводы, которые мы вынесли из этого решения:
Единая платформа коммуникаций требует единого подхода к приоритезации, независимо от канала доставки.
Kafka — распределенный лог, а не очередь с приоритетами. Приоритеты эмулируются архитектурно через систему топиков <канал>-<приоритет> (с возможным разделением по брендам для push).
Физическое разделение топиков + умный консюмер с иерархическими мьютексами — базовый паттерн, масштабируемый на любое количество каналов.
DelayedMutex решает проблему «мерцания» высокоприоритетных сообщений для всех типов каналов, гарантируя, что OTP (P1) не будут задерживаться маркетингом (P3), даже при огромной разнице в объемах трафика.
Флаг UsePodMutex позволяет отключить механизм при необходимости (для тестирования или аварийного переключения).
Как думаете, хороший способ приоритезации уведомлений мы придумали или есть варианты получше?
kmatveev
Мне очень тяжело заходило. Вот фраза "В системе потоки были физически разделены" - что значит "физически разделены" - разные кластеры, или что? Не сразу стало понятно, что это разные топики, вы же про Kafka пишете. Про mutex-ы не понял вообще ничего, почему разные сообщения нельзя обрабатывать параллельно?
maga_lak Автор
Спасибо
1. «Физически разделены» — да, речь о разных топиках. У каждого приоритета свой топик: один для OTP, другой для маркетинга. Они никак не мешают друг другу, пока мы сами не решим иначе. Фраза «физически» — чтобы подчеркнуть: это не просто поле «приоритет» в сообщении, а реальная изоляция хранения и чтения.
2. Почему не обрабатывать всё параллельно? Можно, но проблема не в скорости чтения, а в ограниченном канале отправки (например, SMS-провайдер даёт 100 рпс). Если запустить всё параллельно, маркетинг (миллионы сообщений) забьёт все слоты, и OTP встанет в очередь. А мы хотим, чтобы OTP всегда проскакивало первым. Поэтому понадобился механизм блокировок менее приоритетных каналов.
kmatveev
Я стал понимать ещё меньше. Если проблема в ограниченном канале отправки, через который идёт несколько kafka-топиков, то вам нужен mutex на отправку, а у вас в статье про mutex на чтение
maga_lak Автор
Отправка осуществляется посредством http запросов, поэтому и нужен мьютекс на чтение чтобы читать и отправлять сначала из более приоритетного канала, где очередь на отправку сообшений.