Всем привет! Я Павел Агалецкий, ведущий инженер команды PaaS в Авито — платформы для продуктовых разработчиков. Сейчас я занимаюсь разработкой шины данных и инструментов для синхронных обменов между сервисами.
В этой статье я расскажу про построение шины данных поверх Kafka, которая была бы удобна для всех потребителей. Покажу возможные варианты выбора оптимального числа партиций, поделюсь решением, которое мы для этого придумали внутри Авито, и расскажу про результаты его внедрения.

Содержание:
Шина данных с Kafka в Авито
В Авито у нас есть шина данных поверх Kafka, которая осуществляет межсервисное взаимодействие в асинхронном режиме. Несколько сервисов могут обмениваться различными событиями, в любой момент подписываться на новые события и публиковать их. Это упрощает интеграции между сервисами и повышает их надёжность. С шиной данных не нужно каждый раз ходить из одного сервиса в другой.

В нашем случае шина данных ещё и убирает необходимость прямой интеграции с Kafka. Она скрывает особенности кластеров, сама валидирует контракты обменов и авторизирует доступы.
Вот, с чем мы работаем:
700 сервисов-пользователей;
2 тыс. топиков;
20 тыс. партиций;
20 млн. rpm чтения/записи.

На самом деле, сейчас мы уже перешли на Redpanda, но в рамках этой статьи будем говорить про наш опыт с Kafka — сути это не меняет.
Что такое Kafka
В Kafka есть 3 основные сущности: топики, партиции, группы консьюмеров. Если коротко: топики состоят из партиций, группы консьюмеров читают эти топики.

Топики — потоки, которые логически объединяют сообщения. По имени топика продюсер отправляет данные, а консьюмеры их считывают.
Партиции — подмножества топика, в которых хранятся сообщения. Партиции являются единицей масштабирования и реплицируются. Они позволяют распределять данные по разным серверам в кластере. Мы можем изменять скорость чтения и записи, меняя число партиций в топике.
Группы консьюмеров — набор потребителей, которые совместно читают данные из топика. Консьюмеры в группе делят партиции, из которых состоит читаемый топик. Каждая партиция закреплена за одним консьюмером в группе.

