Понять статью не составит труда тем, кто знаком с Spring и Spring Web и хотя бы раз создавал простое приложение с контроллерами, сервисами и моделями (проще говоря - реализовывал паттерн Model View Controller).
С чего всё начиналось
По работе ко мне пришли с предложением начать разработку небольшого проекта с использованием RabbitMQ в связке с Spring Framework. До того момента я только лишь читал о RabbitMQ и с очередями сообщений особо не работал, так что часть своих выходных решил потратить на изучение данной технологии и её применениях.
О RabbitMQ вкратце
RabbitMQ - брокер сообщений. Данная технология позволяет вести асинхронную обработку данных, а также делать микросервисы слабосвязанными, что может облегчить разработку. RabbitMQ можно сравнить с почтой, куда приходят письма. Если письмо пришло на почту, то его можно прочитать в любое удобное время. Также и в RabbitMQ: пришедшее письмо микросервис сможет забрать в любой момент. Тем самым можно ослабить нагрузку на микросервис, ведь "письма" он сможет обрабатывать в своём темпе.
Сообщения отправляются в очереди, где они хранятся до момента прочтения. После прочтения сообщение удаляется. Дальнейшая его судьба определяется сервисом, прочитавшим сообщение. Также сообщение обладает некоторыми атрибутами, в зависимости от которых определяется, в какую очередь оно попадёт. Данные атрибуты задаются разработчиком. Для примера атрибутом может служить название очереди или topic. Подробнее о атрибутах и обмене сообщениями по ним стоит прочитать на официальном сайте: https://www.rabbitmq.com/tutorials/amqp-concepts.
Для более полного понимания приведу для примера следующую схему:
На схеме P - producer, сервис отправляющий сообщения. X - маршрутизатор сообщений (он определяет в какую очередь попадёт сообщение), Q1, Q2, Q3 - очереди, C1, C2, C3 - потребители. Схема взаимосвязи микросервисов не обязательно такая, как указана на изображении. У одной очереди вполне может быть несколько продюсеров и несколько потребителей, всё зависит от самой архитектуры микросервисов.
Для получения полной информации о RabbitMQ стоит посетить официальный сайт: https://www.rabbitmq.com/. В данной же статье будет описание простого проекта на Java с двумя микросервисами, взаимодействующими посредством RabbitMQ.
Развёртывание RabbitMQ
Первым делом я взялся за развёртывание RabbitMQ. Благо для этого на Windows нужен лишь предустановленный Docker и интернет. Чтобы поднять контейнер с RabbitMQ на своём компьютере необходимо выполнить следующую команду:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
После успешного выполнения этой команды перейдя по ссылке http://localhost:15672 можно увидеть следующую картину:
По дефолту логин и пароль: guest
После авторизации можно увидеть следующую чудесную картину:
При должном знании английского разобраться в UI не составит труда.
UI предоставляет всю информацию о состоянии RabbitMQ, о её очередях, пользователях, использовании, готовых к обработке сообщений и так далее...
Дальше перейдём к написанию проекта
RabbitMQProducer
Данный сервис отправляет сообщения в одну из двух очередей RabbitMQ.
Для начала подключим необходимые библиотеки:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>RabbitMQProducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQProducer</name>
<description>RabbitMQProducer</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Среди них Spring Web для написания REST, Spring RabbitMQ для работы с RabbitMQ и Srping Openapi для подключения Swagger.
Сперва сконфигурируем наше приложение с помощью файла application.properties:
spring.application.name=RabbitMQProducer
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
server.port=8081
rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue
Под spring.rabbitmq указываем подключения к rabbitmq, server.port для того, чтобы открыть веб приложение на определённом порту (нужно учитывать, что он может быть попросту занят другим приложением). Две последних строки указывают названия очередей (де-факто их можно указать и напрямую в Java Code, но такой подход не совсем правильный, как мне кажется, да и менять названия очередей затем проще из application.properties, чем потом копаться в коде и менять там).
Затем укажем конфигурации для очередей:
package org.example.rabbitmqproducer.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.queue_with_delay.name}")
private String queueWithDelayName;
/**
* Конфигурируем очередь
* @return Очередь RabbitMQ
*/
@Bean
public Queue queue() {
return new Queue(queueName, false);
}
@Bean
public Queue queue2() {
return new Queue(queueWithDelayName, false);
}
}
Здесь с помощью аннотации @Value забираем значения из application.properties.
Далее напишем небольшой сервис, который будет просто посылать сообщения в брокер:
package org.example.rabbitmqproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MessageService {
private final RabbitTemplate template;
@Value("${rabbitmq.queue.name}")
private String messageQueue;
@Value("${rabbitmq.queue_with_delay.name}")
private String messageWithDelayQueue;
@Autowired
public MessageService(RabbitTemplate template) {
this.template = template;
}
/**
* Отправляет сообщение в очередь
* @param message Сообщение
*/
public void sendMessage(String message) {
template.convertAndSend(messageQueue, message);
}
/**
* Посылает сообщение в очередь, которая обрабатывается слушателем с некоторой задержкой
* @param message Сообщение
*/
public void sendMessageToQueueWithDelay(String message) {
template.convertAndSend(messageWithDelayQueue, message);
}
}
И теперь напишем контроллер, где метод POST по пути /message отправит сообщение в message_queue очередь, а метод POST по пути /message/with_delay отправит сообщение в очередь delay_queue:
package org.example.rabbitmqproducer.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.example.rabbitmqproducer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@Tag(name = "Message Controller", description = "Controller to send message to RabbitMQ")
@RequestMapping("/message")
public class MessageController {
private final MessageService service;
@Autowired
public MessageController(MessageService service) {
this.service = service;
}
@Operation(description = "Send message to RabbitMQ")
@ApiResponse(responseCode = "204")
@PostMapping
public ResponseEntity<Void> sendMessage(String message) {
service.sendMessage(message);
return ResponseEntity.noContent().build();
}
@Operation(description = "Send message to RabbitMQ in queue with delay")
@ApiResponse(responseCode = "204")
@PostMapping("/with_delay")
public ResponseEntity<Void> sendMessageWithDelay(String message) {
service.sendMessageToQueueWithDelay(message);
return ResponseEntity.noContent().build();
}
}
Запустим приложение.
Если всё правильно написано и сконфигурировано, то по ссылке http://localhost:8081/swagger-ui/index.html#/ можно увидеть поднятый Swagger:
Воспользуемся эндпоинтом /message два раза и эндпоинтом /message/with_delay один раз:
Здесь мы можем увидеть, что в очереди пришли сообщения. Теперь обработаем их.
RabbitMQConsumer
Данный сервис отвечает за обработку сообщений из очереди.
Подключим нужные библиотеки:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>RabbitMQConsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RabbitMQProducer</name>
<description>RabbitMQConsumer</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Сконфигурируем в application.properties:
spring.application.name=RabbitMQConsumer
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
server.port=8082
rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue
Здесь из Spring библиотек нужна только Spring RabbitMQ
Напишем слушатель сообщений:
package org.example.rabbitmqconsumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class RabbitMQListener {
private final static Random RANDOM = new Random();
@RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue.name')}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
@RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_delay.name')}")
public void receiveMessageWithDelay(String message) throws InterruptedException {
var delay = RANDOM.nextInt(10000);
Thread.sleep(delay);
System.out.println("Received message: " + message + " with delay: " + delay);
}
}
Особым образом здесь подтягиваются названия очередей из application.properties.
В методе receiveMessage сообщение сразу печатается в консоле. В методе receiveMessageWithDelay симулируется задержка до 10 секунд. Можно увидеть, что сообщения постепенно вытаскиваются из очереди delay_queue.
Запустим приложение и увидим вывод в консоле:
Приложение запустилось и само забрало из очереди сообщение. Можно ещё "побаловаться" и поотправлять сообщений. Посмотрим, что в мониторинге очереди delay_queue RabbitMQ:
Также на message_queue:
Видим разницу. В message_queue сообщения не задерживаются. А вот в delay_queue задерживаются из-за случайной задержки.
GitHub проекта: https://github.com/3abubenni/rabbitmq
Комментарии (3)
olku
14.01.2025 17:53У Кролика много фишек. Нет ли желания продолжить серию темами подтверждения сообщений, версионирования типов сообщений, доставки один ко многим, сериализации для связывания сервисов на спринге и ноде, фоллбека недоставленных?
egribanov
14.01.2025 17:53Подушню, но вот такие штуки можно не писать, только усложняете себе жизнь
ResponseEntity<Void>
На ютуб канале Young&&Yandex помню было видео про архитектуру спринговых приложений, много полезного было
sarkhan69
дедовский совет автору и всем джуникам:
@Bean public Queue queue() { return new Queue(queueName, false); }
@Bean public Queue queue2() { return new Queue(queueWithDelayName, false); }
при ручном объявлении бинов, давайте им читаемые и понятные по смыслу названия
потому что у вас в контексте будут болтаться в данном конкретном случае
два бина с названиям queue и queue2, что может навести смуту в умах других молодых разработчиков