Ссылка на проект

Было уже достаточно туториалов на тему Rabbit + Spring. В данной статье чуть обновлено описание быстрого запуска очередей. Если ты только начинаешь разбираться с очередями добро пожаловать подcast.

Напишем и настроим минимальный набор для работы с очередями. Нам потребуется 

  • SpringBoot

  • RabbitMQ

  • Docker

Запустим сервис RabbitMQ в container Docker

Подготовим docker-compose.yml

services:   
  rabbitmq:
  image: rabbitmq:latest
  container_name: rabbitmq 
  hostname: rabbitmq
  restart: unless-stopped
  environment:
  TZ: Europe/Moscow
    RABBITMQ_DEFAULT_USER: rab
    RABBITMQ_DEFAULT_PASS: 1234
    RABBITMQ_DEFAULT_VHOST: vhost
  volumes:
    - ./rabbitmq:/var/lib/rabbitmq    
  ports:
    - "5672:5672"
    - "15672:15672"

5672 - порт для общения 
15672 - порт для management (web interface) http://localhost:15672/

sudo docker-compose up -d 

(файл docker-compose.yml по умолчанию в текущем каталоге)

или указываем конкретно файл

sudo docker-compose -f docker-compose.yml up -d 

Если нет доступа к web-management, то смотрим какой контейнер запущен

sudo docker ps

Подключаемся интерактивно к контейнеру по CONTAINER_ID или NAMES

sudo docker exec -ti a7005451a58e bash

Выводим список подключенных плагинов

rabbitmq-plugins list

Если status [  ]  пусто, то включаем следующей командой

rabbitmq-plugins enable rabbitmq_management

Проверяем.

Небольшой хауТо по установке rabbitMQ 
https://www.cherryservers.com/blog/how-to-install-and-start-using-rabbitmq-on-ubuntu-22-04

Далее переходим к Java.

Создадим три модуля 

  • Sender - будет отправлять сообщение в очередь

  • Receiver - будет получать сообщения из очереди 1

  • Receiver2 - будет получать сообщения из очереди 2

For Sender 

dependencies:

  • SpringWeb

  • Lombok

  • RabbitMQ

For Receiver1 and Receiver2

dependencies:

  • RabbitMQ

Начнем с Sender

Настроим application.properties

application.properties
application.properties

Настраиваем подключение к RabbitMQ и порт нашего приложения.

Создадим класс конфигурации MQConfiguration.java

MQConfiguration.java
MQConfiguration.java

Эту конфигурацию можно было бы игнорировать при условии создания очереди через rabbit-management. Так же мы могли бы обойтись только @Bean Queue. Но получилось бы жиденько и совсем не интересно. Рассмотрим же подробнее данные бобы. 

Первый боб Queue позволяет создать очередь если ее нет или подключиться к существующей. Первый параметр это название очереди, второй durable - при значении false если мы перезапускаем сервис сообщения в очереди теряются, при true очередь сохраняет свое состояние.

Второй боб Exchange является точкой обмена, распределяет сообщения между очередями на основе связей (bindings). 

Exchange бывают нескольких типов:

  • Direct Exchange

  • Topic Exchange

  • Fanout Exchange

  • Headers Exchange

  • Consistent-Hashing Exchange

Более подробно можно ознакомиться https://habr.com/ru/articles/489086/

В данном примере будем рассматривать Topic Exchange который имеет несколько параметров:

  • name - имя exchange

  • durable — если установить true, то exchange будет являться постоянным. Он будет храниться на диске и сможет пережить перезапуск сервера/брокера. Если значение false, то exchange является временным и будет удаляться, когда сервер/брокер будет перезагружен

  • autoDelete — автоматическое удаление. Exchange будет удален, когда будут удалены все связанные с ним очереди

Topic exchange – дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #

В нашем случае мы связываем ключом “first.key

Третий боб Binding соответственно создает связь между Exchange и Queue.

Далее создадим controller для отправки сообщений

SendController
SendController

Создадим два endpoints. 

Первый будет отправлять сообщение в Exchange с ключом “first.key”, второй с ключом “second.key”. Сообщения отправляются через RabbitTemplate.

Закончили с созданием Sender. Переходим к созданию Receiver1.

application.properties
application.properties
ReceiverApplication.java
ReceiverApplication.java

Здесь нам потребуется боб Queue для создания очереди если её не существует и собственно главное, создаем метод который подключится к очереди и будет слушать поступающие сообщения.
Метод void listen(String in) достаточно пометить аннотацией @RabbitListener(queues = “queueName”), данный метод будет записывать сообщения в переменную in.
Собственно это все что нужно для Receiver1.

Создадим Receiver2

application.properties
application.properties

Создадим файл конфигурации, его нам будет достаточно

MQConfiguration
MQConfiguration

Здесь мы создаем вторую Queue, Exchange and Binding с ключем “second.key” , а так же @RabbitListener.

На этом настройка закончена. Будем тестировать. 

Запускаем.

В Rabbit-management мы должны увидеть следующее:

Появилась очередь name: testExchange и type: Topic
При клике на нее мы видим ее bindings 

Появились связи с двумя очередями и routing keys. Для пущей уверенности зайдем в Queues и убедимся, что очереди создались.

Отправляем сообщение для первой очереди, работаем через Postman

Получили 

Отправим во вторую 

Все прекрасно отработало.

Если мы хотим отправлять сообщения в обе очереди то достаточно сделать одинаковыми ключи в связке “all.key” 

@Bean 
Binding binding(Queue queue, Exchange exchange) {   
  return BindingBuilder.bind(queue).to(exchange).with("all.key").noargs(); 
} 

Так же если вы, что-то меняете(например тип Exchange), проверяйте в rabbit-management. Т.к. здесь мы не пересоздаем exchange если он уже создан.

Если использовать тип Exchange Fanout вместо Topic то ключ не будет учитываться и сообщения будут отправляться во все очереди.

Ссылка на проект

Комментарии (2)


  1. Crimsonland
    22.12.2023 06:47

    Спасибо. Интересно.

    Возможно ли после вычитки сообщений из rabbitmq, отправить сообщение в kafka? SpringBoot так умеет?


  1. SilverRid Автор
    22.12.2023 06:47

    Спасибо. Да вполне возможно.

    Получаем из Rabbit, отправляем в Kafka через KafkaTemplate. (соответственно перед этим настроить кафку и подключение)