На рисунке выше у топика T1 есть 4 партиции. В первой консьюмер группе каждую партицию читает отдельный потребитель, а во второй — потребители читают по несколько партиций.
Проблема. Оптимальное число консьюмеров
Архитектура Kafka, которую мы обсудили выше, рождает проблему выбора оптимального числа консьюмеров:
с одной стороны, количество консьюмеров в одной группе не должно превышать число партиций в топике. Иначе часть потребителей будет простаивать;
с другой стороны, консьюмеров в группе не должно быть меньше, чем партиций в топике. Иначе потребители будут копить бэклог событий из-за неравномерного распределения трафика.
Представим, что у нас есть топик, куда публикуются события о статусе заказа. Этот топик читает система, которая рассылает уведомления пользователю. Если система не успевает вычитывать события, у неё будет копиться бэклог непрочитанных сообщений. Из-за этого доставка уведомлений будет запаздывать, что плохо повлияет на бизнес-процессы и метрики — пользователи будут недовольны.
Хотелось бы, чтобы все консьюмеры успевали читать события своевременно и при этом мы не перерасходовали ресурсы системы на простаивающих потребителей.
Такой проблемы нет, если вы точно знаете, кто читает топики и кто в них пишет. В таком случае можно подстроить число партиций под профиль нагрузки и скорость чтения/записи, а потом менять его при необходимости.
Например, у сервиса высокая скорость записи, а консьюмеры работают медленно. Не проблема — сделаем 100 партиций на топик. Или, наоборот, при низкой скорости нам важно соблюдать порядок событий, а топик при этом небольшой. Тогда сделаем 1 партицию с репликами.
В нашем случае мы не знаем характеристики консьюмеров. Могут прийти быстрые потребители, а потом внезапно — медленные. А ещё в топике может резко возрасти объём публикаций.
Проблема в том, что топики общие, а сервисы — разные, поэтому подстроить число партиций под каждого — сложно. Одному сервису нужно много консьюмеров, другому — одна партиция. Кто-то написан на Go и может параллелить обработку событий, а кому-то нужно много физических воркеров, поскольку он сделан на PHP.
В итоге задача звучит так: как в общей Kafka выбрать оптимальное количество партиций под каждый из сотен топиков?
Варианты решения в лоб. Всем по 1000 партиций
Самый очевидный вариант — давайте каждому топику сделаем по 1000 партиций. Такого количества должно хватить всем (но это не точно).
Из плюсов:
можно обслуживать много консьюмеров;
всё работает из коробки одной командой scale через админское API.
Однако вспомним, что у нас 2 тыс. топиков, то есть 2.000*N партиций. Если мы сделаем 1.000 партиций на топик, получим 2 млн. партиций на кластер. При этом разработчики Kafka рекомендуют не превышать 200.000 партиций на кластер — после этого брокер начинает медленнее сходиться и дольше восстанавливается после сбоев.
Такой «жирный» кластер неудобен ещё и по другой причине. Кому-то 1000 партиций будет мало, а для кого-то это перебор. А ещё каким-нибудь консьюмерам может быть важен порядок сообщений.
Вариант решения в лоб. Задавать вручную
Попробуем задавать число партиций вручную. Скажем, 10 партиций по умолчанию, а при необходимости вручную подтюним до 1000. При выборе количества партиций будем ориентироваться на трафик, время обработки и бэклог сообщений.
Из плюсов:
ресурсов всем хватит;
можно индивидуально управлять партициями.
Проблема в том, что всем этим надо управлять вручную. При наличии нескольких кластеров придётся, например, синхронизировать конфиги, чтобы введённый в строй новый кластер имел бы такие же настройки, как и старый.
А ещё потребность на большее количество партиций может появиться не сразу, а когда уже накопился большой бэклог сообщений. Масштабирование партиций в этом случае старым сообщениям не поможет, поскольку они не распределятся между новыми консьюмерами.
Варианты решения похитрее. Parallel-consumer
После базовых вариантов мы решили попробовать более хитрые решения. Например, confluent parallel-consumer — обёртку над клиентом Kafka на Java для параллельного консьюминга событий. Инструмент умеет читать события и раздавать их консьюмерам в рамках своего процесса.

Parallel-consumer также трекает состояние индивидуальных обработанных событий и сохраняет их в offset metadata. Если один консьюмер уже обработал свое сообщение, а другие потребители ещё нет, статус обработанного события сохранится. Благодаря этому состояние восстановится, если, например, приложение-потребитель перезапустится.
По сути parallel-consumer — это параллельный механизм обработки сообщений в рамках процесса. Подробнее про это разработчики рассказали в презентации на Kafka Summit Europe 2021.
Из минусов:
написан на Java, а у нас ничего нет на Java. В основном мы пишем на Go;
фактически parallel-consumer запускается как один процесс, уже внутри которого есть несколько консьюмеров. Мы же деплоимся в Kubernetes, у нас разное количество потребителей, подов и т.д.
Однако идеи, которые заложены в parallel-consumer, — очень интересные. Как минимум принцип хранение данных об обработанных сообщениях мы переиспользовали в своем решении.
Варианты решения похитрее. Uber consumer proxy
Ещё один подход для решения нашей проблемы, который мы нашли, — uber consumer proxy. По сути это параллельный консьюминг событий для микросервисов.

Если коротко: у них есть один сторонний сервис, который читает события, раздаёт их клиентам-подписчикам и трекает статус. Это не всё, что он умеет, почитайте оригинальную статью — там интересно.
Подробнее этот механизм описан в блоге Uber.
Из минусов:
это статья в блоге Uber, а не полноценный инструмент, который можно переиспользовать;
многие детали в статьи не раскрыты. Например, ребята не говорят о том, как хранятся статусы обращений.
Однако сама идея с отдельным сервисом для консьюминга и раздачи сообщений — прикольная. Ей мы тоже вдохновились при создании своего решения.
Вариант решения долгий. Дождаться Queues for Kafka
Логично возникает идея введения очередей в Kafka. Если загуглить что-то вроде 'queues for kafka', вы, скорее всего, наткнётесь на KIP-932.
KIP (Kafka Improvement Process) — это механизм, с помощью которого Kafka развивается: туда несут предложения по улучшению инструмента.

