Всем привет! Я Павел Агалецкий, ведущий инженер команды PaaS в Авито — платформы для продуктовых разработчиков. Сейчас я занимаюсь разработкой шины данных и инструментов для синхронных обменов между сервисами.

В этой статье я расскажу про построение шины данных поверх Kafka, которая была бы удобна для всех потребителей. Покажу возможные варианты выбора оптимального числа партиций, поделюсь решением, которое мы для этого придумали внутри Авито, и расскажу про результаты его внедрения.

Содержание:

Шина данных с Kafka в Авито

В Авито у нас есть шина данных поверх Kafka, которая осуществляет межсервисное взаимодействие в асинхронном режиме. Несколько сервисов могут обмениваться различными событиями, в любой момент подписываться на новые события и публиковать их. Это упрощает интеграции между сервисами и повышает их надёжность. С шиной данных не нужно каждый раз ходить из одного сервиса в другой. 

Шина данных в Авито
Шина данных в Авито

В нашем случае шина данных ещё и убирает необходимость прямой интеграции с Kafka. Она скрывает особенности кластеров, сама валидирует контракты обменов и авторизирует доступы. 

Вот, с чем мы работаем: 

  • 700 сервисов-пользователей;

  • 2 тыс. топиков;

  • 20 тыс. партиций;

  • 20 млн. rpm чтения/записи.

График RPM шины данных в Авито
График RPM шины данных в Авито

На самом деле, сейчас мы уже перешли на Redpanda, но в рамках этой статьи будем говорить про наш опыт с Kafka — сути это не меняет.

Что такое Kafka

В Kafka есть 3 основные сущности: топики, партиции, группы консьюмеров. Если коротко: топики состоят из партиций, группы консьюмеров читают эти топики.

Схема структуры топика в распределённой системе
Схема структуры топика в распределённой системе

Топики — потоки, которые логически объединяют сообщения. По имени топика продюсер отправляет данные, а консьюмеры их считывают. 

Партиции — подмножества топика, в которых хранятся сообщения. Партиции являются единицей масштабирования и реплицируются. Они позволяют распределять данные по разным серверам в кластере. Мы можем изменять скорость чтения и записи, меняя число партиций в топике.

Группы консьюмеров — набор потребителей, которые совместно читают данные из топика. Консьюмеры в группе делят партиции, из которых состоит читаемый топик. Каждая партиция закреплена за одним консьюмером в группе.

Распределение партиций топика между двумя группами потребителей
Распределение партиций топика между двумя группами потребителей

На рисунке выше у топика T1 есть 4 партиции. В первой консьюмер группе каждую партицию читает отдельный потребитель, а во второй — потребители читают по несколько партиций. 

Проблема. Оптимальное число консьюмеров

Архитектура Kafka, которую мы обсудили выше, рождает проблему выбора оптимального числа консьюмеров:

  • с одной стороны, количество консьюмеров в одной группе не должно превышать число партиций в топике. Иначе часть потребителей будет простаивать;

  • с другой стороны, консьюмеров в группе не должно быть меньше, чем партиций в топике. Иначе потребители будут копить бэклог событий из-за неравномерного распределения трафика. 

Представим, что у нас есть топик, куда публикуются события о статусе заказа. Этот топик читает система, которая рассылает уведомления пользователю. Если система не успевает вычитывать события, у неё будет копиться бэклог непрочитанных сообщений. Из-за этого доставка уведомлений будет запаздывать, что плохо повлияет на бизнес-процессы и метрики — пользователи будут недовольны.

Хотелось бы, чтобы все консьюмеры успевали читать события своевременно и при этом мы не перерасходовали ресурсы системы на простаивающих потребителей.

Такой проблемы нет, если вы точно знаете, кто читает топики и кто в них пишет. В таком случае можно подстроить число партиций под профиль нагрузки и скорость чтения/записи, а потом менять его при необходимости. 

Например, у сервиса высокая скорость записи, а консьюмеры работают медленно. Не проблема — сделаем 100 партиций на топик. Или, наоборот, при низкой скорости нам важно соблюдать порядок событий, а топик при этом небольшой. Тогда сделаем 1 партицию с репликами. 

В нашем случае мы не знаем характеристики консьюмеров. Могут прийти быстрые потребители, а потом внезапно — медленные. А ещё в топике может резко возрасти объём публикаций. 

