Всем привет! Я Павел Агалецкий, ведущий инженер в Авито. Мы в компании используем микросервисную архитектуру с синхронным и асинхронным обменом событиями. В какой-то момент нам стало нужно обеспечивать более надёжную передачу сообщений. Стандартной Apache Kafka нам для этого было мало. Так мы пришли к идее, что пора строить собственную шину данных. 

Зачем нужна шина данных

Микросервисы можно подключить к Apache Kafka напрямую. Это быстро и просто: нужно развернуть кластер, дать продюсерам (отправителям) и консьюмерам (получателям) адрес подключения и разрешить обмениваться событиями.

Прямое подключение микросервисов к Kafka 

У такого подхода есть весомые минусы:

  1. Сложно менять топологию системы. Если вы захотите отказаться от Kafka или разделить её на несколько кластеров, придётся обновить клиенты для всех пользователей, поменять адреса и настройки.

  2. Под каждый язык разработки нужен свой клиент. В Авито мы пишем на Go, PHP, Python и других, получается зоопарк из разных систем только для обмена событиями. 

  3. С ростом системы прочитать сообщения из Kafka становится сложнее из-за растущей нагрузки.

Поэтому мы решили пойти другим путём и поставили между Kafka и микросервисами data bus, или шину данных. Она изолирует Kafka от сервисов и берет не себя получение и отправку сообщений получателям и реализует взаимодействие с Apache Kafka.

Шина отделяет микросервисы друг от друга и от Kafka
Шина отделяет микросервисы друг от друга и от Kafka

Микросервисы подключаются к шине данных через протокол websocket. Они публикуют события через один метод и подписываются на интересующие их события по названию топиков через другой. Протокол простой, поддерживается всеми языками разработки, которые мы используем в Авито.

Микросервисы подключаются к шине по протоколу websocket, а сама шина работает с Kafka напрямую
Микросервисы подключаются к шине по протоколу websocket, а сама шина работает с Kafka напрямую

Шина напрямую взаимодействует с Kafka и скрывает все особенности работы с ней. Разработчикам микросервисов не нужно знать ничего об офсетах, хранении событий. Команда поддержки разработала три клиента для разных языков программирования, чтобы инженерам Авито не приходилось разбираться в Kafka.

Кроме того, шина предоставляет разработчикам универсальные метрики. Например, скорость записи и чтения, количество публикуемых событий, время обработки событий, размер бэклога. Все эти данные пользователи получают из нашего сервиса сразу после подключения, без специальных действий. Это помогает разработчикам в отладке микросервисов. 

Сейчас шина данных — центральная система Авито. Средняя нагрузка на неё составляет порядка 3 миллионов запросов на запись и около 10 миллионов запросов на чтение в минуту. Её постоянно используют больше 500 сервисов компании, поэтому наша главная цель — обеспечить физическую надёжность и доступность системы.

Помимо физической доступности, нам важна логическая надёжность шины данных. Все данные в ней должны быть консистентными, чтобы любой сервис-консьюмер смог прочитать сообщения. При этом нельзя, чтобы сервисы-продюсеры случайно или целенаправленно записывали невалидные события, которые могут вызвать поломку системы или просто останутся непрочитанными.

Архитектура шины данных на основе Apache Kafka

Сначала у нас был один большой кластер Kafka, распределенный между несколькими дата-центрами Авито. На тот момент мы полагались на возможность самой Kafka восстанавливаться после отказа части серверов, которые входят в её кластер.

Одна большая Kafka была растянута между всеми дата-центрами Авито
Одна большая Kafka была растянута между всеми дата-центрами Авито

Этот подход оказался для нас не очень удачным. Когда все серверы и дата-центры доступны, шина работала хорошо. Но при отключении дата-центра появлялась заметная просадка Latency.

Момент отключения одного дата-центра. Время записи в нашу систему достигает 30 секунд
Момент отключения одного дата-центра. Время записи в нашу систему достигает 30 секунд

Это происходило, потому что на момент отключения в Kafka было довольно много топиков, которые имели больше 15 тысяч партиций. При отключении дата-центра и потери связи с частью брокеров выполнялся ребаланс, выбирался новый мастер и в это время топики становились недоступны для записи. Это сильно влияло на работу всех наших пользователей. 

Запись данных в шину для Авито важнее, чем чтение. Если микросервисы не могут записать данные в систему, получают ошибку или большой тайм-аут, то ломаются многие процессы. В итоге это становится заметно даже конечным пользователям Авито — на сайте или в приложении. Поэтому нам очень важно, чтобы запись всегда была быстрой. 

