Иногда legacy живёт в компании годами не потому, что он плох, а потому что «работает — не трогай». Но однажды появляется триггер, который заставляет переосмыслить подход. В нашем случае таким триггером стала миграция на Kafka 4.0.

Привет! Меня зовут Роман, я инженер данных в компании CDEK и занимаюсь разработкой платформы данных и внедрением self‑service инструментов. В этой статье расскажу, как мы обеспечиваем Event Sourcing подход в платформе больших данных, с какой болью столкнулись при переходе на Kafka 4.0 и как решились отказаться от JSON‑формата.

Подробно про эволюцию платформы данных в нашей компании можно прочесть в статье коллеги, но многое изменилось за последнее время.

Содержание

Потоковый ingest: как было

Начнём с того: а откуда данные вообще попадают в BI.Фактически в CDEK основная ESB‑шина межмодульного взаимодействия происходит через кластер RabbitMQ. BI получает ивенты и держит буфер в кафке. Какого‑то явного хайлоада здесь нет, мы слушаем порядка 200 обменников со средним MPS 2–5k сообщений.

Далее данные пишем через Spark Structured Streaming (довольно нестандартно, да) в S3 для разных флоу. Основной флоу, конечно, происходит через основное хранилище в виде Greenplum. На стороне хранилища ивенты либо пишутся все, либо сохраняется лишь последний стейт.

Другой флоу для быстрой аналитики работает по такой же схеме, только данные пишутся в S3 как полноценные Iceberg‑таблицы. Данные глубиной до месяца кэшируются из хранилища, и Spark раз в 15–60 минут строит витрины данных и доставляет их в ClickHouse для конечного пользователя.

Ну и самый быстрый флоу — для плоских витрин, не требующих тяжелых JOIN‑операций: просто цепляемся к кафка‑топику через Kafka Engine кликхауса и сбрасываем данные раз в 1–5 минут.

На первый взгляд, всё красиво и прекрасно, но дьявол кроется в деталях.

Минусы решения на Camel Kafka Connect и переезд в новый ЦОД

Первая проблема заключалась в том, как забирались данные из RabbitMQ в кафку. Мы использовали открытый Camel Rabbitmq Source Connector, и здесь я остановлюсь подробнее.

В декабре CDEK переезжал в новый ЦОД, и со стороны инфраструктуры было много обновлений. Требовалось поднять версию кафки до 4.0, но сюрприз‑сюрприз: мы не могли просто так взять и натянуть бинарные файлы Camel‑коннектора на свежую кафку. Основная проблема хорошо описана в этом issue. Фактически мы тоже пересобрали Camel Kafka Connect, но такой подход выглядел как сидение на пороховой бочке, и любое минорное обновление кафки могло всё нам поломать. Кстати, похоже, что в предстоящих релизах Kafka 4.0 всё‑таки будет поддерживаться. Помимо этого, в Camel‑коннекторах концептуально SMT слишком ограничены.

Данные в кафке мы получали в виде JSON без всякой валидации и schema registry. Условно любой модуль мог отправить какое‑нибудь битое сообщение, и мы это отлавливали только на этапе загрузки в Greenplum или вообще на слоях выше, через DQ‑валидацию. Сами данные загружались в DWH (Greenplum) исключительно визуально по примерам JSON, которые пришли из публикации.

В общем, основной нашей задачей стало отказаться от Camel‑коннектора и решить: а хотим ли мы потратить ресурсы на адекватный инжест?

Чем заменить Camel Kafka Connect

Сели и стали думать: а что интересного есть на рынке.

• Spark Structured Streaming — поддержки AMQP из коробки нет. Поддерживать зависимости на тяжелом спарке — звучит больно.

• Flink — выпилили поддержку AMQP в свежих версиях. Как бы коннектор то есть, но методов под него уже нет.

• Kafka connect — RabbitMQ‑коннектор только проприетарный, за деньги. Пилить свой коннектор — уже звучит как решение «не из коробки».

Пришли к выводу, что быстро взять с рынка и заменить Camel‑коннектор особо то и не чем. В любом случае придётся, что‑то разрабатывать на своей стороне. Тогда начали отталкиваться от обратного: почему бы нам не слушать ESB‑шину так, как это делают все микросервисы в компании?

Так родилась идея просто написать отдельный модуль на Spring. Но раз уж пишем отдельный модуль, давайте выделим ресурсы и на закрытие тех болячек, которые преследовали нас годы с Camel‑коннектором.