Проблема в том, что топики общие, а сервисы — разные, поэтому подстроить число партиций под каждого — сложно. Одному сервису нужно много консьюмеров, другому — одна партиция. Кто-то написан на Go и может параллелить обработку событий, а кому-то нужно много физических воркеров, поскольку он сделан на PHP. 

В итоге задача звучит так: как в общей Kafka выбрать оптимальное количество партиций под каждый из сотен топиков? 

Тут еще больше контента

Варианты решения в лоб. Всем по 1000 партиций

Самый очевидный вариант — давайте каждому топику сделаем по 1000 партиций. Такого количества должно хватить всем (но это не точно). 

Из плюсов:

  • можно обслуживать много консьюмеров;

  • всё работает из коробки одной командой scale через админское API. 

Однако вспомним, что у нас 2 тыс. топиков, то есть 2.000*N партиций. Если мы сделаем 1.000 партиций на топик, получим 2 млн. партиций на кластер. При этом разработчики Kafka рекомендуют не превышать 200.000 партиций на кластер — после этого брокер начинает медленнее сходиться и дольше восстанавливается после сбоев.

Такой «жирный» кластер неудобен ещё и по другой причине. Кому-то 1000 партиций будет мало, а для кого-то это перебор. А ещё каким-нибудь консьюмерам может быть важен порядок сообщений. 

Вариант решения в лоб. Задавать вручную

Попробуем задавать число партиций вручную. Скажем, 10 партиций по умолчанию, а при необходимости вручную подтюним до 1000. При выборе количества партиций будем ориентироваться на трафик, время обработки и бэклог сообщений.

Из плюсов: 

  • ресурсов всем хватит;

  • можно индивидуально управлять партициями.

Проблема в том, что всем этим надо управлять вручную. При наличии нескольких кластеров придётся, например, синхронизировать конфиги, чтобы введённый в строй новый кластер имел бы такие же настройки, как и старый. 

А ещё потребность на большее количество партиций может появиться не сразу, а когда уже накопился большой бэклог сообщений. Масштабирование партиций в этом случае старым сообщениям не поможет, поскольку они не распределятся между новыми консьюмерами. 

Варианты решения похитрее. Parallel-consumer

После базовых вариантов мы решили попробовать более хитрые решения. Например, confluent parallel-consumer — обёртку над клиентом Kafka на Java для параллельного консьюминга событий. Инструмент умеет читать события и раздавать их консьюмерам в рамках своего процесса. 

Схема работы parallel-consumer
Схема работы parallel-consumer

Parallel-consumer также трекает состояние индивидуальных обработанных событий и сохраняет их в offset metadata. Если один консьюмер уже обработал свое сообщение, а другие потребители ещё нет, статус обработанного события сохранится. Благодаря этому состояние восстановится, если, например, приложение-потребитель перезапустится. 

По сути parallel-consumer — это параллельный механизм обработки сообщений в рамках процесса. Подробнее про это разработчики рассказали в презентации на Kafka Summit Europe 2021

Из минусов:

  • написан на Java, а у нас ничего нет на Java. В основном мы пишем на Go;

  • фактически parallel-consumer запускается как один процесс, уже внутри которого есть несколько консьюмеров. Мы же деплоимся в Kubernetes, у нас разное количество потребителей, подов и т.д.

Однако идеи, которые заложены в parallel-consumer, — очень интересные. Как минимум принцип хранение данных об обработанных сообщениях мы переиспользовали в своем решении.

Жми сюда!

Варианты решения похитрее. Uber consumer proxy

Ещё один подход для решения нашей проблемы, который мы нашли, — uber consumer proxy. По сути это параллельный консьюминг событий для микросервисов.

Принцип работы consumer proxy
Принцип работы consumer proxy

Если коротко: у них есть один сторонний сервис, который читает события, раздаёт их клиентам-подписчикам и трекает статус. Это не всё, что он умеет, почитайте оригинальную статью — там интересно. 

Подробнее этот механизм описан в блоге Uber.

Из минусов:

  • это статья в блоге Uber, а не полноценный инструмент, который можно переиспользовать;

  • многие детали в статьи не раскрыты. Например, ребята не говорят о том, как хранятся статусы обращений.

Однако сама идея с отдельным сервисом для консьюминга и раздачи сообщений — прикольная. Ей мы тоже вдохновились при создании своего решения. 

Вариант решения долгий. Дождаться Queues for Kafka

Логично возникает идея введения очередей в Kafka. Если загуглить что-то вроде 'queues for kafka', вы, скорее всего, наткнётесь на KIP-932

