Привет, Хабр! Я Артём Чаадаев, бэкенд-разработчик в Туту. Я занимаюсь разработкой на языке Go в команде ассортимента размещения. Мы отвечаем за работу с контентом отелей и интеграции с поставщиками.

Периодически мы собираем данные для аналитики, и в таких задачах нужно реализовывать схему отправки данных в ClickHouse. В этой схеме мы преобразовываем данные с помощью Redpanda Connect. В статье покажу простой практический пример работы с этим инструментом, который должен помочь тем, кто только начинает его использовать. А еще расскажу про преимущества и проблемы, с которыми мы столкнулись при работе с Redpanda Connect.

Постановка задачи

Команде аналитиков нужны данные по всем отелям из БД в их ClickHouse, с которыми можно работать. Эти данные должны стабильно выгружаться в назначенный период времени. Желательно реализовать логику отправки изменений, а не полных дампов, но это за рамками тематики данной статьи. 

Описание архитектуры

Для решения задачи мы используем следующую архитектуру:

  • MongoDB, которая содержит все данные;

  • сервис сбора данных на Go из MongoDB;

  • Kafka, куда потоком отправляются записи для последующей обработки;

  • Redpanda Connect, который собирает данные из Kafka, преобразует и отправляет дальше в ClickHouse;

  • ClickHouse, с которым работает команда аналитиков.

В MongoDB содержатся все необходимые для сбора данные об отелях. Сервис на Go собирает данные из MongoDB, он может дозировать нагрузку на БД по чтению и партиционировать все данные. Этот же сервис отправляет полученные записи из БД в Kafka. После Redpanda Connect собирает данные и проверяет, соответствуют ли они контракту, который согласован с командой аналитики. Этот контракт представлен в виде JSON-схемы. Дальше Redpanda Connect преобразовывает запись к формату хранения в ClickHouse и отправляет запрос INSERT. 

С этими данными теперь можно работать. В этой статье я сделал фокус именно на работе с Redpanda Connect, который в Туту принят как стандарт для задач доставки данных для аналитики. По этой ссылке можно найти пример, который воспроизводит описанную архитектуру в упрощенном виде: вместо MongoDB и сервиса небольшой скрипт на Go, который отправляет в Kafka данные по отелям. Формат данных в примере тоже сильно упрощен: в боевых условиях он намного более разветвленный.

Как использовать Redpanda Connect

Redpanda Connect — это высокопроизводительный и отказоустойчивый процессор потоковых данных написанный на языке Go. Redpanda Connect способен подключаться к нескольким источникам-брокерам сообщений и осуществлять операции с данными: обогащать, преобразовывать и фильтровать на предмет содержания полезной нагрузки. Инструмент достаточно прост для развертывания и мониторинга: его можно легко запустить бинарным файлом или через Docker и использовать с нужным потоком данных. Также Redpanda Connect включает свой язык маппинга для входных данных — Bloblang.

В нашей схеме Redpanda Connect: 

  • получает данные из Kafka;

  • проверяет их на соответствие JSON-схеме;

  • преобразует данные в JSON-объект, который соответствует схеме в ClickHouse;

  • отправляет полученный объект в ClickHouse.

Все это можно достаточно просто описать в конфигурационном YAML-файле. 

Этот файл состоит из трех частей:

  • вход (input);

  • пайплайн (pipeline);

  • выход (output).

Вход (input)

Здесь описывается источник данных.

Ниже показан фрагмент кода с входом:

input:
  kafka:
    addresses: [localhost:9093]
    topics: [hotels_snapshot]
    consumer_group: go_clickhouse_kafka
    batching:
      count: 1
      period: 5s

