Всем привет! Я Павел Агалецкий, ведущий инженер в Авито. Мы в компании используем микросервисную архитектуру с синхронным и асинхронным обменом событиями. В какой-то момент нам стало нужно обеспечивать более надёжную передачу сообщений. Стандартной Apache Kafka нам для этого было мало. Так мы пришли к идее, что пора строить собственную шину данных.
Зачем нужна шина данных
Микросервисы можно подключить к Apache Kafka напрямую. Это быстро и просто: нужно развернуть кластер, дать продюсерам (отправителям) и консьюмерам (получателям) адрес подключения и разрешить обмениваться событиями.
У такого подхода есть весомые минусы:
Сложно менять топологию системы. Если вы захотите отказаться от Kafka или разделить её на несколько кластеров, придётся обновить клиенты для всех пользователей, поменять адреса и настройки.
Под каждый язык разработки нужен свой клиент. В Авито мы пишем на Go, PHP, Python и других, получается зоопарк из разных систем только для обмена событиями.
С ростом системы прочитать сообщения из Kafka становится сложнее из-за растущей нагрузки.
Поэтому мы решили пойти другим путём и поставили между Kafka и микросервисами data bus, или шину данных. Она изолирует Kafka от сервисов и берет не себя получение и отправку сообщений получателям и реализует взаимодействие с Apache Kafka.
Микросервисы подключаются к шине данных через протокол websocket. Они публикуют события через один метод и подписываются на интересующие их события по названию топиков через другой. Протокол простой, поддерживается всеми языками разработки, которые мы используем в Авито.
Шина напрямую взаимодействует с Kafka и скрывает все особенности работы с ней. Разработчикам микросервисов не нужно знать ничего об офсетах, хранении событий. Команда поддержки разработала три клиента для разных языков программирования, чтобы инженерам Авито не приходилось разбираться в Kafka.
Кроме того, шина предоставляет разработчикам универсальные метрики. Например, скорость записи и чтения, количество публикуемых событий, время обработки событий, размер бэклога. Все эти данные пользователи получают из нашего сервиса сразу после подключения, без специальных действий. Это помогает разработчикам в отладке микросервисов.
Сейчас шина данных — центральная система Авито. Средняя нагрузка на неё составляет порядка 3 миллионов запросов на запись и около 10 миллионов запросов на чтение в минуту. Её постоянно используют больше 500 сервисов компании, поэтому наша главная цель — обеспечить физическую надёжность и доступность системы.
Помимо физической доступности, нам важна логическая надёжность шины данных. Все данные в ней должны быть консистентными, чтобы любой сервис-консьюмер смог прочитать сообщения. При этом нельзя, чтобы сервисы-продюсеры случайно или целенаправленно записывали невалидные события, которые могут вызвать поломку системы или просто останутся непрочитанными.
Архитектура шины данных на основе Apache Kafka
Сначала у нас был один большой кластер Kafka, распределенный между несколькими дата-центрами Авито. На тот момент мы полагались на возможность самой Kafka восстанавливаться после отказа части серверов, которые входят в её кластер.
Этот подход оказался для нас не очень удачным. Когда все серверы и дата-центры доступны, шина работала хорошо. Но при отключении дата-центра появлялась заметная просадка Latency.
Это происходило, потому что на момент отключения в Kafka было довольно много топиков, которые имели больше 15 тысяч партиций. При отключении дата-центра и потери связи с частью брокеров выполнялся ребаланс, выбирался новый мастер и в это время топики становились недоступны для записи. Это сильно влияло на работу всех наших пользователей.
Запись данных в шину для Авито важнее, чем чтение. Если микросервисы не могут записать данные в систему, получают ошибку или большой тайм-аут, то ломаются многие процессы. В итоге это становится заметно даже конечным пользователям Авито — на сайте или в приложении. Поэтому нам очень важно, чтобы запись всегда была быстрой.
Кроме того, мы хотели, чтобы работа с Kafka всегда была локальной. Для любого сервиса, который взаимодействует с шиной данных, все операции должны происходить внутри одного кластера. Поэтому мы переработали архитектуру и создали Kafka Federation.
Мы отказались от одного большого кластера и заменили его несколькими маленькими. В каждом дата-центре разместили свою Kafka и экземпляр шины данных, а также экземпляры микросервисов, которые тоже должны быть устойчивыми к отказу.
Микросервисы-продюсеры записывают данные в шину, которая находится в одном с ними кластере. Затем шина передает событие во Write Kafka, доступную только для записи.
Затем события из Write Kafka с помощью репликатора записываются в общую центральную Kafka. Её мы реализовали по старой схеме — распределили один кластер между всеми дата-центрами.
Центральная Kafka доступна для записи и чтения. Она подвержена тем же проблемам: медленная запись, высокий Latency. Но для пользователей это становится незаметным, потому что для них запись выполняется сначала в локальную Write Kafka.
Задержку замечает только репликатор: при возникновении проблем с дата-центром, он какое-то время не может переложить данные в большую Kafka. Но это не влияет на качество работы всех систем Авито.
Чтение с центральной Kafka тоже осуществляется репликами шины данных, расположенными в каждом из дата-центров. А уже к ним, в свою очередь, подключаются микросервисы-консьюмеры.
Так у нас получилась система, которая всегда доступна на запись вне зависимости от отказа того или иного дата-центра. Даже если один из них недоступен, микросервисы в остальных дата-центрах продолжают работу.
Как мы обеспечили логическую надежность шины данных
Микросервисы Авито постоянно эволюционируют: у них меняются данные и наборы событий, которые они публикуют и читают. Система должна уметь проверять сообщения и сохранять консистентность данных, чтобы ничего не сломалось из-за изменений. При этом все пользователи шины должны быть уверены, что они при любых условиях смогут прочитать события и получить валидные данные. Поэтому с точки зрения логической надёжности нам нужно было реализовать схемы сообщений и их валидацию.
Схема сообщения, или контракт — это описание того, что продюсер записывает, а консьюмер читает: какие у его события есть поля, типы данных. Разработчики прямо в коде микросервиса в специальной директории указывают события, которые они публикуют или читают.
Внутри Авито мы используем для описания синхронных и асинхронных схем обменов Brief — текстовый формат строгой типизации. Он недоступен вне компании, но по структуре описания похож на gRPC.
Схема события в формате Brief:
schema “service.update.commited” ServiceUpdateCommited ‘Коммит библиотеки для сервиса’
message ServiceUpdateCommited {
serviceName string ‘Название сервиса’
repository string ‘Адрес репозитория’
repositoryClone string ‘Адрес репозитория для клонирования’
owner string ‘Владелец сервиса’
update LibUpdateVersion ‘Библиотека для обновления’
commitData CommitData ‘Данные о коммите в репозиторий’
jiraTaskId *string ‘Уникальный идентификатор апдейта — название Jira тикета eg: ARCH-123’
}
message LibUpdateVersion {
libName string ‘Название библиотеки’
language string ‘Язык программирования’
desired string ‘Желаемое обновление’
}
message CommitData {
branchName string ‘Имя ветки, в которую был произведён коммит’
comment string ‘Комментарий к pr’
}
Для удобства разработчиков мы реализовали кодогенерацию на основе описания событий. В автоматически сгенерированном коде прописаны модели данных и клиенты, программисту достаточно указать схему события. В итоге получается работающий клиент, который можно сразу использовать для публикации или чтения сообщений.
Проверку валидности схемы выполняет шина данных. Когда один из сервисов пытается выкатиться в продакшен, система сравнивает его схемы событий со схемами его продюсеров и консьюмеров.
Шина проверяет ряд критериев: наличие обязательных полей, совпадение полей и типов данных для продюсера и консьюмера.
Также шина запрещает микросервисам выполнять не валидные действия. Например, мы не даем возможность продюсеру удалить событие, у которого есть консьюмер. То есть сначала нужно убрать ожидание этого сообщения во всех сервисах-получателях, а затем — в сервисе-продюсере. Другой пример — нельзя поменять тип поля со строки на число или обратно.
Итоги
Шина данных способна пережить отказ отдельных серверов или даже дата-центра. В продакшене система работает уже больше трех лет. За последние полтора года не было ни одного серьезного инцидента, связанного с недоступностью или отказом.
Шина гарантирует соблюдение контрактов между продюсерами и консьюмерами. В конечном итоге это обеспечивает надежность системы в длительной перспективе: пользователи уверены, что их сервисы всегда смогут прочитать события, которые публикуют другие сервисы.
Шина данных скрывает от пользователей все нюансы работы с Kafka. Нам это помогло незаметно заменить схему с одним экземпляром системы на Kafka Federation.
Сейчас мы думаем о дальнейших улучшения топологии. Одна из главных задач — уйти от единой точки отказа в виде общей для всех Apache Kafka. Если мы решим поменять топологию или даже заменить Apache Kafka на какое-то другое решение, то сможем сделать это незаметно для пользователей. Для них переход будет прозрачным, так как мы сохраняем наш API и не меняем точки входа, как бы ни менялась топология шины данных внутри.
Если решим поменять инструмент, нам не придется беспокоить пользователей — для них переход снова будет незаметным.
Предыдущая статья: Ультимативный гайд по HTTP. Часть 1. Структура запроса и ответа
Комментарии (11)
maxzh83
12.04.2023 12:14Мне показалось или вы переизобрели enterprise service bus? Еще интересно насколько хорошо работает WebSocket, который из коробки довольно примитивный в плане разрыва соединения и не имеет поддержки схемы данных (в отличие от Kafka)?
ewolf Автор
12.04.2023 12:14+1По-сути, переизобрели :)
Можете чуть подробнее раскрыть вопрос про разрыв соединения и схему данных?
maxzh83
12.04.2023 12:14Грубо говоря, если соединение рвется в websocket, то очень долго можно об этом не знать, особенно если это сетевая проблема и соединение именно разрывается, а не закрывается стандартно. Т.е. с точки зрения клиента ws все хорошо, а на самом деле нет и часть сообщений может не дойти после такого разрыва. Ну и вроде как нет какого-то стандартного механизма восстановления соединения после разрыва, каждый по своему делает.
А про схему имеется в виду некий аналог avro у kafka, которого для ws нет. Кстати, вы шлете сообщения в тексте или бинарно?
illia-ivanou
12.04.2023 12:14очень долго
Ровно столько, сколько вы заложите в timeout на ping/pong с последующей логикой на переустановку соединения, WS специфицирует такие фреймы для использования как часть протокола: https://www.rfc-editor.org/rfc/rfc6455#section-5.5.2
Очевидно, и клиент и сервер должны уметь играть в ping/pong и делать это с последующим переотсылкой/перезапросом данных в соответствии с установленным между ними контрактом для разрешения таких ситуаций.
А про схему имеется в виду некий аналог avro у kafka, которого для ws нет. Кстати, вы шлете сообщения в тексте или бинарно?
В статье описан закрытый формат сериализации данных:
Внутри Авито мы используем для описания синхронных и асинхронных схем обменов Brief — текстовый формат строгой типизации. Он недоступен вне компании, но по структуре описания похож на...
ewolf Автор
12.04.2023 12:14Выше уже ответили на многие вопросы.
У нас есть таймауты на подключения, keep-alive сообщения, чтобы убедиться, что клиент жив.
Также у нас есть реестр схем событий, благодаря которому мы можем валидировать контракты как в момент попытки деплоя сервиса, так и при публикации
mark_ablov
12.04.2023 12:14То есть у вас нет единого реестра схем, а каждый сервис имеет контракты как пордьсер и консьюмер?
ewolf Автор
12.04.2023 12:14Единый реестр есть, в статье есть кратко про это. Все схемы событий при деплое попадают в этот реестр и потом мы валидируем изменения на допустимость и публикуемые события на корректность
illia-ivanou
12.04.2023 12:14Как альтернативу выбранному WS и всплывающему в голове gRPC я бы все-таки предложил RSocket: хорошо проработанный протокол явно прикладного уровня, реализующий очень гибкие механизмы, которые так или иначе, придется организовывать на голом WS для контроля потребления данных.
Если можно, несколько вопросов:
Если клиентские WS соединения к event bus соотносятся с потребителями соответствующей kafka consumer group на стороне event bus как 1:1, формируя что-то схожее, что из себя и представляла бы топология соединений без промежуточных серверов абстракции, просто подменяя протокол и сливая данные с партиций одного потребителя в одно соединение на клиентскую ноду: разве это не вызывает дисбаланс в нагрузке на ноды data bus при группе неравномерно распределенных соединений в случае proxy-load-balancer стоящим перед нодами? Или вы запарились и реализовали заумный client load balancing?
По поводу отмирания сети датацентра: не может ли этот сценарий породить нарушение последовательности записи?
Событие, успевшее попасть в локальный кластер kafka, но неуспевшее отреплицироваться после отмирания сети в одном датацентре, может попасть в глобальный кластер при восстановлении сети уже после того, как другой центр, куда переключился конечный пользователь, отреплицирует туда более новые события, относящиеся к одной и той же сущности.При выборе решения не рассматривали Apache Camel? Есть коннектор к Kafka и возможность поднимать WS эндпоинты, как и добавлять собственную логику.
но по структуре описания похож на gRPC
Вероятно, здесь описка и имелся ввиду protobuf - бинарный формат сериализации данных, используемый gRPC по умолчанию, однако gRPC как протокол не определяет его как единственный и/или обязательный, например, с ним можно спокойно использовать JSON.
ewolf Автор
12.04.2023 12:14Спасибо, на RSocket посмотрим обязательно.
Дисбаланс подключений действительно возможен. Но нагрузка от таких подключений на databus не очень большая, а подключений много, поэтому в среднем распределение достаточно равномерное. Поэтому в этом плане у нас есть упрощение: сложной балансировки нет.
События действительно могут перепутаться, также, да, в текущей схеме может быть ситуация, когда часть событий останется в умершем ДЦ. Но мы намеренно не предоставляем гарантий порядка событий на данном этапе эволюции нашей системы и требуем от клиентов устойчивости к нарушениям порядка и дублям.
Camel для нашего случая не рассматривали. Наша шина представляет собой просто механизм обмена событий и не предполагает наличие логики по их обработке или трансформации. В будущем - возможно вокруг нее добавится какой-то механизм для обработки, тогда мы посмотрим на доступные варианты, может быть и camel, хотя он не очень ложится в стек нашей компании (из-за java/kotlin)
Да, формат конечно же похож на описание protobuf, спасибо за замечание
c3gdlk
Павел, спасибо за статью. В архитектурных статьях мне всегда интересно не только само решение, но и ход рассуждений при выборе альтернатив. Если есть возможность, расскажите об этом немного.
Например,
Можно использовать Outbox и записывать сообщения в рамках транзакции в базу, для того чтобы у пользователя все прошло гладко, а вот в шину уже отправлять тогда, когда получится
Для продюсера можно написать библиотеку с общим интерфейсом, которая под капотом скрывает реализацию работы с Кафкой. Вы в итоге такой клиент и написали, только для Вашей собственной шины
Даже не представляю насколько эта шина должна быть сложным продуктом. Кафка довольно специфичный инструмент и ее конзюмеры не просто так сложные. Все эти сдвигания оффсетов, обработки ошибок, хертбиты и прочее. Судя по всему Ваша шина должна все это повторять и как-то с этим работать.
ewolf Автор
Паттерн outbox у нас поддерживается. Есть реализация клиента databus, которая на самом деле пишет события в отдельную специальную таблицу в той же базе данных, что и остальные сущности, а потом фоновый отдельный воркер перекладывает их в databus. Для клиента все работает атомарно внутри одной транзакции.
Написать общую библиотеку можно, но потом сложно следить за тем, чтобы ее своевременно обновляли и чтобы например, когда мы включаем один кластер или переводим его на другие адреса, или когда мы хотим заменить кафку на что-то другое, то это прошло бы незаметно для клиента.
Наш же сервис - это фасад.
Сама по себе шина не так сложна, фактически в ней просто websocket соединение вида pub/sub, где клиент может сказать в какие топики он хочет писать, а из каких читать.
Пока жив коннект от клиента - мы держим коннект в кафку. Плюс под капотом пара дополнительных штук типа набирания батча для клиента на консьюминге и так далее.
Общая задача была в том, чтобы защитить клиентов от знания о внутренней технологии, ее топологии и не дать совершать невалидные действия, так что решение в этом плане оправдывает себя