KIP (Kafka Improvement Process) — это механизм, с помощью которого Kafka развивается: туда несут предложения по улучшению инструмента. 

Небольшая часть оглавления KIP-932
Небольшая часть оглавления KIP-932

KIP-932: Queues for Kafka предлагает масштабирование консьюмеров (shared consumers). Реализовать это предполагается с помощью возможности чтения партиции сразу несколькими консьюмерами в группе. 

Основной минус — KIP ещё не готов к production и масштабирование консьюмеров через него станет полноценно доступно только через пару лет.

Тем не менее, работа над KIP-932 активно ведётся, сейчас он доступен в виде preview. Мы очень ждём и, скорее всего, перейдем на shared consumers, когда эта возможность появится в Kafka. 

Варианты решения радикальные. Заменить Kafka на Pulsar

Ещё один вариант — принять, что в Kafka нет возможности распараллелить сообщения. Это фундаментальная особенность инструмента, мы привязаны к партициям — всё. В конце концов, Kafka не единственная технология — выберем другую. Например, Apache/Pulsar. 

Pulsar — это как Kafka, только более гибкий. Он тоже хранит message log сообщений, но при этом позволяет параллельный консьюминг из коробки. А ещё Pulsar быстрый.

Подробнее про Pulsar на официальном сайте.

Схема работы Pulsar
Схема работы Pulsar

В Авито мы уже используем Pulsar в системе очередей и даже написали про него статью на Хабре. Но уже успели разочароваться: 

  • быстрый, но забагованный;

  • много фич, которые не работают;

  • много нюансов при эксплуатации. 

В первую очередь мы разочаровались стабильностью Pulsar на наших объемах. Поэтому заносить его дальше в нашу инфраструктуру не хотим.

Варианты решения радикальные. Заменить Kafka на *mq

Пока ждём очереди в Kafka, как вариант — заменить её на отдельную систему очередей: например, MessageMQ или RabbitMQ. 

Из минусов: 

  • нам нужен лог сообщений;

  • нам удобно читать сообщения последовательно и несколькими параллельными приложениями. 

Поэтому нам такой вариант тоже не подошёл.

Наше решение — Quasar

Мы изучили и попробовали несколько разных решений и подходов, но ни один нас полностью не удовлетворил. Поэтому мы решили сделать свой сервис, который бы решал проблему выбора оптимального числа партиций.

Вот какие требования для нашего сервиса мы сформулировали: 

  1. Должен работать поверх Kafka API, чтобы можно было поменять технологию под капотом. Например, когда реализуют KIP-932.

  2. Не должен иметь явных ограничений на число параллельных консьюмеров, чтобы не ставить себе жёстких лимитов.

  3. Не должен требовать изменений кода сервисов. К счастью, благодаря шине данных наши приложения не работают с Kafka напрямую. 

Parallel-consumer и Consumer proxy от Uber, о которых я рассказывал выше, стали источниками идей для нашего сервиса. Вот что мы оттуда позаимствовали:

  • в confluent/parallel-consumer мы подсмотрели способ хранения информации об обработанных сообщениях;

  • из uber/consumer-proxy мы взяли идею сделать отдельный сервис для чтения и раздачи сообщений.

В итоге мы запилили сервис Quasar, который позволяет масштабировать чтение из Kafka API.

Общая идея Quasar

Quasar — это отдельный сервис на Go, который стоит между шиной данных и Kafka. Его основные задачи: 

  1. Раздавать сообщения из топиков клиентам.

  2. Трекать обработанные сообщения.

  3. Коммитить офсеты в Kafka.

Вот как это работает в общих чертах: 

  • Quasar читает данные из Kafka по принципу «один топик — один консьюмер»;

  • клиенты подключаются к сервису через gRPC streams;

  • сервис раздает сообщения по принципу round-robin подключенным клиентам;

  • результат обработки сообщения Quasar сохраняет в памяти и коммитит их в offset metadata. 

Общий вид кластера Quasar 
Общий вид кластера Quasar 

Quasar мы деплоим в k8s в несколько ДЦ, как и остальные сервисы Авито. Это позволяет пережить отказ одного из ДЦ.

В Kubernetes используем stateful set для того, чтобы иметь фиксированные адреса для инстансов/подов. Релизимся с помощью ArgoCD.

Принципы работы Quasar

Теперь разберёмся с основными принципами работы Quasar подробнее.