Здесь показаны настройки для подключения к Kafka:

  • kafka.addresses — адрес брокера;

  • kafka.topics — список топиков, из которых происходит чтение;

  • kafka.consumer_group — консьюмер-группа (группа идентифицирует клиентов, которые используют топик в многопоточном режиме);

  • kafka.batching.count — количество сообщений, при достижении которого батч отправляется в обработку пайплайном;

  • kafka.batching.period — количество секунд, через которое неполный батч отправляется в обработку независимо от размера.

Полный список настроек можно изучить в документации

Пайплайн (pipeline)

После настройки получения входных данных нужно описать пайплайн их обработки.

Ниже показан фрагмент кода с простейшим пайплайном:

pipeline:
  processors:
    - mapping: |
        root.id = this.id
        root.geo_id = this.geo_id
        root.emails = this.emails
        root.type = this.type
        
        if this.content_ru != null {
             root.content_ru_address = this.content_ru.address
             root.content_ru_name = this.content_ru.name
             root.content_ru_description = this.content_ru.description
        }
       
        root.created_at = this.created_at.ts_format("2006-01-02 15:04:05")

В пайплайне указываются процессоры, которые и формируют логику обработки данных. В данном фрагменте кода задействован процессор mapping. Он преобразовывает данные из полученного входного формата в более плоский JSON-объект, который подходит для импорта в Clickhouse. 

Для перекладывания данных из одного формата в другой процессор использует язык Bloblang. Здесь root — выходной документ, который будет отправляться в ClickHouse, а this — полученный объект. Также в объекте можно проследить использование конструкции if c проверкой содержимого поля на null и использование функции ts_format, которая изменяет дату в подходящий для ClickHouse формат. Подробнее обо всех функциях и языковых конструкциях можно прочитать в документации.

Помимо обработки данных нам нужно проверять, соответствует ли документ согласованной JSON-схеме.

Ниже показан фрагмент кода с использованием валидации cхемы:

pipeline:
  processors:
    - json_schema:
        schema_path: "file://schemas/hotel.json"
    - catch:
        - log:
            level: ERROR
            message: "Schema validation failed due to: ${!error()}"
        - mapping: 'root = deleted()' # Drop messages that fail
    - mapping: |
        root.id = this.id
        root.geo_id = this.geo_id
        root.emails = this.emails
        root.type = this.type
        
        if this.content_ru != null {
             root.content_ru_address = this.content_ru.address
             root.content_ru_name = this.content_ru.name
             root.content_ru_description = this.content_ru.description
        }
       
        root.created_at = this.created_at.ts_format("2006-01-02 15:04:05")

В данном фрагменте с помощью json_schema указан путь к файлу с нужной схемой, а с помощью catch указана обработка ситуации, когда валидация не прошла. В этом случае логгируется ошибка, и сообщение отбрасывается с помощью 'root = deleted()'. Так сделано для упрощения примера, но в боевых условиях мы используем механизм DLQ (Dead letters queue) — сохраняем отброшенные сообщения для последующей обработки.

Выход (output)

Помимо входа и пайплайна нужно описать, куда отправляются обработанные сообщения.

Ниже показан фрагмент кода с выходом:

output:
  http_client:
    url: http://localhost:18123?query=INSERT+INTO+hotels.content+FORMAT+JSONEachRow
    verb: POST
    headers:
      Content-Type: application/json
      Content-Encoding: gzip
    rate_limit: ""
    timeout: 5s
    max_in_flight: 5
    batching:
      count: 1
      period: 5s
      processors:
        - archive:
            format: lines
        - compress:
            algorithm: gzip
            level: 5

Здесь использован выход http_client, который подразумевает отправку HTTP-запроса на нужный сервер. В нашем случае это API ClickHouse.