Сформулировали, а чего именно мы хотим. Получились такие критерии:

1. Быстрая интеграция с новыми источниками. В Camel‑коннекторе было всё просто: добавляем в YAML эксчейндж, который нужно слушать, запускаем управляющий процесс из Airflow, и вуаля, JSON‑ы льются в кафку, минимальный time to market. Спойлер: минимальный time to market сохранить не удалось.

2. Отказ от JSON‑формата. Хотелось бы иметь бинарный формат protobuf или avro для большей пропускной способности по сетке и дискам. Ну и заодно уменьшить стоимость буферизации данных на стороне кафки.

3. Реестр схем. Если уж будет бинарный формат, то логично, что должен появиться и реестр схем. Битые сообщения вообще не должны попадать в кафку, а у аналитиков и ML появится возможность изначально знать, что за публикация, какие в ней есть атрибуты и что конкретно нужно для аналитики.

4. Confluent‑совместимость. Если будут схемы, они должны быть Confluent‑совместимы.

5. Мониторинг и алертинг. Хотим знать, как минимум MPS, какие сообщения не попали в кафку и по какой причине.

Стандартный набор требований для адекватного инжеста.

Свой ingest-сервис на Spring

Сели писать свой микросервис и начали думать, как закрыть первое требование. По сути, всё получилось банально просто. Нам нужно было как‑то сгенерировать биндинг, чтобы инженер, который не близок к Java, тоже мог интегрировать любой источник. То есть для каждого источника ему просто было достаточно описать (как раньше) откуда и куда доставлять данные в YAML‑формате. Эти конфиги подхватывает динамический генератор Spring Cloud Stream. Всё, что нужно указать инженерам — название бина, путь до схемы, обменник в RabbitMQ и топик кафки. Ну и при необходимости переопределить количество консьюмеров и наличие prefetch для нагруженных очередей.

app:
  bridge-configs:
    some-object:
      bean: "someObjectProcessor"
      schema-path: "avro_schemas/some_object.avsc"
      output-binding: "someObjectPublisher-out-0"
      properties:
        maxRetryCount: 10

