Было уже достаточно туториалов на тему Rabbit + Spring. В данной статье чуть обновлено описание быстрого запуска очередей. Если ты только начинаешь разбираться с очередями добро пожаловать подcast.
Напишем и настроим минимальный набор для работы с очередями. Нам потребуется
SpringBoot
RabbitMQ
Docker
Запустим сервис RabbitMQ в container Docker
Подготовим docker-compose.yml
services:
rabbitmq:
image: rabbitmq:latest
container_name: rabbitmq
hostname: rabbitmq
restart: unless-stopped
environment:
TZ: Europe/Moscow
RABBITMQ_DEFAULT_USER: rab
RABBITMQ_DEFAULT_PASS: 1234
RABBITMQ_DEFAULT_VHOST: vhost
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- "5672:5672"
- "15672:15672"
5672 - порт для общения
15672 - порт для management (web interface) http://localhost:15672/
sudo docker-compose up -d
(файл docker-compose.yml по умолчанию в текущем каталоге)
или указываем конкретно файл
sudo docker-compose -f docker-compose.yml up -d
Если нет доступа к web-management, то смотрим какой контейнер запущен
sudo docker ps
![](https://habrastorage.org/getpro/habr/upload_files/840/716/70d/84071670d107a0337f6a50f192c314e6.png)
Подключаемся интерактивно к контейнеру по CONTAINER_ID или NAMES
sudo docker exec -ti a7005451a58e bash
Выводим список подключенных плагинов
rabbitmq-plugins list
Если status [ ] пусто, то включаем следующей командой
rabbitmq-plugins enable rabbitmq_management
![](https://habrastorage.org/getpro/habr/upload_files/a8d/cd2/a80/a8dcd2a8043da44a7f6bfcb86f7989ab.png)
Проверяем.
![](https://habrastorage.org/getpro/habr/upload_files/b3a/624/3ea/b3a6243eaa336681f28f7f4ae3e30117.png)
![](https://habrastorage.org/getpro/habr/upload_files/acb/544/506/acb544506635282a4804b67b3d5e459c.png)
Небольшой хауТо по установке rabbitMQ
https://www.cherryservers.com/blog/how-to-install-and-start-using-rabbitmq-on-ubuntu-22-04
Далее переходим к Java.
Создадим три модуля
Sender - будет отправлять сообщение в очередь
Receiver - будет получать сообщения из очереди 1
Receiver2 - будет получать сообщения из очереди 2
For Sender
dependencies:
SpringWeb
Lombok
RabbitMQ
![](https://habrastorage.org/getpro/habr/upload_files/9a3/059/0a3/9a30590a3988415278aaa895aae00172.png)
For Receiver1 and Receiver2
dependencies:
RabbitMQ
![](https://habrastorage.org/getpro/habr/upload_files/2c6/ce4/0d5/2c6ce40d5cc664bd43776c2dd5bbe2b5.png)
Начнем с Sender
Настроим application.properties
![application.properties application.properties](https://habrastorage.org/getpro/habr/upload_files/329/3db/d82/3293dbd829e724039d65fc77e6e48eee.png)
Настраиваем подключение к RabbitMQ и порт нашего приложения.
Создадим класс конфигурации MQConfiguration.java
![MQConfiguration.java MQConfiguration.java](https://habrastorage.org/getpro/habr/upload_files/8c3/163/545/8c31635456f9084a0b1ae9fdf1bb806e.png)
Эту конфигурацию можно было бы игнорировать при условии создания очереди через rabbit-management. Так же мы могли бы обойтись только @Bean Queue. Но получилось бы жиденько и совсем не интересно. Рассмотрим же подробнее данные бобы.
Первый боб Queue позволяет создать очередь если ее нет или подключиться к существующей. Первый параметр это название очереди, второй durable - при значении false если мы перезапускаем сервис сообщения в очереди теряются, при true очередь сохраняет свое состояние.
Второй боб Exchange является точкой обмена, распределяет сообщения между очередями на основе связей (bindings).
Exchange бывают нескольких типов:
Direct Exchange
Topic Exchange
Fanout Exchange
Headers Exchange
Consistent-Hashing Exchange
Более подробно можно ознакомиться https://habr.com/ru/articles/489086/
В данном примере будем рассматривать Topic Exchange который имеет несколько параметров:
name - имя exchange
durable — если установить true, то exchange будет являться постоянным. Он будет храниться на диске и сможет пережить перезапуск сервера/брокера. Если значение false, то exchange является временным и будет удаляться, когда сервер/брокер будет перезагружен
autoDelete — автоматическое удаление. Exchange будет удален, когда будут удалены все связанные с ним очереди
Topic exchange – дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #
В нашем случае мы связываем ключом “first.key”
Третий боб Binding соответственно создает связь между Exchange и Queue.
Далее создадим controller для отправки сообщений
![SendController SendController](https://habrastorage.org/getpro/habr/upload_files/0bc/5c4/11d/0bc5c411da66b378691575a8d127c20b.png)
Создадим два endpoints.
Первый будет отправлять сообщение в Exchange с ключом “first.key”, второй с ключом “second.key”. Сообщения отправляются через RabbitTemplate.
Закончили с созданием Sender. Переходим к созданию Receiver1.
![application.properties application.properties](https://habrastorage.org/getpro/habr/upload_files/de2/e3f/e40/de2e3fe40bb4398acef083aeedeaefb1.png)
![ReceiverApplication.java ReceiverApplication.java](https://habrastorage.org/getpro/habr/upload_files/b92/73d/d33/b9273dd33cd3364a1f09bff452f28428.png)
Здесь нам потребуется боб Queue для создания очереди если её не существует и собственно главное, создаем метод который подключится к очереди и будет слушать поступающие сообщения.
Метод void listen(String in) достаточно пометить аннотацией @RabbitListener(queues = “queueName”), данный метод будет записывать сообщения в переменную in.
Собственно это все что нужно для Receiver1.
Создадим Receiver2
![application.properties application.properties](https://habrastorage.org/getpro/habr/upload_files/63c/1e1/9a9/63c1e19a9124248098d69dd4d00dc850.png)
Создадим файл конфигурации, его нам будет достаточно
![MQConfiguration MQConfiguration](https://habrastorage.org/getpro/habr/upload_files/024/bd3/53a/024bd353a539908b70264c8e2dc901a6.png)
Здесь мы создаем вторую Queue, Exchange and Binding с ключем “second.key” , а так же @RabbitListener.
На этом настройка закончена. Будем тестировать.
Запускаем.
В Rabbit-management мы должны увидеть следующее:
![](https://habrastorage.org/getpro/habr/upload_files/d06/008/b60/d06008b6026906cd5bf3248a1bf47a50.png)
Появилась очередь name: testExchange и type: Topic
При клике на нее мы видим ее bindings
![](https://habrastorage.org/getpro/habr/upload_files/7ae/c87/d08/7aec87d08ae06f0580c11462422a89a6.png)
Появились связи с двумя очередями и routing keys. Для пущей уверенности зайдем в Queues и убедимся, что очереди создались.
![](https://habrastorage.org/getpro/habr/upload_files/127/5b8/e97/1275b8e97b8ae3b985acc869fdfe060b.png)
Отправляем сообщение для первой очереди, работаем через Postman
![](https://habrastorage.org/getpro/habr/upload_files/bd3/a0f/770/bd3a0f770178ef6af1eef526c1696521.png)
![](https://habrastorage.org/getpro/habr/upload_files/dae/fcc/b37/daefccb37b4fd5cc58c19643081f15bd.png)
Получили
Отправим во вторую
![](https://habrastorage.org/getpro/habr/upload_files/18c/144/d3d/18c144d3ded2cb307c493f5619475c56.png)
![](https://habrastorage.org/getpro/habr/upload_files/92a/296/63a/92a29663afa5fd89dbfad9e96f87730f.png)
Все прекрасно отработало.
Если мы хотим отправлять сообщения в обе очереди то достаточно сделать одинаковыми ключи в связке “all.key”
@Bean
Binding binding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("all.key").noargs();
}
Так же если вы, что-то меняете(например тип Exchange), проверяйте в rabbit-management. Т.к. здесь мы не пересоздаем exchange если он уже создан.
Если использовать тип Exchange Fanout вместо Topic то ключ не будет учитываться и сообщения будут отправляться во все очереди.
![](https://habrastorage.org/getpro/habr/upload_files/17e/1dd/f01/17e1ddf013fd62a3c0bfc075ced666a0.png)
Комментарии (2)
SilverRid Автор
22.12.2023 06:47Спасибо. Да вполне возможно.
Получаем из Rabbit, отправляем в Kafka через KafkaTemplate. (соответственно перед этим настроить кафку и подключение)
Crimsonland
Спасибо. Интересно.
Возможно ли после вычитки сообщений из rabbitmq, отправить сообщение в kafka? SpringBoot так умеет?