KIP-932: Queues for Kafka предлагает масштабирование консьюмеров (shared consumers). Реализовать это предполагается с помощью возможности чтения партиции сразу несколькими консьюмерами в группе.
Основной минус — KIP ещё не готов к production и масштабирование консьюмеров через него станет полноценно доступно только через пару лет.
Тем не менее, работа над KIP-932 активно ведётся, сейчас он доступен в виде preview. Мы очень ждём и, скорее всего, перейдем на shared consumers, когда эта возможность появится в Kafka.
Варианты решения радикальные. Заменить Kafka на Pulsar
Ещё один вариант — принять, что в Kafka нет возможности распараллелить сообщения. Это фундаментальная особенность инструмента, мы привязаны к партициям — всё. В конце концов, Kafka не единственная технология — выберем другую. Например, Apache/Pulsar.
Pulsar — это как Kafka, только более гибкий. Он тоже хранит message log сообщений, но при этом позволяет параллельный консьюминг из коробки. А ещё Pulsar быстрый.

В Авито мы уже используем Pulsar в системе очередей и даже написали про него статью на Хабре. Но уже успели разочароваться:
быстрый, но забагованный;
много фич, которые не работают;
много нюансов при эксплуатации.
В первую очередь мы разочаровались стабильностью Pulsar на наших объемах. Поэтому заносить его дальше в нашу инфраструктуру не хотим.
Варианты решения радикальные. Заменить Kafka на *mq
Пока ждём очереди в Kafka, как вариант — заменить её на отдельную систему очередей: например, MessageMQ или RabbitMQ.
Из минусов:
нам нужен лог сообщений;
нам удобно читать сообщения последовательно и несколькими параллельными приложениями.
Поэтому нам такой вариант тоже не подошёл.
Наше решение — Quasar
Мы изучили и попробовали несколько разных решений и подходов, но ни один нас полностью не удовлетворил. Поэтому мы решили сделать свой сервис, который бы решал проблему выбора оптимального числа партиций.
Вот какие требования для нашего сервиса мы сформулировали:
Должен работать поверх Kafka API, чтобы можно было поменять технологию под капотом. Например, когда реализуют KIP-932.
Не должен иметь явных ограничений на число параллельных консьюмеров, чтобы не ставить себе жёстких лимитов.
Не должен требовать изменений кода сервисов. К счастью, благодаря шине данных наши приложения не работают с Kafka напрямую.
Parallel-consumer и Consumer proxy от Uber, о которых я рассказывал выше, стали источниками идей для нашего сервиса. Вот что мы оттуда позаимствовали:
в confluent/parallel-consumer мы подсмотрели способ хранения информации об обработанных сообщениях;
из uber/consumer-proxy мы взяли идею сделать отдельный сервис для чтения и раздачи сообщений.
В итоге мы запилили сервис Quasar, который позволяет масштабировать чтение из Kafka API.
Общая идея Quasar
Quasar — это отдельный сервис на Go, который стоит между шиной данных и Kafka. Его основные задачи:
Раздавать сообщения из топиков клиентам.
Трекать обработанные сообщения.
Коммитить офсеты в Kafka.
Вот как это работает в общих чертах:
Quasar читает данные из Kafka по принципу «один топик — один консьюмер»;
клиенты подключаются к сервису через gRPC streams;
сервис раздает сообщения по принципу round-robin подключенным клиентам;
результат обработки сообщения Quasar сохраняет в памяти и коммитит их в offset metadata.

Quasar мы деплоим в k8s в несколько ДЦ, как и остальные сервисы Авито. Это позволяет пережить отказ одного из ДЦ.
В Kubernetes используем stateful set для того, чтобы иметь фиксированные адреса для инстансов/подов. Релизимся с помощью ArgoCD.
Принципы работы Quasar
Теперь разберёмся с основными принципами работы Quasar подробнее.
1. Один консьюмер на топик, чтобы можно было раздавать сообщения всем подключенным потребителям. Для этого мы используем raft и с его помощью выбираем лидера в кластере Quasar.
Лидер распределяет топики между нодами и назначает инстанс, который будет читать соответствующий топик/группу. Благодаря этому можно иметь любое количество нод, одна из которых будет лидером.

