Всем привет. Поскольку не смог найти полноценной статьи о том, как с нуля написать свой spring boot сервис с подключением к нему rabbitMQ, с конфигурацией всего это чуда через графический интерфейс и успешной отправкой и получением сообщения из очереди, то решил написать свою статью, что бы облегчить жизнь тем, кто захочет с этим познакомиться. Так же добавлю теоретическую часть, что бы не искать все эту информацию по всем источникам. Я постараюсь своими словами объяснять все, с чем мы будет сталкиваться. Если хотите узнать более подробно, то советую обратиться к официальной документации: Официальный сайт RabbitMQ
Практическая часть будет прерываться на теоретическую, что бы было понимание того, что происходит.
Давайте начнем.
Подготовим наше окружение для работы:
В статье использовался Docker Desktop. Система Windows
-
Первое что нам нужно сделать, это поднять нашу очередь в docker(о том как его поставить к себе на машину, я рассказывать не буду. На хабре огромное количество материала о том, как это сделать). Пишем простейший docker-compose.yml
version: '3' services: localRabbitMQ: image: rabbitmq:3-management-alpine environment: RABBITMQ_DEFAULT_USER: user RABBITMQ_DEFAULT_PASS: password ports: - 5672:5672 - 15672:15672
запускаем через консоль из папки, в которой лежит наш файл командой: docker compose up
Теперь на http://localhost:15672/ должны видеть вот такую картину:
Credentials для входа мы указали в docker-compose.yml (user и password)
-
Создание Exchange. После авторизации нам нужно создать exchange. Параметры, указаны ниже на картинке в Add a new exchange. После нажимаем кнопку Add exchange. Тип указываем direct(это важно) Потому что типа direct, мы можем задать routingKey(ниже описано что это), а для fanout не можем, потому что он пропускает все сообщения.
Теперь немного погрузимся в теорию, что бы было понимание того, что за зверь этот exchange. Общая схема взаимодействия выглядит следующим образом:
Producer - производитель сообщений (отдельное приложение на Java)
Consumer 1, Consumer 2 - потребители сообщений (отдельные приложения)
Exchanges - обменник. Cущность Rabbit, точка входа для публикации всех сообщений.
Binding - связь между Exchange и очередью
Queue - очередь сообщенийВсе сообщения из приложения Producer попадают в Exchanges, после этого обрабатываются на основе binding и routingKey(дальше сделаем это на практике и вы поймете, как это работает), после сообщение попадает в очередь и забирается из нее consumer'ом.
-
Создание очереди. Теперь давайте создадим очередь (Queue) через которую будем передавать сообщения. В процессе работы у вас может фигурировать такая сущность, как virtual host, давайте его мы тоже создадим (потому что в продакшене точно не будет использоваться дефолтный) Заходим в admin, справа выбираем Virtual Hosts и создаем новый, чеhез кнопку add virtual host. Я назвал его cpp.
Далее переходим в Queue и создаем новую очередь. Я назвал ее Queue1
Остался последний шаг, для настройки окружения. Теперь зададим Binding. Он нужен для того, что бы данные, которые попадают в exchange(а туда попадают все сообщения из Producer'a) распределялись по разным очередям (Разные Bingings будут распределять сообщения в разные очереди). Для того, что бы создать Binding заходим в нашу созданную очередь и во вкладке Bindings добавляем новый binging
Теперь все сообщения, которые попадают в наш exchange, который называется testExchange и имеют routingKey с названием testRoutingKey, будут попадать в очередь queue1
-
Мы закончили настройку окружения. Теперь, перейдем к написанию кода. Для этого создадим два простейших приложения на Java с использованием Spring Boot.
Я использую 17 Java. Зависимости: Spring for RabbitMQ, Spring Web
Разработка Producer'a и Consumer'a
Я создам два простейших приложения. Одно назову RabbitMQProducer другое RabbitMQConsumer. Я использую порты: 8086 и 8087 соответственно. Вы можете использовать любые другие.
Код RabbitMQConsumer:
Создам два пакета: config, consumer. В пакете config будет класс RabbitConf, а в пакете consumer класс RabbitMQConsumer
RabbitConf
@Configuration
public class RabbitConf {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
cachingConnectionFactory.setUsername("user");
cachingConnectionFactory.setPassword("password");
cachingConnectionFactory.setVirtualHost("cpp");
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("queue1");
}
@Bean
DirectExchange exchange() {
return new DirectExchange("testExchange", true, false);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
RabbitMQConsumer
@Component
@EnableRabbit
public class RabbitMQConsumer {
@RabbitListener(queues = "queue1")
public void processMyQueue(String message) {
System.out.printf("Received from myQueue : %s ", new String(message.getBytes()));
}
}
RabbitMqConsumerApplication
@SpringBootApplication
public class RabbitMqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqConsumerApplication.class, args);
}
}
Код RabbitMQProducer:
Создам 4 пакета: config, controller, model, producer В пакете config будет класс RabbitConf, в пакете controller класс RabbitController, в пакете model класс MessageModel и в пакете producer интерфейс RabbitMQProducerService и его имплементация RabbitMQProducerServiceImpl. Пакеты service и пакет impl в пакете producer создавать не стал (прошу прощения, если кого-то это задело).
RabbitConfig
@Configuration
public class RabbitConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
cachingConnectionFactory.setUsername("user");
cachingConnectionFactory.setPassword("password");
cachingConnectionFactory.setVirtualHost("cpp");
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("queue");
}
@Bean
DirectExchange exchange() {
return new DirectExchange("testExchange", true, false);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("testRoutingKey");
}
}
RabbitController
@RestController
public class RabbitController {
private final RabbitMQProducerService rabbitMQProducerService;
@Autowired
public RabbitController(RabbitMQProducerService rabbitMQProducerService) {
this.rabbitMQProducerService = rabbitMQProducerService;
}
@GetMapping("/send")
public void sendMessageToRabbit(@RequestBody MessageModel messageModel) {
rabbitMQProducerService.sendMessage(messageModel.getMessage(), messageModel.getRoutingKey());
}
@GetMapping("/health")
public String healthCheck() {
return "OK";
}
}
MessageModel
@Data
public class MessageModel {
private String message;
private String routingKey;
}
RabbitMQProducerService
public interface RabbitMQProducerService {
void sendMessage(String message, String routingKey);
}
RabbitMQProducerServiceImpl
@Service
public class RabbitMQProducerServiceImpl implements RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQProducerServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message, String routingKey) {
rabbitTemplate.convertAndSend("testExchange", routingKey, message);
}
}
RabbitMqProducerApplication
@SpringBootApplication
public class RabbitMqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqProducerApplication.class, args);
}
}
На всякий случай прикрепляю свой pom.xml он одинаковый для обоих проектов.
Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitMQProducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitMQProducer</name>
<description>rabbitMQProducer</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
-
Тестирование работоспособности:
При отправке запроса через Postman на наш Producer с параметрами:
{
"message": "just text",
"routingKey": "testRoutingKey"
}
В нашем Consumer'e мы видим следующее:
Таким образом мы видим, что наше сообщение получено из очереди queue1, но если мы передадим в нашем запросе к Producer'у
"routingKey": отличный от значения "testRoutingKey"
, то наш Consumer не получит это сообщение из очереди поскольку он получает только сообщения сroutingKey = testRoutingKey.
На этом можно завершать данную статью, надеюсь что она была вам полезна. и вам стало немного понятнее, как начать свое знакомство с RabbitMQ. А самое главное теперь стало понятно, в каком направлении копать дальше.
Если есть комментарии и замечания, прошу писать. Постараюсь исправить.
Комментарии (8)
grossws
05.12.2022 00:48Посмотрев на первые же листинги немного удивился.
разве автоконфигурация из
spring-boot-starter-amqp
не создаетConnectionFactory
/AmqpAdmin
/RabbitTemplate
?почему
@EnableRabbit
на компоненте (а не на конфигурации)?если хотите health endpoints -- используйте челевоческий
spring-boot-starter-actuator
elfs_shadow
06.12.2022 13:08Минутка занудства: если lombok нужен только для "@Data" не лучше уж использовать рекорды, раз проект на 17й версии?
binakot
06.12.2022 22:02Я бы сказал, если есть рекорды, то лучше вообще отказаться от ломбока.
AndreiVerbitskii Автор
07.12.2022 09:26Спасибо за советы, что-то вылетел из головы рекорд, если честно.
scruff
Сразу на начале завалился. Во-первых docker-compose. Через тире. ERROR: In file './docker-compose.yml', service 'ports' must be a mapping not an array.. Что может быть?
binakot
Раньше docker-compose (через тире) был отдельной утилитой, которую как бинарник отдельно качали и делали симлинк на каталог бинарников. Сейчас docker compose (без тире), начиная с версии 2.0, входит в стандартный пакет докера и ставить отдельно его не надо. По сути все тоже самое, поменяли просто API обращения к Docker Compose в CLI.
По поводу ошибки маппинга портов, уверен на 99%, что проблема в отступах в yaml файле, в статье все съехало. Должно быть так:
binakot
Приложу свой пример 2х летней давности, если Автор не против. По сути ничего не изменилось. Оно и 5 лет назад делалось также :) https://github.com/binakot/Java-Spring-RabbitMQ-Example
AndreiVerbitskii Автор
Код в статье поправил. Действительно, просто поехал yml при переносе. Про docker compose up, @binakot верно подметил. В статье использовался Docker Desktop. Система Windows