Понять статью не составит труда тем, кто знаком с Spring и Spring Web и хотя бы раз создавал простое приложение с контроллерами, сервисами и моделями (проще говоря - реализовывал паттерн Model View Controller).

С чего всё начиналось

По работе ко мне пришли с предложением начать разработку небольшого проекта с использованием RabbitMQ в связке с Spring Framework. До того момента я только лишь читал о RabbitMQ и с очередями сообщений особо не работал, так что часть своих выходных решил потратить на изучение данной технологии и её применениях.

О RabbitMQ вкратце

RabbitMQ - брокер сообщений. Данная технология позволяет вести асинхронную обработку данных, а также делать микросервисы слабосвязанными, что может облегчить разработку. RabbitMQ можно сравнить с почтой, куда приходят письма. Если письмо пришло на почту, то его можно прочитать в любое удобное время. Также и в RabbitMQ: пришедшее письмо микросервис сможет забрать в любой момент. Тем самым можно ослабить нагрузку на микросервис, ведь "письма" он сможет обрабатывать в своём темпе.

Сообщения отправляются в очереди, где они хранятся до момента прочтения. После прочтения сообщение удаляется. Дальнейшая его судьба определяется сервисом, прочитавшим сообщение. Также сообщение обладает некоторыми атрибутами, в зависимости от которых определяется, в какую очередь оно попадёт. Данные атрибуты задаются разработчиком. Для примера атрибутом может служить название очереди или topic. Подробнее о атрибутах и обмене сообщениями по ним стоит прочитать на официальном сайте: https://www.rabbitmq.com/tutorials/amqp-concepts.

Для более полного понимания приведу для примера следующую схему:

Схема отправления сообщений
Схема отправления сообщений

На схеме P - producer, сервис отправляющий сообщения. X - маршрутизатор сообщений (он определяет в какую очередь попадёт сообщение), Q1, Q2, Q3 - очереди, C1, C2, C3 - потребители. Схема взаимосвязи микросервисов не обязательно такая, как указана на изображении. У одной очереди вполне может быть несколько продюсеров и несколько потребителей, всё зависит от самой архитектуры микросервисов.

Для получения полной информации о RabbitMQ стоит посетить официальный сайт: https://www.rabbitmq.com/. В данной же статье будет описание простого проекта на Java с двумя микросервисами, взаимодействующими посредством RabbitMQ.

Развёртывание RabbitMQ

Первым делом я взялся за развёртывание RabbitMQ. Благо для этого на Windows нужен лишь предустановленный Docker и интернет. Чтобы поднять контейнер с RabbitMQ на своём компьютере необходимо выполнить следующую команду:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

После успешного выполнения этой команды перейдя по ссылке http://localhost:15672 можно увидеть следующую картину:

страница авторизации в RabbitMQ
страница авторизации в RabbitMQ

По дефолту логин и пароль: guest

После авторизации можно увидеть следующую чудесную картину:

UI RabbitMQ
UI RabbitMQ

При должном знании английского разобраться в UI не составит труда.

UI предоставляет всю информацию о состоянии RabbitMQ, о её очередях, пользователях, использовании, готовых к обработке сообщений и так далее...

Дальше перейдём к написанию проекта

RabbitMQProducer

Данный сервис отправляет сообщения в одну из двух очередей RabbitMQ.

Для начала подключим необходимые библиотеки:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.4.1</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>org.example</groupId>
	<artifactId>RabbitMQProducer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>RabbitMQProducer</name>
	<description>RabbitMQProducer</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>21</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springdoc</groupId>
			<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
			<version>2.0.2</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Среди них Spring Web для написания REST, Spring RabbitMQ для работы с RabbitMQ и Srping Openapi для подключения Swagger.

Сперва сконфигурируем наше приложение с помощью файла application.properties:

spring.application.name=RabbitMQProducer

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672

server.port=8081

rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue

Под spring.rabbitmq указываем подключения к rabbitmq, server.port для того, чтобы открыть веб приложение на определённом порту (нужно учитывать, что он может быть попросту занят другим приложением). Две последних строки указывают названия очередей (де-факто их можно указать и напрямую в Java Code, но такой подход не совсем правильный, как мне кажется, да и менять названия очередей затем проще из application.properties, чем потом копаться в коде и менять там).

Затем укажем конфигурации для очередей:

package org.example.rabbitmqproducer.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.queue_with_delay.name}")
    private String queueWithDelayName;

    /**
     * Конфигурируем очередь
     * @return Очередь RabbitMQ
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    public Queue queue2() {
        return new Queue(queueWithDelayName, false);
    }

}

Здесь с помощью аннотации @Value забираем значения из application.properties.

Далее напишем небольшой сервис, который будет просто посылать сообщения в брокер:

package org.example.rabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    private final RabbitTemplate template;

    @Value("${rabbitmq.queue.name}")
    private String messageQueue;

    @Value("${rabbitmq.queue_with_delay.name}")
    private String messageWithDelayQueue;

    @Autowired
    public MessageService(RabbitTemplate template) {
        this.template = template;
    }

    /**
     * Отправляет сообщение в очередь
     * @param message Сообщение
     */
    public void sendMessage(String message) {
        template.convertAndSend(messageQueue, message);
    }

    /**
     * Посылает сообщение в очередь, которая обрабатывается слушателем с некоторой задержкой
     * @param message Сообщение
     */
    public void sendMessageToQueueWithDelay(String message) {
        template.convertAndSend(messageWithDelayQueue, message);
    }

}