Кроме того, мы хотели, чтобы работа с Kafka всегда была локальной. Для любого сервиса, который взаимодействует с шиной данных, все операции должны происходить внутри одного кластера. Поэтому мы переработали архитектуру и создали Kafka Federation.

Общая схема архитектуры Kafka Federation
Общая схема архитектуры Kafka Federation

Мы отказались от одного большого кластера и заменили его несколькими маленькими. В каждом дата-центре разместили свою Kafka и экземпляр шины данных, а также экземпляры микросервисов, которые тоже должны быть устойчивыми к отказу.

Микросервисы-продюсеры записывают данные в шину, которая находится в одном с ними кластере. Затем шина передает событие во Write Kafka, доступную только для записи.

Первый шаг: микросервисы могут только записывать данные локально
Первый шаг: микросервисы могут только записывать данные локально

Затем события из Write Kafka с помощью репликатора записываются в общую центральную Kafka. Её мы реализовали по старой схеме — распределили один кластер между всеми дата-центрами.

Второй шаг: репликатор копирует событие из локальных Kafka в центральную
Второй шаг: репликатор копирует событие из локальных Kafka в центральную

Центральная Kafka доступна для записи и чтения. Она подвержена тем же проблемам: медленная запись, высокий Latency. Но для пользователей это становится незаметным, потому что для них запись выполняется сначала в локальную Write Kafka. 

Задержку замечает только репликатор: при возникновении проблем с дата-центром, он какое-то время не может переложить данные в большую Kafka. Но это не влияет на качество работы всех систем Авито.

Чтение с центральной Kafka тоже осуществляется репликами шины данных, расположенными в каждом из дата-центров. А уже к ним, в свою очередь, подключаются микросервисы-консьюмеры.

Третий шаг: шина данных считывает сообщение из центральной Kafka и передает его консьюмерам
Третий шаг: шина данных считывает сообщение из центральной Kafka и передает его консьюмерам

Так у нас получилась система, которая всегда доступна на запись вне зависимости от отказа того или иного дата-центра. Даже если один из них недоступен, микросервисы в остальных дата-центрах продолжают работу.

Как мы обеспечили логическую надежность шины данных

Микросервисы Авито постоянно эволюционируют: у них меняются данные и наборы событий, которые они публикуют и читают. Система должна уметь проверять сообщения и сохранять консистентность данных, чтобы ничего не сломалось из-за изменений. При этом все пользователи шины должны быть уверены, что они при любых условиях смогут прочитать события и получить валидные данные. Поэтому с точки зрения логической надёжности нам нужно было реализовать схемы сообщений и их валидацию. 

Схема сообщения, или контракт — это описание того, что продюсер записывает, а консьюмер читает: какие у его события есть поля, типы данных. Разработчики прямо в коде микросервиса в специальной директории указывают события, которые они публикуют или читают. 

Внутри Авито мы используем для описания синхронных и асинхронных схем обменов Brief — текстовый формат строгой типизации. Он недоступен вне компании, но по структуре описания похож на gRPC.

Схема события в формате Brief:

schema “service.update.commited” ServiceUpdateCommited ‘Коммит библиотеки для сервиса’

message ServiceUpdateCommited {
    serviceName string ‘Название сервиса’
    repository string ‘Адрес репозитория’
    repositoryClone string ‘Адрес репозитория для клонирования’
    owner string ‘Владелец сервиса’
    update LibUpdateVersion ‘Библиотека для обновления’
    commitData CommitData ‘Данные о коммите в репозиторий’
    jiraTaskId *string ‘Уникальный идентификатор апдейта — название Jira тикета eg: ARCH-123’
}

message LibUpdateVersion {
    libName string ‘Название библиотеки’
    language string ‘Язык программирования’
    desired string ‘Желаемое обновление’
}

message CommitData {
    branchName string ‘Имя ветки, в которую был произведён коммит’
    comment string ‘Комментарий к pr’
}

Для удобства разработчиков мы реализовали кодогенерацию на основе описания событий. В автоматически сгенерированном коде прописаны модели данных и клиенты, программисту достаточно указать схему события. В итоге получается работающий клиент, который можно сразу использовать для публикации или чтения сообщений. 

Проверку валидности схемы выполняет шина данных. Когда один из сервисов пытается выкатиться в продакшен, система сравнивает его схемы событий со схемами его продюсеров и консьюмеров.

Схема валидации данных при развертывании микросервера
Схема валидации данных при развертывании микросервера

Шина проверяет ряд критериев: наличие обязательных полей, совпадение полей и типов данных для продюсера и консьюмера.

