
Привет, Хабр! Я Артём Чаадаев, бэкенд-разработчик в Туту. Я занимаюсь разработкой на языке Go в команде ассортимента размещения. Мы отвечаем за работу с контентом отелей и интеграции с поставщиками.
Периодически мы собираем данные для аналитики, и в таких задачах нужно реализовывать схему отправки данных в ClickHouse. В этой схеме мы преобразовываем данные с помощью Redpanda Connect. В статье покажу простой практический пример работы с этим инструментом, который должен помочь тем, кто только начинает его использовать. А еще расскажу про преимущества и проблемы, с которыми мы столкнулись при работе с Redpanda Connect.
Постановка задачи
Команде аналитиков нужны данные по всем отелям из БД в их ClickHouse, с которыми можно работать. Эти данные должны стабильно выгружаться в назначенный период времени. Желательно реализовать логику отправки изменений, а не полных дампов, но это за рамками тематики данной статьи.
Описание архитектуры
Для решения задачи мы используем следующую архитектуру:
MongoDB, которая содержит все данные;
сервис сбора данных на Go из MongoDB;
Kafka, куда потоком отправляются записи для последующей обработки;
Redpanda Connect, который собирает данные из Kafka, преобразует и отправляет дальше в ClickHouse;
ClickHouse, с которым работает команда аналитиков.