В параметрах указаны:

  • http_client.url — URL;

  • http_client.verb — HTTP-метод;

  • http_client.headers — HTTP-заголовки, необходимые для взаимодействия с Clickhouse (например, авторизация);

  • http_client.timeout — таймаут клиента;

  • http_client.max_in_flight — максимальное число отправленных батчей сообщений в момент времени, для которых еще не получено подтверждение о получении;

  • http_client.batching.count — количество сообщений, при достижении которого батч отправляется в ClickHouse;

  • http_client.batching.period — количество секунд, через которое неполный батч отправляется в ClickHouse независимо от размера;

  • http_client.batching.processors.archive — процессор, который позволяет архивировать все сообщения батча в одно;

  • http_client.batching.processors.compress — процессор, который сжимает сообщения с помощью алгоритма gzip.

Полный список настроек можно изучить в документации.

Для отладки полезно использовать выход stdout и посмотреть на результаты маппинга данных своими глазами.

Преимущества Redpanda Connect

Не нужно писать полноценный отдельный сервис для сбора данных из Kafka. Достаточно написать простой конфиг со входами, выходами и маппингом и задействовать контейнер с установленным Redpanda Connect. При желании контейнеры можно масштабировать. 

Также есть возможность добавлять разные конфиги и параллельно встраивать туда другие процессы сбора и обработки данных. Redpanda Connect позволяет выбрать подходящие настройки для чтения и для отправки данных, а встроенный язык маппинга — выполнить преобразования. 

Помимо этого есть функциональность для написания юнит-тестов. В примере мы используем Kafka, но Redpanda Connect позволяет в качестве источников выбирать огромное количество вариантов: MongoDB, любой HTTP-клиент или сервер, AMQP-совместимые решения, NATS, Redis, SQL-ные СУБД, csv-файлы и другие. Полный список источников есть в документации. Аналогичная ситуация и с типами выходов. Инструмент достаточно универсальный.

Проблемы, с которыми мы столкнулись

Главная проблема, с которой мы столкнулись, — недостаток экспертизы. Мне как разработчику-бэкендеру нужно было хорошо погружаться в Redpanda Connect, хотя мы в команде не так часто пишем процессы сбора данных для аналитики. 

Это привело к трудностям в отладке: при локальном запуске и в среде отлавливались разные ошибки. Не всегда логи позволяли быстро понять истинную причину и разобраться, почему документ не обрабатывался (и обрабатывался ли вообще) перед отправкой в ClickHouse. 

Эту проблему можно решить, если в команде появится отдельный эксперт по инструменту. Нам в итоге помогли коллеги из Data Team. Теперь при добавлении нового потока данных для аналитики можно быстро обратиться за помощью к эксперту, который еще и может подобрать оптимальные конфигурации для Kafka и ClickHouse. И в целом процессы с потоками данных для аналитики теперь стандартизированы.

Выводы

Redpanda Connect помог нам решить задачу, но по пути возникли трудности, с которыми могут справиться люди с профильной экспертизой в инструменте. 

Проще всего, если в команде есть эксперты, для которых не составит труда написать новый маппинг и по шаблону развернуть Redpanda Connect. Для частых задач сбора аналитики в стандартизированных процессах это отличный универсальный и легко масштабируемый инструмент.

Если же задача по сбору данных разовая, как это было на начальном этапе у нас, а в команде при этом нет человека с профильной экспертизой, то возможно быстрее и эффективнее будет написать свой сервис на нужном языке по своим стандартам логирования и отладки.

Пример

Большой пример со скриптом на Go, Kafka, ClickHouse и Redpanda Connect можно найти по ссылке.

Комментарии (2)


  1. sl4mmer
    19.02.2025 10:40

    А почему просто не использовать табличку на kafkaengine + материализованная вью которая будет вставлять в целевую табличку? В чем конкретный плюс redpanda


    1. perfectgentlemande Автор
      19.02.2025 10:40

      >В чем конкретный плюс redpanda

      в универсальности, не обязательно использовать с Кафкой и кликхаусом. В статье есть ссылка на то, с чем дружит Редпанда:
      https://docs.redpanda.com/redpanda-connect/components/inputs/about/
      https://docs.redpanda.com/redpanda-cloud/develop/connect/components/outputs/about/