Автор статьи: Сергей Прощаев @sproshchaev
Руководитель направления Java‑разработки в FinTech

Введение

Очень часто в проектах необходимо использовать передачу сообщений между компонентами распределенной системы по определенным правилам. И перед разработчиком встает вопрос — какой инструмент наиболее эффективно можно использовать для этого? И сегодня мы рассмотрим брокер сообщений, который позволяет это делать «прямо из коробки» и это будет RabbitMQ.

RabbitMQ — это популярный брокер сообщений, который реализует стандарт AMQP и который позволяет эффективно управлять коммуникацией между сервисами через очереди. И в этой статье мы разберем основные типы обменников (exchange): Direct, Topic, Headers и Fanout, которые напрямую участвуют в процессе маршрутизации, а также приведем примеры их настройки в Spring Boot.

Роль обменников в RabbitMQ

Обменники (Exchanges) в RabbitMQ — это компоненты, которые принимают сообщения от продюсеров и маршрутизируют их в соответствующие очереди по определённым правилам. Обменники определяют, как сообщение попадёт в очередь, основываясь на типе обменника, ключа маршрутизации и привязках между обменником и очередями.

Мы рассмотрим четыре наиболее часто используемых типа обменника: Direct, Topic, Headers и Fanout.

Direct Exchange

Сообщения направляются в очередь по точному совпадению routing key.

Рис.1 Direct Exchange
Рис.1 Direct Exchange

На рис. 1 изображен пример обменника Direct Exchange, который отправляет сообщение с ключом «green» только в «зеленую очередь», привязанную к этому ключу.

Topic Exchange

Маршрутизация через шаблоны с точками и wildcards, где символ «*» заменяет одно слово, а символ «#» заменяет ноль или более слов.

Рис.2 Topic Exchange
Рис.2 Topic Exchange

На рис. 2 представлен пример Topic Exchange в котором ключ «first.green.fast» обеспечит попадание сообщения в зеленую очередь, привязанную к шаблону «*.green.*»

Headers Exchange

Маршрутизация сообщений на основе заголовков (headers), в которой сообщение направляется в очередь, если заголовки сообщения соответствуют условиям привязки. При работе Routing key в сообщении игнорируется. Используются параметры x‑match: all (все указанные заголовки должны совпадать) или x‑match: any (достаточно совпадения хотя бы одного заголовка).

Рис.3 Headers Exchange
Рис.3 Headers Exchange

На рис. 3 демонстрируется пример Headers Exchange в котором сообщение с заголовком «{key1, value1}» будет отправлено в зеленую очередь, привязанную к обменнику.

Fanout Exchange

Сообщения рассылаются во все очереди, подключенные к обменнику и Routing key при этом игнорируется.

Рис.4 Fanout Exchange
Рис.4 Fanout Exchange

На рис. 4 показан пример обменника Fanout Exchange отправит сообщение всем очередям независимо от используемого ключа маршрутизации.

Как правильно настроить использованием RabbitMQ в Spring boot для отправки сообщений?

Для начала необходимо в проекте добавить зависимости Spring Boot и RabbitMQ. Здесь показан пример использования сборщика Maven и определение зависимостей в pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

Direct Exchange

Конфигурация Direct Exchange:

@Configuration
public class DirectExchangeConfig {
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_exchange");
    }

    @Bean
    public Queue directQueue() {
        return new Queue("direct_queue");
    }

    @Bean
    public Binding binding(Queue directQueue, 
                           DirectExchange directExchange) {
        return BindingBuilder
                   .bind(directQueue)
                   .to(directExchange)
                   .with("direct.key");
    }
}

Эта конфигурация реализует простую прямую маршрутизацию для точечной доставки сообщений. Если очередь или обменник не существуют в RabbitMQ, Spring создаст их автоматически. Имена direct_exchange и direct_queue могут быть любыми, но должны быть уникальными в рамках RabbitMQ. Ключ direct.key должен точно совпадать при отправке сообщения, иначе оно не попадет в очередь.

