Меня зовут Сергей Калинец, я — архитектор в компании Parimatch Tech, и в этой публикации хочу поделиться нашим опытом в области поиска сообщений в Kafka.

Для нашей компании Kafka является центральной нервной системой, через которую микросервисы обмениваются информацией. От входа до выхода сообщение может пройти через десяток сервисов, которые его фильтруют и трансформируют, перекладывая из одного топика в другой. Этими сервисами владеют разные команды, и очень полезно бывает посмотреть, что же содержится в том или ином сообщении. Особенно интересно это в случаях, когда что-то идет не по плану — важно понять, на каком этапе все превратилось в тыкву (ну и кому нужно в тыкву дать, чтобы такого больше не повторялось). С высоты птичьего полета решение простое — нужно взять соответствующие сообщения из кафки и посмотреть, что в них не так. Но, как обычно, интересное начинается в деталях.

Начнем с того, что кафка — это не просто брокер сообщений, как многие думают и пользуются ей, но и распределенный лог. Это значит много чего, но нам интересно то, что сообщения не удаляются из топиков после того, как их прочитали получатели, и технически можно в любой момент их прочитать заново и посмотреть, что там внутри. Однако все усложняется тем, что читать из Kafka можно только последовательно. Нужно знать смещение (для упрощения это — порядковый номер в топике), с которого нам нужны сообщения. Также возможно в качестве начальной точки указать время, но потом все равно можно только читать все сообщения по порядку.

Получается, что для того, чтобы найти нужное сообщение, нужно прочитать пачку и найти среди них те, которые интересны. Например, если мы хотим разобраться в проблемах игрока с id=42, нужно найти все сообщения, где он упоминается (playerId: 42), выстроить их в цепочку, ну и дальше уже смотреть, на каком этапе все пошло не так.

В отличие от баз данных вроде MySQL или MSSQL, у которых в комплекте поставки сразу есть клиентские приложения с графическим интерфейсом, ванильная Kafka не балует нас такими изысками и предлагает разве что консольные утилиты с довольно узким (на первый взгляд) функционалом.

Но есть и хорошие новости. На рынке есть ряд решений, которые помогают облегчить весь процесс. Сразу отмечу, что «на рынке» тут не в смысле «за деньги» — все рассмотренные ниже инструменты бесплатны.

Я опишу некоторые наиболее часто используемые, а в конце более детально разберем самое, на наш взгляд, близкое к идеальному. 

Итак, из чего можно выбирать?

Kafka Tool

(скриншот с официального сайта https://www.kafkatool.com/features.html

Наверное самый популярный инструмент, которым пользуются люди, привыкшие к GUI инструментам. Позволяет увидеть список топиков и прочитать отдельные сообщения. Есть возможность фильтровать по содержимому сообщений, указывать с какого смещения читать и сколько сообщений нужно прочитать. Но все же нужно знать примерные смещения интересующих нас сообщений, потому что Kafka Tool ищет только в рамках тех сообщений, которые были прочитаны. Говоря словами Дятлова из сериала Чернобыль (в оригинальной озвучке): «Not great not terrible».

Часто это единственная альтернатива, с которой вынуждены работать люди, волей судьбы столкнувшиеся с кафкой. Но есть и другие средства.

Kafka Console Consumer

Это одна из утилит, входящая в комплект поставки кафки. И она позволяет читать данные из нее. Как и сама Kafka, это JVM приложение, которое для своей работы требует установленной Java. В принципе, это справедливо и для Kafka Tool, но в данном случае можно обойтись без Java — если запускать через docker:

По сути, нужно просто перед самой командой добавить docker run --rm -it taion809/kafka-cli:2.2.0, что на докерском значит «запусти вот такой образ, выведи, то, что он показывает, на мой экран и удали образ, когда он закончит работу». Можно пойти еще дальше и добавить алиас типа

Утилита кажется архаичной и ее консольность может отпугнуть неискушенных адептов графических интерфейсов, но, как и большинство консольных инструментов, при правильном использовании она дает более мощный результат. Как именно — рассмотрим на примере следующего инструмента, тоже консольного.

Kafkacat

Это уже весьма могучая штука, которая позволяет читать и писать из Кафки, а также получать список топиков. Она тоже консольная, но при этом поудобнее в использовании, чем стандартные утилиты вроде kafka-console-consumer (и ее тоже можно запустить из докера).

Вот так можно сохранить 10 сообщений в файле messages (в формате JSON):

Файл можно будет использовать для анализа или чтобы воспроизвести какой-то сценарий. Безусловно, для решения этой задачи можно было бы и использовать родной консьюмер, но kafkacat позволяет выразить это короче. 

Или вот пример посложнее (в смысле букв немного больше, но решение проще многих других альтернатив):

Код взят с блога одного из самых топовых популяризаторов Kafka экосистемы — Robin Moffatt. В двух словах — один инстанс kafkacat читает сообщения из топика Kafka, потом сообщения трансформируются в необходимый формат и скармливаются другому инстансу kafkacat, который пишет их в другой топик. Многим такое может показаться вырвиглазным непонятным брейнфаком, однако на самом деле у нас готовое решение, которое можно запустить в докере и оно будет работать. Реализация такого сценария на вот этих ваших дотнетах и джавах потребует больше букв однозначно.

Но это меня уже занесло немного не в ту степь. Статья же про поиск. Так вот, похожим способом можно организовать и поиск сообщений — просто перенаправить вывод в какой-то grep и дело с концом.

Из возможностей для улучшений можно отметить тот факт, что kafkacat поддерживает Avro из коробки, а вот protobuf — нет. 

Kafka Connect + ELK

Все вышеперечисленные штуки работают и решают поставленные задачи, однако не всем удобны. При разборе инцидентов нужно именно поискать сообщения в разных топиках по определенному тексту — это может быть определенный идентификатор или имя. Наши QA (а именно они в 90% случаев занимаются подобными расследованиями) наловчились пользоваться Kafka Tool, а некоторые — и консольными утилитами. Но это все меркнет по сравнению с возможностями, которые дает Kibana, UI оболочка вокруг базы данных Elasticsearch. Kibana тоже используется нашии QA для анализа логов. И не раз поднимался вопрос: «давайте логировать все сообщения, чтобы можно было искать в Kibana». Но оказалось, что есть способ намного проще, чем добавление вызова логера в каждый из наших сервисов, и имя ему — Kafka Connect.

Kafka Connect — это решение для интеграции Kafka с другими системами. В двух словах, оно (или он?) позволяет экспортировать из и импортировать данные в Kafka без написания кода. Все, что нужно — это поднятый кластер Connect и конфиги наших хотелок в формате JSON. Слово «кластер» звучит дорого и сложно, но на самом деле это один или больше инстансов, которые можно поднять где угодно — мы, например, запускаем их там же, где и обычные сервисы — в Kubernetes. 

Kafka Connect предоставляет REST API, c помощью которого можно управлять коннекторами, занимающимися перегонкой данных из и в Kafka. Коннекторы задаются конфигурациями, в случае Elasticsearch эта конфигурация может быть вот такой:

Если такой конфиг через HTTP PUT передать на сервер Connect, то, при определенном стечении обстоятельств, создастся коннектор с именем ElasticSinkConnector, который будет в три потока читать данные из топика и писать их в Elastic. 

Выглядит всё крайне просто, но самое интересное, конечно же, в деталях. А деталей тут есть )