В MongoDB содержатся все необходимые для сбора данные об отелях. Сервис на Go собирает данные из MongoDB, он может дозировать нагрузку на БД по чтению и партиционировать все данные. Этот же сервис отправляет полученные записи из БД в Kafka. После Redpanda Connect собирает данные и проверяет, соответствуют ли они контракту, который согласован с командой аналитики. Этот контракт представлен в виде JSON-схемы. Дальше Redpanda Connect преобразовывает запись к формату хранения в ClickHouse и отправляет запрос INSERT.
С этими данными теперь можно работать. В этой статье я сделал фокус именно на работе с Redpanda Connect, который в Туту принят как стандарт для задач доставки данных для аналитики. По этой ссылке можно найти пример, который воспроизводит описанную архитектуру в упрощенном виде: вместо MongoDB и сервиса небольшой скрипт на Go, который отправляет в Kafka данные по отелям. Формат данных в примере тоже сильно упрощен: в боевых условиях он намного более разветвленный.
Как использовать Redpanda Connect
Redpanda Connect — это высокопроизводительный и отказоустойчивый процессор потоковых данных написанный на языке Go. Redpanda Connect способен подключаться к нескольким источникам-брокерам сообщений и осуществлять операции с данными: обогащать, преобразовывать и фильтровать на предмет содержания полезной нагрузки. Инструмент достаточно прост для развертывания и мониторинга: его можно легко запустить бинарным файлом или через Docker и использовать с нужным потоком данных. Также Redpanda Connect включает свой язык маппинга для входных данных — Bloblang.
В нашей схеме Redpanda Connect:
получает данные из Kafka;
проверяет их на соответствие JSON-схеме;
преобразует данные в JSON-объект, который соответствует схеме в ClickHouse;
отправляет полученный объект в ClickHouse.
Все это можно достаточно просто описать в конфигурационном YAML-файле.
Этот файл состоит из трех частей:
вход (input);
пайплайн (pipeline);
выход (output).
Вход (input)
Здесь описывается источник данных.
Ниже показан фрагмент кода с входом:
input:
kafka:
addresses: [localhost:9093]
topics: [hotels_snapshot]
consumer_group: go_clickhouse_kafka
batching:
count: 1
period: 5s
Здесь показаны настройки для подключения к Kafka:
kafka.addresses
— адрес брокера;kafka.topics
— список топиков, из которых происходит чтение;kafka.consumer_group
— консьюмер-группа (группа идентифицирует клиентов, которые используют топик в многопоточном режиме);kafka.batching.count
— количество сообщений, при достижении которого батч отправляется в обработку пайплайном;kafka.batching.period
— количество секунд, через которое неполный батч отправляется в обработку независимо от размера.
Полный список настроек можно изучить в документации.
Пайплайн (pipeline)
После настройки получения входных данных нужно описать пайплайн их обработки.
Ниже показан фрагмент кода с простейшим пайплайном:
pipeline:
processors:
- mapping: |
root.id = this.id
root.geo_id = this.geo_id
root.emails = this.emails
root.type = this.type
if this.content_ru != null {
root.content_ru_address = this.content_ru.address
root.content_ru_name = this.content_ru.name
root.content_ru_description = this.content_ru.description
}
root.created_at = this.created_at.ts_format("2006-01-02 15:04:05")
В пайплайне указываются процессоры, которые и формируют логику обработки данных. В данном фрагменте кода задействован процессор mapping
. Он преобразовывает данные из полученного входного формата в более плоский JSON-объект, который подходит для импорта в Clickhouse.
Для перекладывания данных из одного формата в другой процессор использует язык Bloblang. Здесь root — выходной документ, который будет отправляться в ClickHouse, а this — полученный объект. Также в объекте можно проследить использование конструкции if c проверкой содержимого поля на null и использование функции ts_format, которая изменяет дату в подходящий для ClickHouse формат. Подробнее обо всех функциях и языковых конструкциях можно прочитать в документации.
Помимо обработки данных нам нужно проверять, соответствует ли документ согласованной JSON-схеме.
Ниже показан фрагмент кода с использованием валидации cхемы:
pipeline:
processors:
- json_schema:
schema_path: "file://schemas/hotel.json"
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- mapping: 'root = deleted()' # Drop messages that fail
- mapping: |
root.id = this.id
root.geo_id = this.geo_id
root.emails = this.emails
root.type = this.type
if this.content_ru != null {
root.content_ru_address = this.content_ru.address
root.content_ru_name = this.content_ru.name
root.content_ru_description = this.content_ru.description
}
root.created_at = this.created_at.ts_format("2006-01-02 15:04:05")
В данном фрагменте с помощью json_schema
указан путь к файлу с нужной схемой, а с помощью catch
указана обработка ситуации, когда валидация не прошла. В этом случае логгируется ошибка, и сообщение отбрасывается с помощью 'root = deleted()'
. Так сделано для упрощения примера, но в боевых условиях мы используем механизм DLQ (Dead letters queue) — сохраняем отброшенные сообщения для последующей обработки.
Выход (output)
Помимо входа и пайплайна нужно описать, куда отправляются обработанные сообщения.
Ниже показан фрагмент кода с выходом:
output:
http_client:
url: http://localhost:18123?query=INSERT+INTO+hotels.content+FORMAT+JSONEachRow
verb: POST
headers:
Content-Type: application/json
Content-Encoding: gzip
rate_limit: ""
timeout: 5s
max_in_flight: 5
batching:
count: 1
period: 5s
processors:
- archive:
format: lines
- compress:
algorithm: gzip
level: 5
Здесь использован выход http_client
, который подразумевает отправку HTTP-запроса на нужный сервер. В нашем случае это API ClickHouse.
В параметрах указаны:
http_client.url
— URL;http_client.verb
— HTTP-метод;http_client.headers
— HTTP-заголовки, необходимые для взаимодействия с Clickhouse (например, авторизация);http_client.timeout
— таймаут клиента;http_client.max_in_flight
— максимальное число отправленных батчей сообщений в момент времени, для которых еще не получено подтверждение о получении;http_client.batching.count
— количество сообщений, при достижении которого батч отправляется в ClickHouse;http_client.batching.period
— количество секунд, через которое неполный батч отправляется в ClickHouse независимо от размера;http_client.batching.processors.archive
— процессор, который позволяет архивировать все сообщения батча в одно;http_client.batching.processors.compress
— процессор, который сжимает сообщения с помощью алгоритма gzip.
Полный список настроек можно изучить в документации.
Для отладки полезно использовать выход stdout
и посмотреть на результаты маппинга данных своими глазами.
Преимущества Redpanda Connect
Не нужно писать полноценный отдельный сервис для сбора данных из Kafka. Достаточно написать простой конфиг со входами, выходами и маппингом и задействовать контейнер с установленным Redpanda Connect. При желании контейнеры можно масштабировать.
Также есть возможность добавлять разные конфиги и параллельно встраивать туда другие процессы сбора и обработки данных. Redpanda Connect позволяет выбрать подходящие настройки для чтения и для отправки данных, а встроенный язык маппинга — выполнить преобразования.
Помимо этого есть функциональность для написания юнит-тестов. В примере мы используем Kafka, но Redpanda Connect позволяет в качестве источников выбирать огромное количество вариантов: MongoDB, любой HTTP-клиент или сервер, AMQP-совместимые решения, NATS, Redis, SQL-ные СУБД, csv-файлы и другие. Полный список источников есть в документации. Аналогичная ситуация и с типами выходов. Инструмент достаточно универсальный.
Проблемы, с которыми мы столкнулись
Главная проблема, с которой мы столкнулись, — недостаток экспертизы. Мне как разработчику-бэкендеру нужно было хорошо погружаться в Redpanda Connect, хотя мы в команде не так часто пишем процессы сбора данных для аналитики.
Это привело к трудностям в отладке: при локальном запуске и в среде отлавливались разные ошибки. Не всегда логи позволяли быстро понять истинную причину и разобраться, почему документ не обрабатывался (и обрабатывался ли вообще) перед отправкой в ClickHouse.
Эту проблему можно решить, если в команде появится отдельный эксперт по инструменту. Нам в итоге помогли коллеги из Data Team. Теперь при добавлении нового потока данных для аналитики можно быстро обратиться за помощью к эксперту, который еще и может подобрать оптимальные конфигурации для Kafka и ClickHouse. И в целом процессы с потоками данных для аналитики теперь стандартизированы.
Выводы
Redpanda Connect помог нам решить задачу, но по пути возникли трудности, с которыми могут справиться люди с профильной экспертизой в инструменте.
Проще всего, если в команде есть эксперты, для которых не составит труда написать новый маппинг и по шаблону развернуть Redpanda Connect. Для частых задач сбора аналитики в стандартизированных процессах это отличный универсальный и легко масштабируемый инструмент.
Если же задача по сбору данных разовая, как это было на начальном этапе у нас, а в команде при этом нет человека с профильной экспертизой, то возможно быстрее и эффективнее будет написать свой сервис на нужном языке по своим стандартам логирования и отладки.
Пример
Большой пример со скриптом на Go, Kafka, ClickHouse и Redpanda Connect можно найти по ссылке.
sl4mmer
А почему просто не использовать табличку на kafkaengine + материализованная вью которая будет вставлять в целевую табличку? В чем конкретный плюс redpanda
perfectgentlemande Автор
>В чем конкретный плюс redpanda
в универсальности, не обязательно использовать с Кафкой и кликхаусом. В статье есть ссылка на то, с чем дружит Редпанда:
https://docs.redpanda.com/redpanda-connect/components/inputs/about/
https://docs.redpanda.com/redpanda-cloud/develop/connect/components/outputs/about/