Отправка сообщения в Direct Exchange:

@Service
public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("direct_exchange", 
                                      "direct.key", 
                                      message);
    }
}

Класс DirectProducer отвечает за отправку сообщений в RabbitMQ через Direct Exchange. Аннотация @Service помечает класс как сервисный компонент Spring. Spring автоматически создаст бин этого класса, и его можно будет внедрять (через аннотацию @Autowired) в другие части приложения.

RabbitTemplate — это класс Spring для взаимодействия с RabbitMQ. Он предоставляет методы для отправки и получения сообщений. Через @Autowired Spring внедряет готовый экземпляр RabbitTemplate, настроенный в конфигурации приложения.

Метод sendMessage() отправляет сообщение в RabbitMQ. В качестве параметры метода выступают: «direct_exchange» — имя обменника, которое должно совпадать с тем, что объявлено в конфигурации, «direct.key» — это ключ маршрутизации (routing key), по которому сообщение будет маршрутизироваться, и наконец message — это само сообщение, которое мы отправляем в RabbitMQ в качестве него мы можем использовать например, строку «Hello, Rabbit!».

И у rabbitTemplate мы вызываем метод convertAndSend, который автоматически сериализует объект message в байты.

Topic Exchange

Конфигурация Topic Exchange:

@Configuration
public class TopicExchangeConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_exchange");
    }

    @Bean
    public Queue topicQueue() {
        return new Queue("topic_queue");
    }

    @Bean
    public Binding binding(Queue topicQueue, 
                           TopicExchange topicExchange) {
        return BindingBuilder
                   .bind(topicQueue)
                   .to(topicExchange)
                   .with("user.*");
    }
}

Класс TopicExchangeConfig настраивает обменник с маршрутизацией по шаблонам Topic Exchange в RabbitMQ через Spring Boot. Создается обменник типа Topic с именем topic_exchange. Сообщения маршрутизируются в очереди на основе шаблонов.

В нашем примере используется шаблон user.* В этом шаблоне * заменяет ровно одно слово (например, user.profile или user.settings). Еще есть возможность в шаблоне использовать символ #, который заменяет ноль или более слов.

Через new Queue(«topic_queue») мы объявляем очередь с именем topic_queue. Если очередь с таким именем не существует в RabbitMQ, то она будет создана автоматически. И затем привязываем очередь topic_queue к обменнику topic_exchange через шаблон user.*

Отправка сообщений в Topic Exchange

@Service
public class TopicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend("topic_exchange", 
                                      routingKey, 
                                      message);
    }
}

Класс TopicProducer отвечает за отправку сообщений в RabbitMQ через Topic Exchange, который маршрутизирует сообщения на основе шаблонов. Метод rabbitTemplate.convertAndSend(«topic_exchange», routingKey, message) отправляет сообщение в RabbitMQ через Topic Exchange, используя шаблоны маршрутизации.

Headers Exchange

Конфигурация Headers Exchange

@Configuration
public class HeadersExchangeConfig {
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers_exchange");
    }

    @Bean
    public Queue headersQueue() {
        return new Queue("headers_queue");
    }

    @Bean
    public Binding binding(Queue headersQueue, 
                           HeadersExchange headersExchange) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-match", "all");
        headers.put("type", "error");
        headers.put("priority", "high");
        return BindingBuilder
                   .bind(headersQueue)
                   .to(headersExchange)
                   .whereAll(headers)
                   .match();
    }
}

Класс HeadersExchangeConfig настраивает Headers Exchange в котором сообщения маршрутизируются на основе заголовков (метаданных). Ключ маршрутизации в этом типе обменника не используется. Через new HeadersExchange(«headers_exchange») создается обменник типа Headers с именем headers_exchange. Сообщения маршрутизируются в очереди на основе совпадения заголовков (headers), указанных в привязке. В конструкции new Queue(«headers_queue») объявляется очередь с именем headers_queue. В бине binding() производится привязка очереди к обменнику. Настройка x‑match: all определяет то, что сообщение попадет в очередь только если все указанные заголовки совпадают. А через.whereAll(headers).match() мы указываем, что проверяются все ключи из Map с именем headers. В Headers Exchange параметр routing key не используется.