Большинство проблем связанно с данными. Обычно, как и в нашем случае, нужно работать с форматами данных, разработчики которого явно не думали, что когда-то эти данные будут попадать в Elasticsearch. 

Для решения нюансов с данными есть трансформации. Это такие себе функции, которые можно применять к данным и мутировать их, подстраивая под требования получателя. При этом всегда есть возможность использовать любую Kafka клиент технологию для случаев, когда трансформации бессильны. Какие же сценарии мы решали с помощью трансформаций? 

В нашем случае есть 4 трансформации. Вначале мы их перечисляем, а потом конфигурим. Трансформации применяются в порядке их перечисления, и это позволяет интересно их комбинировать.

Имена

Вначале добавляем возможность поиска по имени топика — просто дополняем наши сообщения полем с нужной информацией.

Индексы

Elasticsearch работает с индексами, которые имеют свойство забиваться и нервировать девопсов. Нужна поддержка удобной для управления схемой индексирования. В нашем случае мы остановились на индексе на сервис / команду с ежедневной ротацией. Плюс решения — хранение данные из разных топиков в одном индексе с возможностью контролировать его возраст.

Даты 

Для того, чтобы в Kibana можно было искать по дате, необходимо задать поле, в котором эта дата содержится. Мы используем дату публикации сообщения в Kafka. Чтобы ее получить, мы вначале вставляем поле с датой сообщения, а потом конвертируем ее в UTC формат. Конвертация была добавлена, чтобы помочь Elasticsearch распознать в этом поле timestamp, однако в нашем случае это не всегда происходило, поэтому мы добавили index template, который явно говорил «в этом поле — дата»: 

В результате у нас сообщения практически мгновенно становятся доступными для анализа, и время, потраченное на разбор инцидентов, уменьшается. 

Тут, конечно, стоит отметить, что данная инициатива сейчас у нас на этапе внедрения, поэтому нельзя сказать,что все массово ищут все что нужно в Kibana, но мы к этому уверенно идем.

Вообще, Kafka Connect применимо не только для таких задач. Его можно вполне использовать в тех случаях, где нужна интеграция с другими системами, Реально, например, реализовать полнотекстовый поиск в вашем приложении с помощью двух коннекторов. Один будет читать из операционной базы обновления и писать их в Kafka. А второй — читать из Kafka и отсылать в Elasticsearch. Приложение делает поисковый запрос в Elasticsearch, получает id и по нему находит нужные данные в базе. 

Заключение

Ну а наша публикация подходит к концу. Очень надеюсь, что вы узнали что-то новое для себя, а иначе — зачем это все? Если что-то не получилось раскрыть, или вы категорически не согласны с чем-то, или может есть более удобные способы решения подобных проблем — напишите про это в комментариях, обсудим )