И теперь напишем контроллер, где метод POST по пути /message отправит сообщение в message_queue очередь, а метод POST по пути /message/with_delay отправит сообщение в очередь delay_queue:

package org.example.rabbitmqproducer.controller;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.example.rabbitmqproducer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@Tag(name = "Message Controller", description = "Controller to send message to RabbitMQ")
@RequestMapping("/message")
public class MessageController {

    private final MessageService service;

    @Autowired
    public MessageController(MessageService service) {
        this.service = service;
    }

    @Operation(description = "Send message to RabbitMQ")
    @ApiResponse(responseCode = "204")
    @PostMapping
    public ResponseEntity<Void> sendMessage(String message) {
        service.sendMessage(message);
        return ResponseEntity.noContent().build();
    }

    @Operation(description = "Send message to RabbitMQ in queue with delay")
    @ApiResponse(responseCode = "204")
    @PostMapping("/with_delay")
    public ResponseEntity<Void> sendMessageWithDelay(String message) {
        service.sendMessageToQueueWithDelay(message);
        return ResponseEntity.noContent().build();
    }
}

Запустим приложение.

Если всё правильно написано и сконфигурировано, то по ссылке http://localhost:8081/swagger-ui/index.html#/ можно увидеть поднятый Swagger:

Воспользуемся эндпоинтом /message два раза и эндпоинтом /message/with_delay один раз:

результат отправления сообщений
результат отправления сообщений

Здесь мы можем увидеть, что в очереди пришли сообщения. Теперь обработаем их.

RabbitMQConsumer

Данный сервис отвечает за обработку сообщений из очереди.

Подключим нужные библиотеки:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.4.1</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>org.example</groupId>
	<artifactId>RabbitMQConsumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>RabbitMQProducer</name>
	<description>RabbitMQConsumer</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>21</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Сконфигурируем в application.properties:

spring.application.name=RabbitMQConsumer

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672

server.port=8082

rabbitmq.queue.name=message_queue
rabbitmq.queue_with_delay.name=delay_queue

Здесь из Spring библиотек нужна только Spring RabbitMQ

Напишем слушатель сообщений:

package org.example.rabbitmqconsumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMQListener {

    private final static Random RANDOM = new Random();

    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue.name')}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_delay.name')}")
    public void receiveMessageWithDelay(String message) throws InterruptedException {
        var delay = RANDOM.nextInt(10000);
        Thread.sleep(delay);
        System.out.println("Received message: " + message + " with delay: " + delay);
    }

}

Особым образом здесь подтягиваются названия очередей из application.properties.

В методе receiveMessage сообщение сразу печатается в консоле. В методе receiveMessageWithDelay симулируется задержка до 10 секунд. Можно увидеть, что сообщения постепенно вытаскиваются из очереди delay_queue.

Запустим приложение и увидим вывод в консоле:

вывод в консоль
вывод в консоль

Приложение запустилось и само забрало из очереди сообщение. Можно ещё "побаловаться" и поотправлять сообщений. Посмотрим, что в мониторинге очереди delay_queue RabbitMQ:

мониторинг delay_queue
мониторинг delay_queue

Также на message_queue:

мониторинг message_queue
мониторинг message_queue

Видим разницу. В message_queue сообщения не задерживаются. А вот в delay_queue задерживаются из-за случайной задержки.

GitHub проекта: https://github.com/3abubenni/rabbitmq

Комментарии (3)


  1. sarkhan69
    14.01.2025 17:53

    дедовский совет автору и всем джуникам:
    @Bean public Queue queue() { return new Queue(queueName, false); }
    @Bean public Queue queue2() { return new Queue(queueWithDelayName, false); }


    при ручном объявлении бинов, давайте им читаемые и понятные по смыслу названия

    потому что у вас в контексте будут болтаться в данном конкретном случае
    два бина с названиям queue и queue2, что может навести смуту в умах других молодых разработчиков


  1. olku
    14.01.2025 17:53

    У Кролика много фишек. Нет ли желания продолжить серию темами подтверждения сообщений, версионирования типов сообщений, доставки один ко многим, сериализации для связывания сервисов на спринге и ноде, фоллбека недоставленных?


  1. egribanov
    14.01.2025 17:53

    Подушню, но вот такие штуки можно не писать, только усложняете себе жизнь ResponseEntity<Void>

    На ютуб канале Young&&Yandex помню было видео про архитектуру спринговых приложений, много полезного было