Отправка сообщений в Headers Exchange

@Service
public class HeadersProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        MessageProperties props = new MessageProperties();
        props.setHeader("type", "error");
        props.setHeader("priority", "high");
        Message msg = new Message(message.getBytes(), props);
        rabbitTemplate.send("headers_exchange", "", msg);
    }
}

Класс HeadersProducer отвечает за отправку сообщений в RabbitMQ через Headers Exchange, который маршрутизирует сообщения на основе заголовков.

В методе sendMessage() создаются заголовки через экземпляр класса MessageProperties и экземпляр класса Message, который включает в себя тело сообщения (message.getBytes() — текст, преобразованный в байты) и заголовки (props). Метод.send() отправляет сообщение напрямую, без автоматической конвертации, в отличие от convertAndSend().

Конфигурация Fanout Exchange

@Configuration
public class FanoutExchangeConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout_queue_1");
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout_queue_2");
    }

    @Bean
    public Binding binding1(Queue fanoutQueue1, 
                            FanoutExchange fanoutExchange) {
        return BindingBuilder
                   .bind(fanoutQueue1)
                   .to(fanoutExchange);
    }

    @Bean
    public Binding binding2(Queue fanoutQueue2, 
                            FanoutExchange fanoutExchange) {
        return BindingBuilder
                   .bind(fanoutQueue2)
                   .to(fanoutExchange);
    }
}

Класс FanoutExchangeConfig настраивает Fanout Exchange (обменник с широковещательной рассылкой) в RabbitMQ через Spring Boot. Через new FanoutExchange(«fanout_exchange») создает обменник типа Fanout с именем fanout_exchange. Все сообщения, отправленные в этот обменник, рассылаются во все привязанные очереди, независимо от routing key. Через new Queue() объявляются две очереди: fanout_queue_1 и fanout_queue_2.

И через BindingBuilder.bind(queue).to(exchange) — привязывается очередь к обменнику.

Отправка сообщения Fanout Exchange

@Service
public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("fanout_exchange", "", message);
    }
}

Класс FanoutProducer отвечает за отправку сообщений в RabbitMQ через Fanout Exchange, который осуществляет широковещательную рассылку сообщений во все привязанные очереди. Метод отправки сообщения sendMessage() отправляет сообщение в RabbitMQ через Fanout Exchange. В этом методе «fanout_exchange» — имя обменника, объявленного в FanoutExchangeConfig. Строка «« — это пустой routing key (в Fanout Exchange он игнорируется) и message — это само сообщение.

Заключение

В статье мы разобрали то, какие обменники используются для каких целей. Важно запомнить, что:

  1. Direct Exchange — используется для точечной маршрутизации,

  2. Topic Exchange — для гибкой фильтрации по шаблонам,

  3. Headers Exchange — для маршрутизации на основе заголовков,

  4. Fanout Exchange — для широковещательной рассылки.

В реальных проектах можно комбинировать типы обменников и использовать продвинутые настройки (например, TTL сообщений, dead‑letter очереди). О том, как это реализовать с использованием лучших практик — мы подробно изучаем в OTUS на курсе «RabbitMQ для разработчиков и администраторов».


Приглашаем вас на открытый урок «Оптимальные решения на RabbitMQ или как Кролик превосходит Kafka», который пройдет 23 июля в 20:00. В ходе занятия рассмотрим основные особенности RabbitMQ и его преимущества по сравнению с другими системами обмена сообщениями, а также представлены эффективные практические подходы к работе с этим брокером.

Также, чтобы проверить уровень знаний, пройдите вступительное тестирование по курсу «RabbitMQ для разработчиков и администраторов».

Комментарии (0)