1. Один консьюмер на топик, чтобы можно было раздавать сообщения всем подключенным потребителям. Для этого мы используем raft и с его помощью выбираем лидера в кластере Quasar. 

Лидер распределяет топики между нодами и назначает инстанс, который будет читать соответствующий топик/группу. Благодаря этому можно иметь любое количество нод, одна из которых будет лидером. 

2. Подключение клиентов по gRPC происходит с помощью двух API: discovery и read. 

  • Discovery позволяет узнать, какой инстанс обслуживает топик; 

  • Read позволяет потоком запрашивать сообщения и отсылать ack/nack.

Клиенты узнают адрес ответственного за топик инстанса с помощью discovery и читают его с помощью read. 

3. Хранение индивидуальных аков (подтверждений) необходимо, чтобы не посылать сообщения дважды. Вот как мы это делаем: 

  • сохраняем полученные аки в памяти;

  • периодически коммитим офсет в Kafka, куда в metadata добавляем информацию о подтверждённых сообщениях;

  • когда есть непрерывная последовательность акнутых сообщения — сдвигаем офсет в Kafka. 

Процесс хранения индивидуальных подтверждений для предотвращения повторной отправки сообщений
Процесс хранения индивидуальных подтверждений для предотвращения повторной отправки сообщений

На рисунке выше: 

  • зелёные сообщения — уже обработаны консьюмером и подтверждены. Сохранённый офсет находится после последнего обработанного события; 

  • жёлтые сообщения — сейчас отданы клиенту и ещё обрабатываются;

  • синие сообщения — акнутые события, между которыми есть необработанные жёлтые сообщения. Сдвинуть офсет на них мы ещё не можем. 

Quasar как раз отвечает за то, чтобы хранить в себе состояние индивидуально акнутых событий и коммитить соответствующий офсет в Kafka. 

Кликни здесь и узнаешь

В итоге

Что сейчас. Запустились мы не так давно, но уже есть профит — пользователи могут по желанию масштабировать консьюмеров, а нам при этом не нужно вручную управлять партициями. 

По умолчанию мы устанавливаем 9 партиций на топик в Kafka — среднее значение, удовлетворяющее пользователей Quasar. Но есть и клиенты с 50+ консьюмеров на партицию. Они масштабируются через Quasar, штатные настройки Kafka при этом мы не меняем.

Технология полностью под нашим контролем и мы развиваем её в нужном нам направлении.

Проблемы тоже есть

  • сложная кодовая база;

  • решение требует поддержки;

  • деплоимся через внутреннюю платформу, но с особенностями.

Подробнее про платформу можно посмотреть в записи Avito Platform engineering meetup.

Что дальше:

  • хотим внедрить механику балансировки сообщений между консьюмерами в зависимости от нагрузки; 

  • выпустим Quasar в open source, если поймём, что запрос на такую систему поверх Kafka API есть не только у нас;

  • возможно, перейдём на KIP-932, когда он выйдет. Благо, Quasar использует внутри Kafka API — значит, сможем поменять технологию под капотом при необходимости.

Полезные ссылки в одном месте:

А если хотите вместе с нами помогать людям и бизнесу через технологии — присоединяйтесь к командам. Свежие вакансии есть на нашем карьерном сайте.

Комментарии (2)


  1. mosinnik
    22.10.2025 17:10

    Пара вопросов:
    1) как там метрики считаются для консумер групп? в частности косумер лаг как показываете командам-владельцам сервисов

    2) какой трафик сейчас пережевывает это решение? не на НТ, оно же в проме уже как понимаю?

    3) смотрели ли в сторону ydb? вроде как все что вам надо там есть без извращений над кафкой

    4) не увидел ничего про гарантии, что там с очередностью и тразакциями?


    1. ewolf Автор
      22.10.2025 17:10

      Привет!

      Метрики мы отдаем сами из quasar и шины данных. Для беклога мы делаем следующее

      • смотрим положения курсоров и оффсетов по партициям и получаем базовый беклог как partition end - cursor offset

      • распаковываем оффсеты, вычитаем индивидуально акнутые события.

      • итоговый рассчет оказывается приблизительным, но близким

      И уже это пишется в метрику для пользователей

      По нагрузке - сейчас около 40-50 млн rpm проходит.

      В сторону ydb смотрели, но не хочется снова идти в не самую стандартную технологию. Пока предпочитаем api, которое если что можно подменить любой реализацией.

      Гарантии у нас at least once, порядок и транзакционность доставки событий не гарантируем