spring:
  cloud:
    stream:
      bindings:
        someObjectProcessor-process-in-0:
          destination: some.object
          binder: rabbit
        someObjectPublisher-out-0:
          destination: ingest-module_some-object_v1
          binder: kafka
          content-type: application/*+avro
          producer:
            use-native-encoding: true

По второму требованию мы просто добавили отдельный класс BytesConverter. На входе он получает сырое JSON‑сообщение и согласно схеме сериализует сообщение в формат Avro. То есть прямо проходимся по всем полям сообщения, мапим типы данных в Avro, а также фиксируем ключ партицирования для паблишера. Опять же, человек, незнакомый с Java, не должен задумываться, а что происходит под капотом, подключать клиентские библиотеки и описывать поля в коде.

Третье требование закрыли следующим образом: решили хранить схемы Avro прямо в репозитории и пушить их в Apicurio (уже был развернут в компании под openAPI). Плюс сделали ряд правил формирования схем. Составили шпаргалку типов Java с маппингом к типам Avro. Естественно, оставили комментарии на все поля и в doc приложили ссылку на DTO. Кстати, выбрали именно Avro из‑за простой поддержки со стороны Spark Structured Streaming.

Четвертый пункт — это уже был патчинг настроек кафка‑биндера и Apicurio. Мы использовали стандартный Confluent wire format: 1 байт magic byte + 4 байта schema ID (в нашей версии Apicurio он неиронично назывался «legacy», в следующих версиях перестал помечаться как legacy). Ну и конечно, topicIdStrategy для полной управляемости схема=топик. По сути, Confluent‑совместимость нам нужна только для Kafbat UI, так как аналитики ходят в UI смотреть, что публикуется.

            configuration:
              key.serializer: org.apache.kafka.common.serialization.StringSerializer
              value.serializer: io.apicurio.registry.serde.avro.AvroKafkaSerializer
              apicurio:
                registry:
                  auto-register: true
                  artifact-resolver-strategy: io.apicurio.registry.serde.strategy.TopicIdStrategy
                  id-handler: io.apicurio.registry.serde.Legacy4ByteIdHandler
                  check-period-ms: 86400000
                  retry-backoff-ms: 5000
                  retry-count: 10

Пятый пункт просто шёл из коробки шаблона модуля. У нас уже был даже шаблон дашборда для Grafana, в котором фиксировали общую нагрузку на поды кубера, количество отправленных сообщений (успешных и неуспешных), MPS, DLQ‑очереди.

Бонусом добавили в CI‑интеграцию с Backstage. Каждый новый биндинг фиксируется на существующий компонент (RabbitMQ‑обменник) модуля. Таким образом, про нас точно не забудут при изменении контракта.

Итого весь флоу разработчика выглядит следующим образом:
Написал YAML, описал модель данных в схеме Avro (через DTO или с помощью разработчика клиентской библиотеки), положил тестовое сообщение в виде JSON, всё это протестировал локально и прогнал CI/CD на валидность схемы. Далее отправил на препрод — на проверку с работающими публикациями. Ну и финальный этап — деплой в продакшн.

Со стороны модуля всё работало максимально примитивно: Модуль при старте кэширует схемы из репозитория модуля → регистрирует схемы в Apicurio → создаёт очереди в RabbitMQ и получает сырые сообщения → прогоняет их через BytesConverter → сообщения уходят в Kafka Publisher с обязательным условием acks=all → Ack в RabbitMQ отправляем только после успешной публикации в Kafka → неуспешные сообщения мы отправляем в DLQ‑очередь и пытаемся их обработать повторно.

Сколько ресурсов затратили

Процесс создания микросервиса и интеграции с дата‑платформой у нас занял 4 рабочих дня двумя сотрудниками. И огромные 600 человеко‑часов исключительно на описание всего и вся в Avro‑схемы.

Сразу всплыли проблемы по обработке тех типов данных, которые мы изначально не заложили в BytesConverter, а сложные классы довольно нелегко было описать в Avro. В процессе мы делали много ошибок, но набивали шишки. Всё-таки не‑Java‑разработчикам пришлось немного разбираться в клиентских библиотеках модулей.

С плоскими структурами всё было отлично, но проблемы начались со сложными многоуровневыми вложенными классами. А некоторые источники вообще могли отправить record из разряда {data: any}, что в принципе невозможно было описать: сегодня там строка, завтра — объект, послезавтра — массив. Тут начиналась настоящая боль для инженеров. Хорошо, что таких объектов было буквально пара из всех обменников, и пришлось «костылять» их record‑класс как JSON‑строку в string‑поле Avro.

Ещё пару шишек набили в процессе эксплуатации. Оказывается, Spring вполне может переопределить acks в 1 на биндере, и есть такая штука как dynamic‑destination‑cache‑size, которая по дефолту = 10.

В течение переходного периода мы писали данные и в старый топик через Camel‑коннектор, и в новый через Avro‑сериализацию. Таким образом, при любых ошибках всегда можно было откатиться, а при полном успехе и успешной DQ‑валидации — удалить Camel‑коннектор и топик кафки.

Немного итогов

В итоге спустя два месяца у нас получился полноценный адекватный инжест. Для быстрой аналитики все схемы уже описаны, и создание нового отчёта происходит без дополнительных «приседаний» типа описания полей для Iceberg‑таблицы.

Spark Structured Streaming кэширует схемы из Apicurio по topicIdStrategy, срезаем первые байты в сообщении и пишем в S3. Единственный минус, что Spark в принципе не поддерживает эволюцию схем при чтении топиков. Есть, конечно, плагины типа ABRiS, но полностью он потребности не закрывает, а то, что вроде как считается BACKWARD‑совместимым, на практике Spark просит включить PERMISSIVE‑режим, который заполнит все колонки нулами.

Таким образом, мы вообще пока не используем проверку совместимости схем. Любое изменение схемы сопровождается созданием нового топика (v2, v3 и так далее). Спарк дочитывает старый топик, и мы переключаемся на новый. Не совсем хороший подход. Возможно, будем думать в сторону Kafka Connect S3 Sink.

Пропускная способность заметно увеличилась, а данные на дисках кафки стали занимать на 40–80% меньше в зависимости от количества полей в схеме.

Добавлю, что это решение в целом поможет быстро перейти на какое‑нибудь новомодное Lakehouse‑решение в будущем.

На данный момент активно пушим решение на продуктовые команды, чтобы дать им возможность быстро интегрироваться с платформой через собственные схемы Avro. Модули, не участвующие в бизнес‑процессах (например, веб‑метрика сервисов CDEK), уже подхватили текущее решение и реализуют публикацию нам напрямую в кафку по аналогичным правилам.

Задавайте вопросы в комментариях — с удовольствием отвечу. И интересно почитать про ваш опыт в миграциях на бинарный формат!

Комментарии (0)