Также шина запрещает микросервисам выполнять не валидные действия. Например, мы не даем возможность продюсеру удалить событие, у которого есть консьюмер. То есть сначала нужно убрать ожидание этого сообщения во всех сервисах-получателях, а затем — в сервисе-продюсере. Другой пример — нельзя поменять тип поля со строки на число или обратно.

Итоги

Шина данных способна пережить отказ отдельных серверов или даже дата-центра. В продакшене система работает уже больше трех лет. За последние полтора года не было ни одного серьезного инцидента, связанного с недоступностью или отказом. 

Шина гарантирует соблюдение контрактов между продюсерами и консьюмерами. В конечном итоге это обеспечивает надежность системы в длительной перспективе: пользователи уверены, что их сервисы всегда смогут прочитать события, которые публикуют другие сервисы.

Шина данных скрывает от пользователей все нюансы работы с Kafka. Нам это помогло незаметно заменить схему с одним экземпляром системы на Kafka Federation. 

Сейчас мы думаем о дальнейших улучшения топологии. Одна из главных задач — уйти от единой точки отказа в виде общей для всех Apache Kafka. Если мы решим поменять топологию или даже заменить Apache Kafka на какое-то другое решение, то сможем сделать это незаметно для пользователей. Для них переход будет прозрачным, так как мы сохраняем наш API и не меняем точки входа, как бы ни менялась топология шины данных внутри.

Если решим поменять инструмент, нам не придется беспокоить пользователей — для них переход снова будет незаметным.

Предыдущая статья: Ультимативный гайд по HTTP. Часть 1. Структура запроса и ответа

