Данная статья будет полезна тем, кто столкнулся с проблемой интеграции kafka и rabbitmq. Материал не претендует на подробный туториал, но поможет помочь настроить рабочий процесс. Я расскажу, как отправить сообщение в rabbitmq и получить его на стороне kafka, а также обратный процесс, с которым, спойлер, всё оказалось сложнее.
Путешествие «туда и обратно».
Зачем?
Ситуация максимально банальна и проста. Когда разные команды, делают свои проекты с использованием разных message broker’ов, рано или поздно наступает момент, когда появляется необходимость интеграции (и да, я в курсе, что kafka - нечто большее, чем просто mb). Первое, что приходит на ум – это сделать интеграцию через REST, но мы получаем сразу несколько значительных минусов. Например, long pulling. Как поставить на расчет долгий процесс? Как дождаться ответа? Итог: реализации на REST очень далеки от идеала. Внедрение REST так же повлечет танцы с бубном, так как текущая архитектура уже заточена под message broker.
Следующей технологией для интеграции можно рассмотреть grpc. Хороший вариант, но тоже имеет изъян, так как не очень хорошо работает с асинхронными запросами, а нас интересовали именно длительные запросы.
Итак, мы пришли к решению использовать брокер сообщений, но тут появилась главная проблема: одна команда использовала rabbitmq, а другая apache kafka. Первой мыслью было выбрать одну технологию и использовать ее, но оценив трудозатраты по переписыванию pipleline взаимодействия в любой из систем решено было искать альтернативные варианты. Да, конечно, apache kafka – это нечто большее, чем просто брокер сообщений, но в данной ситуации нам требовалась именно эта его функция.
Просматривая интернеты, я наткнулся на интересную статью, в которой описывалась именно та ситуация, в которую я попал. Воодушевленный находкой, я подумал, вот она, «моя прелесть», начал изучать туториал и пытаться развернуть сервис локально.
Был собран и использован контейнер
from kafka-connect:6.2.0
Далее во всех туториалах была обозначена schema-registry
schema-registry:
image: confluentinc/cp-schema-registry:6.2.0
container_name: schema-registry
ports:
- 8081:8081
hostname: schema-registry
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://server-name.local:9092
SCHEMA_REGISTRY_CUB_KAFKA_TIMEOUT: 300
Ее я тоже поднял, но в дальнейшем она оказалась для меня бесполезной, ее предназначение – это использование дополнительных сериализаторов, меня полностью устроили дефолтные.
Из kafka в rabbitmq
Следующим этапом была настройка коннекторов. Для успешного создания коннектора нужно сначала подготовить kafka и rabbitmq.
На стороне kafka создаем топик kafka_result. Со стороны rabbitmq нужно создать очередь kafka_to_rabbit, еще понадобиться exchange kafka_to_rabbit_exch и, конечно, выполнить binding (операцию привязки очереди к exchange) с ключом kafka-connectors.
Kafka, в отличие от rabbitmq, не удаляет сообщение из топиков, и для контроля прочитанных сообщений нам необходимы конфигурационные топики, которые наш сервис создаст самостоятельно, нам остается только при старте в окружение добавить названия этих топиков. В некоторых ситуациях создание топиков запрещено извне, поэтому придется обратиться к администраторам kafka.
В нашем случае это
CONNECT_OFFSET_STORAGE_TOPIC _kafka-connect-group-01-offsets
CONNECT_STATUS_STORAGE_TOPIC _kafka-connect-group-01-status
CONNECT_CONFIG_STORAGE_TOPIC _kafka-connect-group-01-configs
После чего, запускаем наш сервис и выполняем POST запрос.
https://service_addres/connectors
{
"name": "kafka-to-rabbit",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector",
"tasks.max" : "1",
"topics" : "kafka_result",
"rabbitmq.queue" : "kafka_to_rabbit",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.host": "host-rabbitmq",
"rabbitmq.port": "5672",
"rabbitmq.exchange": "kafka-to-rabbit-exch",
"rabbitmq.routing.key": "kafka-connectors",
"rabbitmq.delivery.mode": "PERSISTENT",
"confluent.topic.bootstrap.servers":"addres_kafka_server:9092",
"rabbitmq.virtual.host": "/",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
В случае удачного выполнения запроса, вернется 201 ответ. Проверить создавшийся коннектор можно в браузере по тому же адресу (или через get запрос)
https://service_addres/connectors
Можно увидеть массив коннекторов.
Отлично, если все получилось. Отправляем сообщение в топик kafka и получаем это сообщение на стороне rabbitmq, при чем конфигурационные топики будут контролировать, чтобы нам не приходило дубликатов, и сообщения не потерялись.
Прекрасно, но есть один нюанс. Почти все англоязычные туториалы на этом заканчиваются, как будто всех интересует только перепушивание сообщений из kafka rabbitmq.
А обратно то как? Как перепушить сообщение из rabbitmq в kafka?
Из rabbitmq в kafka
Вернемся к созданию нашего коннектора. Там есть строчка.
"connector.class" : "io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector"
RabbitMQSinkConnector – отвечает за перепушивание из rabbitmq в kafka.
Теперь нам нужен RabbitMQSourceConnector. Пытаемся создать новый коннектор и получаем ошибку, что такой коннектор не найден в нашем контейнере. Неприятно.
Решение проблемы заключается в том, что при сборке нашего контейнера нужно дополнительно поставить плагин. Исправляем наш докер файл и добавляем туда строчкуconfluent-hub install --no-prompt --verbose confluentinc/kafka-connect-rabbitmq:1.5.2
Пересобираем контейнер и пробуем создать обратный коннектор
{
"name": " rabbit-to-kafka",
"config": {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"kafka.topic" : " kafka_task",
"rabbitmq.queue" : "rabbitmq_to_kafka",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.host": "hostname",
"rabbitmq.port": "5672",
"rabbitmq.virtual.host": "/",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"kafka_host:9092",
"confluent.topic.replication.factor":1,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Выполняем POST запрос и дожидаемся 201 ответа, проверяем коннекторы и видим успешно создавшийся
Отлично, теперь визуализируем нашу схему взаимодействия rabbitmq с kafka.
1. Отправляем сообщение в очередь rabbit_to_kafka и получаем его в топике kafka_task
2. Отправляем сообщение в топик kafka_result и получаем сообщение в очереди kafka_to_rabbit
В данной статье описано двусторонне взаимодействие, из rabbitMQ в kafka и обратно, в большинстве примеров найденных в сети описаны ситуации одностороннего взаимодействия, я потратил значительное время чтобы соединить все в одно.
Комментарии (6)
makar_crypt
07.04.2022 22:19Кафка конект теперь часть кафки? то я раньше статьи видел там отдельный DOCKER образ для кафки конект поднимался.
SimplExample Автор
08.04.2022 07:35Kafka-connect это отдельный DOCKER-контейнер, который не является частью kafka
dakuan
08.04.2022 09:42Первой мыслью было выбрать одну технологию и использовать ее, но оценив
трудозатраты по переписыванию pipleline взаимодействия в любой из систем
решено было искать альтернативные варианты.А по какому принципу технологии выбирались изначально? Почему-то часто приходится слышать мнение, что Kafka - это вроде как более модный молодежный заменитель RabbitMQ. Но это же совершенно не так. Это разные технологии, созданные для решения разных задач. Если замене одной технологии на другую препятствуют только трудозатраты, то тут явный просчет на этапе проектирования.
SimplExample Автор
08.04.2022 19:29Добрый вечер. Спасибо за комментарий. В одной системе нужна была просто шина данных и поэтому был выбран RabbitMQ, во второй нужно было хранить телеметрию внутри шины поэтому была выбрана Kafka, системы достаточно долго развивались параллельно, до момента возникновения интеграции. Согласен с Вами, что при построения архитектуры в целом нужно учитывать этот момент.
AristarXXXX
Спасибо, бро. Такая же задача была. Смотрел в сторону этого сервиса, но не решился идти по этому пути. Теперь, думаю, попробую.
SimplExample Автор
Пожалуйста, если будут вопросы с внедрением, обращайся!