Было уже достаточно туториалов на тему Rabbit + Spring. В данной статье чуть обновлено описание быстрого запуска очередей. Если ты только начинаешь разбираться с очередями добро пожаловать подcast.
Напишем и настроим минимальный набор для работы с очередями. Нам потребуется
SpringBoot
RabbitMQ
Docker
Запустим сервис RabbitMQ в container Docker
Подготовим docker-compose.yml
services:
rabbitmq:
image: rabbitmq:latest
container_name: rabbitmq
hostname: rabbitmq
restart: unless-stopped
environment:
TZ: Europe/Moscow
RABBITMQ_DEFAULT_USER: rab
RABBITMQ_DEFAULT_PASS: 1234
RABBITMQ_DEFAULT_VHOST: vhost
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- "5672:5672"
- "15672:15672"
5672 - порт для общения
15672 - порт для management (web interface) http://localhost:15672/
sudo docker-compose up -d
(файл docker-compose.yml по умолчанию в текущем каталоге)
или указываем конкретно файл
sudo docker-compose -f docker-compose.yml up -d
Если нет доступа к web-management, то смотрим какой контейнер запущен
sudo docker ps
Подключаемся интерактивно к контейнеру по CONTAINER_ID или NAMES
sudo docker exec -ti a7005451a58e bash
Выводим список подключенных плагинов
rabbitmq-plugins list
Если status [ ] пусто, то включаем следующей командой
rabbitmq-plugins enable rabbitmq_management
Проверяем.
Небольшой хауТо по установке rabbitMQ
https://www.cherryservers.com/blog/how-to-install-and-start-using-rabbitmq-on-ubuntu-22-04
Далее переходим к Java.
Создадим три модуля
Sender - будет отправлять сообщение в очередь
Receiver - будет получать сообщения из очереди 1
Receiver2 - будет получать сообщения из очереди 2
For Sender
dependencies:
SpringWeb
Lombok
RabbitMQ
For Receiver1 and Receiver2
dependencies:
RabbitMQ
Начнем с Sender
Настроим application.properties
Настраиваем подключение к RabbitMQ и порт нашего приложения.
Создадим класс конфигурации MQConfiguration.java
Эту конфигурацию можно было бы игнорировать при условии создания очереди через rabbit-management. Так же мы могли бы обойтись только @Bean Queue. Но получилось бы жиденько и совсем не интересно. Рассмотрим же подробнее данные бобы.
Первый боб Queue позволяет создать очередь если ее нет или подключиться к существующей. Первый параметр это название очереди, второй durable - при значении false если мы перезапускаем сервис сообщения в очереди теряются, при true очередь сохраняет свое состояние.
Второй боб Exchange является точкой обмена, распределяет сообщения между очередями на основе связей (bindings).
Exchange бывают нескольких типов:
Direct Exchange
Topic Exchange
Fanout Exchange
Headers Exchange
Consistent-Hashing Exchange
Более подробно можно ознакомиться https://habr.com/ru/articles/489086/
В данном примере будем рассматривать Topic Exchange который имеет несколько параметров:
name - имя exchange
durable — если установить true, то exchange будет являться постоянным. Он будет храниться на диске и сможет пережить перезапуск сервера/брокера. Если значение false, то exchange является временным и будет удаляться, когда сервер/брокер будет перезагружен
autoDelete — автоматическое удаление. Exchange будет удален, когда будут удалены все связанные с ним очереди
Topic exchange – дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #
В нашем случае мы связываем ключом “first.key”
Третий боб Binding соответственно создает связь между Exchange и Queue.
Далее создадим controller для отправки сообщений
Создадим два endpoints.
Первый будет отправлять сообщение в Exchange с ключом “first.key”, второй с ключом “second.key”. Сообщения отправляются через RabbitTemplate.
Закончили с созданием Sender. Переходим к созданию Receiver1.
Здесь нам потребуется боб Queue для создания очереди если её не существует и собственно главное, создаем метод который подключится к очереди и будет слушать поступающие сообщения.
Метод void listen(String in) достаточно пометить аннотацией @RabbitListener(queues = “queueName”), данный метод будет записывать сообщения в переменную in.
Собственно это все что нужно для Receiver1.
Создадим Receiver2
Создадим файл конфигурации, его нам будет достаточно
Здесь мы создаем вторую Queue, Exchange and Binding с ключем “second.key” , а так же @RabbitListener.
На этом настройка закончена. Будем тестировать.
Запускаем.
В Rabbit-management мы должны увидеть следующее:
Появилась очередь name: testExchange и type: Topic
При клике на нее мы видим ее bindings
Появились связи с двумя очередями и routing keys. Для пущей уверенности зайдем в Queues и убедимся, что очереди создались.
Отправляем сообщение для первой очереди, работаем через Postman
Получили
Отправим во вторую
Все прекрасно отработало.
Если мы хотим отправлять сообщения в обе очереди то достаточно сделать одинаковыми ключи в связке “all.key”
@Bean
Binding binding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("all.key").noargs();
}
Так же если вы, что-то меняете(например тип Exchange), проверяйте в rabbit-management. Т.к. здесь мы не пересоздаем exchange если он уже создан.
Если использовать тип Exchange Fanout вместо Topic то ключ не будет учитываться и сообщения будут отправляться во все очереди.
Комментарии (2)
SilverRid Автор
22.12.2023 06:47Спасибо. Да вполне возможно.
Получаем из Rabbit, отправляем в Kafka через KafkaTemplate. (соответственно перед этим настроить кафку и подключение)
Crimsonland
Спасибо. Интересно.
Возможно ли после вычитки сообщений из rabbitmq, отправить сообщение в kafka? SpringBoot так умеет?