Комментарии (11)


  1. c3gdlk
    12.04.2023 12:14

    Павел, спасибо за статью. В архитектурных статьях мне всегда интересно не только само решение, но и ход рассуждений при выборе альтернатив. Если есть возможность, расскажите об этом немного.

    Например,

    1. Можно использовать Outbox и записывать сообщения в рамках транзакции в базу, для того чтобы у пользователя все прошло гладко, а вот в шину уже отправлять тогда, когда получится

    2. Для продюсера можно написать библиотеку с общим интерфейсом, которая под капотом скрывает реализацию работы с Кафкой. Вы в итоге такой клиент и написали, только для Вашей собственной шины

    3. Даже не представляю насколько эта шина должна быть сложным продуктом. Кафка довольно специфичный инструмент и ее конзюмеры не просто так сложные. Все эти сдвигания оффсетов, обработки ошибок, хертбиты и прочее. Судя по всему Ваша шина должна все это повторять и как-то с этим работать.


    1. ewolf Автор
      12.04.2023 12:14
      +1

      Паттерн outbox у нас поддерживается. Есть реализация клиента databus, которая на самом деле пишет события в отдельную специальную таблицу в той же базе данных, что и остальные сущности, а потом фоновый отдельный воркер перекладывает их в databus. Для клиента все работает атомарно внутри одной транзакции.

      Написать общую библиотеку можно, но потом сложно следить за тем, чтобы ее своевременно обновляли и чтобы например, когда мы включаем один кластер или переводим его на другие адреса, или когда мы хотим заменить кафку на что-то другое, то это прошло бы незаметно для клиента.

      Наш же сервис - это фасад.

      Сама по себе шина не так сложна, фактически в ней просто websocket соединение вида pub/sub, где клиент может сказать в какие топики он хочет писать, а из каких читать.

      Пока жив коннект от клиента - мы держим коннект в кафку. Плюс под капотом пара дополнительных штук типа набирания батча для клиента на консьюминге и так далее.

      Общая задача была в том, чтобы защитить клиентов от знания о внутренней технологии, ее топологии и не дать совершать невалидные действия, так что решение в этом плане оправдывает себя


  1. maxzh83
    12.04.2023 12:14

    Мне показалось или вы переизобрели enterprise service bus? Еще интересно насколько хорошо работает WebSocket, который из коробки довольно примитивный в плане разрыва соединения и не имеет поддержки схемы данных (в отличие от Kafka)?


    1. ewolf Автор
      12.04.2023 12:14
      +1

      По-сути, переизобрели :)

      Можете чуть подробнее раскрыть вопрос про разрыв соединения и схему данных?


      1. maxzh83
        12.04.2023 12:14

        Грубо говоря, если соединение рвется в websocket, то очень долго можно об этом не знать, особенно если это сетевая проблема и соединение именно разрывается, а не закрывается стандартно. Т.е. с точки зрения клиента ws все хорошо, а на самом деле нет и часть сообщений может не дойти после такого разрыва. Ну и вроде как нет какого-то стандартного механизма восстановления соединения после разрыва, каждый по своему делает.

        А про схему имеется в виду некий аналог avro у kafka, которого для ws нет. Кстати, вы шлете сообщения в тексте или бинарно?


        1. illia-ivanou
          12.04.2023 12:14

          очень долго

          Ровно столько, сколько вы заложите в timeout на ping/pong с последующей логикой на переустановку соединения, WS специфицирует такие фреймы для использования как часть протокола: https://www.rfc-editor.org/rfc/rfc6455#section-5.5.2

          Очевидно, и клиент и сервер должны уметь играть в ping/pong и делать это с последующим переотсылкой/перезапросом данных в соответствии с установленным между ними контрактом для разрешения таких ситуаций.

          А про схему имеется в виду некий аналог avro у kafka, которого для ws нет. Кстати, вы шлете сообщения в тексте или бинарно?

          В статье описан закрытый формат сериализации данных:

          Внутри Авито мы используем для описания синхронных и асинхронных схем обменов Brief — текстовый формат строгой типизации. Он недоступен вне компании, но по структуре описания похож на...


        1. ewolf Автор
          12.04.2023 12:14

          Выше уже ответили на многие вопросы.

          У нас есть таймауты на подключения, keep-alive сообщения, чтобы убедиться, что клиент жив.

          Также у нас есть реестр схем событий, благодаря которому мы можем валидировать контракты как в момент попытки деплоя сервиса, так и при публикации


  1. mark_ablov
    12.04.2023 12:14

    То есть у вас нет единого реестра схем, а каждый сервис имеет контракты как пордьсер и консьюмер?


    1. ewolf Автор
      12.04.2023 12:14

      Единый реестр есть, в статье есть кратко про это. Все схемы событий при деплое попадают в этот реестр и потом мы валидируем изменения на допустимость и публикуемые события на корректность


  1. illia-ivanou
    12.04.2023 12:14

    Как альтернативу выбранному WS и всплывающему в голове gRPC я бы все-таки предложил RSocket: хорошо проработанный протокол явно прикладного уровня, реализующий очень гибкие механизмы, которые так или иначе, придется организовывать на голом WS для контроля потребления данных.

    Если можно, несколько вопросов:

    1. Если клиентские WS соединения к event bus соотносятся с потребителями соответствующей kafka consumer group на стороне event bus как 1:1, формируя что-то схожее, что из себя и представляла бы топология соединений без промежуточных серверов абстракции, просто подменяя протокол и сливая данные с партиций одного потребителя в одно соединение на клиентскую ноду: разве это не вызывает дисбаланс в нагрузке на ноды data bus при группе неравномерно распределенных соединений в случае proxy-load-balancer стоящим перед нодами? Или вы запарились и реализовали заумный client load balancing?

    2. По поводу отмирания сети датацентра: не может ли этот сценарий породить нарушение последовательности записи?
      Событие, успевшее попасть в локальный кластер kafka, но неуспевшее отреплицироваться после отмирания сети в одном датацентре, может попасть в глобальный кластер при восстановлении сети уже после того, как другой центр, куда переключился конечный пользователь, отреплицирует туда более новые события, относящиеся к одной и той же сущности.

    3. При выборе решения не рассматривали Apache Camel? Есть коннектор к Kafka и возможность поднимать WS эндпоинты, как и добавлять собственную логику.

    но по структуре описания похож на gRPC

    Вероятно, здесь описка и имелся ввиду protobuf - бинарный формат сериализации данных, используемый gRPC по умолчанию, однако gRPC как протокол не определяет его как единственный и/или обязательный, например, с ним можно спокойно использовать JSON.


    1. ewolf Автор
      12.04.2023 12:14

      Спасибо, на RSocket посмотрим обязательно.

      Дисбаланс подключений действительно возможен. Но нагрузка от таких подключений на databus не очень большая, а подключений много, поэтому в среднем распределение достаточно равномерное. Поэтому в этом плане у нас есть упрощение: сложной балансировки нет.

      События действительно могут перепутаться, также, да, в текущей схеме может быть ситуация, когда часть событий останется в умершем ДЦ. Но мы намеренно не предоставляем гарантий порядка событий на данном этапе эволюции нашей системы и требуем от клиентов устойчивости к нарушениям порядка и дублям.

      Camel для нашего случая не рассматривали. Наша шина представляет собой просто механизм обмена событий и не предполагает наличие логики по их обработке или трансформации. В будущем - возможно вокруг нее добавится какой-то механизм для обработки, тогда мы посмотрим на доступные варианты, может быть и camel, хотя он не очень ложится в стек нашей компании (из-за java/kotlin)

      Да, формат конечно же похож на описание protobuf, спасибо за замечание