2. Подключение клиентов по gRPC происходит с помощью двух API: discovery и read.
Discovery позволяет узнать, какой инстанс обслуживает топик;
Read позволяет потоком запрашивать сообщения и отсылать ack/nack.
Клиенты узнают адрес ответственного за топик инстанса с помощью discovery и читают его с помощью read.
3. Хранение индивидуальных аков (подтверждений) необходимо, чтобы не посылать сообщения дважды. Вот как мы это делаем:
сохраняем полученные аки в памяти;
периодически коммитим офсет в Kafka, куда в metadata добавляем информацию о подтверждённых сообщениях;
когда есть непрерывная последовательность акнутых сообщения — сдвигаем офсет в Kafka.

На рисунке выше:
зелёные сообщения — уже обработаны консьюмером и подтверждены. Сохранённый офсет находится после последнего обработанного события;
жёлтые сообщения — сейчас отданы клиенту и ещё обрабатываются;
синие сообщения — акнутые события, между которыми есть необработанные жёлтые сообщения. Сдвинуть офсет на них мы ещё не можем.
Quasar как раз отвечает за то, чтобы хранить в себе состояние индивидуально акнутых событий и коммитить соответствующий офсет в Kafka.
В итоге
Что сейчас. Запустились мы не так давно, но уже есть профит — пользователи могут по желанию масштабировать консьюмеров, а нам при этом не нужно вручную управлять партициями.
По умолчанию мы устанавливаем 9 партиций на топик в Kafka — среднее значение, удовлетворяющее пользователей Quasar. Но есть и клиенты с 50+ консьюмеров на партицию. Они масштабируются через Quasar, штатные настройки Kafka при этом мы не меняем.
Технология полностью под нашим контролем и мы развиваем её в нужном нам направлении.
Проблемы тоже есть:
сложная кодовая база;
решение требует поддержки;
деплоимся через внутреннюю платформу, но с особенностями.
Подробнее про платформу можно посмотреть в записи Avito Platform engineering meetup.
Что дальше:
хотим внедрить механику балансировки сообщений между консьюмерами в зависимости от нагрузки;
выпустим Quasar в open source, если поймём, что запрос на такую систему поверх Kafka API есть не только у нас;
возможно, перейдём на KIP-932, когда он выйдет. Благо, Quasar использует внутри Kafka API — значит, сможем поменять технологию под капотом при необходимости.
Полезные ссылки в одном месте:
confluent/parallel-consumer — на GitHub;
consumer-proxy — статья в блоге Uber;
KIP-932: Queues for Kafka — описание запланированной реализации масштабирования консьюмеров в Kafka;
Apache Pulsar — официальный сайт;
Моя статья «Apache Pulsar как основа для системы очередей» — на Хабре.
А если хотите вместе с нами помогать людям и бизнесу через технологии — присоединяйтесь к командам. Свежие вакансии есть на нашем карьерном сайте.
mosinnik
Пара вопросов:
1) как там метрики считаются для консумер групп? в частности косумер лаг как показываете командам-владельцам сервисов
2) какой трафик сейчас пережевывает это решение? не на НТ, оно же в проме уже как понимаю?
3) смотрели ли в сторону ydb? вроде как все что вам надо там есть без извращений над кафкой
4) не увидел ничего про гарантии, что там с очередностью и тразакциями?
ewolf Автор
Привет!
Метрики мы отдаем сами из quasar и шины данных. Для беклога мы делаем следующее
смотрим положения курсоров и оффсетов по партициям и получаем базовый беклог как
partition end - cursor offsetраспаковываем оффсеты, вычитаем индивидуально акнутые события.
итоговый рассчет оказывается приблизительным, но близким
И уже это пишется в метрику для пользователей
По нагрузке - сейчас около 40-50 млн rpm проходит.
В сторону ydb смотрели, но не хочется снова идти в не самую стандартную технологию. Пока предпочитаем api, которое если что можно подменить любой реализацией.
Гарантии у нас at least once, порядок и транзакционность доставки событий не гарантируем