Автор статьи: Сергей Прощаев @sproshchaev
Руководитель направления Java‑разработки в FinTech

Сообщения в RabbitMQ — это основные единицы данных, которые передаются между продюсерами и потребителями. Понимание их структуры и возможностей позволяет эффективно управлять потоком данных в распределенных системах. В этой статье мы разберем анатомию сообщений, обязательные и опциональные компоненты, а также реализуем пример отправки объекта с настройкой свойств

1. Структура сообщения в RabbitMQ

Каждое сообщение состоит из трех частей: Тело сообщения (Body), Свойства (Properties) и Routing Key.

Тело сообщения (Body)

Тело сообщения — это обязательный элемент, который содержит основные данные, передаваемые через RabbitMQ. Оно представляет собой массив байт (binary data), что делает его универсальным контейнером для любой информации.

В нем можно передавать текст, JSON, XML, изображения и даже зашифрованные данные. При этом сам RabbitMQ не анализирует содержимое тела — это задача вашего приложения. Сервер лишь доставляет байты получателю.

Почему Тело Должно Быть Массивом Байт?

RabbitMQ работает на низком уровне, поэтому требует данных в формате, который можно передать по сети без потерь. Преобразование в байты гарантирует кросс‑платформенность, так как любой популярный язык программирования Java, Python или.NET умеет работать с байтами. Через массив байт можно передавать бинарные файлы (например, изображения), а так же это ускоряет обработку данных, так как байты всегда обрабатываются быстрее, чем строки.

Что можно рекомендовать?

  1. обязательно согласовывайте форматы данных и используйте всегда один формат (например JSON/XML) во всей системе, чтобы избежать конфликтов;

  2. контролируйте размер сообщений, ведь большие сообщения (свыше 1 МБ) замедляют работу RabbitMQ;

  3. если необходимо передавать файлы, то лучше использовать ссылки.

  4. добавляйте заголовки (об этом чуть позже) и указывайте, например: timestamp, user_id для удобства дальнейшей отладки;

  5. всегда обрабатывайте ошибки и проверяйте, что тело сообщения не пустое и имеет ожидаемый формат.

Свойства (Properties)

Метаданные в RabbitMQ — это скрытая мощь, которая превращает простые очереди в гибкие системы. Они управляют всем: от маршрутизации до выживания сообщений при сбоях.

Метаданные — это служебная информация, которая сопровождает тело сообщения. Сами метаданные не являются обязательными, но с их помощью можно указать системе такие факторы как:

  • Маршрутизация — куда и как доставить сообщение,

  • Обработка — как именно интерпретировать данные, передаваемые в сообщении,

  • Надежность — какие гарантии доставки должны быть использованы для сообщения.

Давайте разберем основные метаданные, которые часто используются:

1. Параметр content_type — это тип данных, который указывает формат тела сообщения. Это помогает потребителю правильно десериализовать данные.

Например в content_type можно указать один из типов: application/json, application/xml или text/plain.

2. Параметр content_encoding — отвечает за кодировку и указывает, как закодированы данные. Зачем: Экономит трафик и ускоряет передачу.

Примером значения, указанного в content_encoding могут быть: utf-8 (текст) или gzip и deflate (если используется сжатие).

3. Параметр headers — является незаменимым, если мы хотим указать значение переменных, которые планируем поместить в пользовательские заголовки

В качестве headers мы можем указывать произвольные пары ключ‑значение для кастомной логики. Это может быть как маршрутизация через обменник типа headers exchange, так и хранение любых меток, которые будут необходимы для работы с сообщением на стороне получателя. Например мы можем определить две переменные: source: 'web', env: 'prod'.

4. Параметр delivery_mode — содержит настройку режима доставки и может принимать значения: 1 — не сохранять на диск (быстро, но ненадежно). 2 — сохранять на диск (гарантия доставки даже при падении сервера).

Параметр delivery_mode позволяет управлять надежностью доставки сообщения.

5. Параметр priority — определяет приоритет сообщения. Его значение это целое число в диапазоне 0–255. Сообщения с высшим приоритетом обрабатываются первыми. Но это работает только с очередями, созданными с поддержкой приоритетов

6. И наконец параметр expiration — задает время жизни (TTL). Если он задан, то сообщение удаляется через N миллисекунд, если не будет обработано. Это очень удобно для обработки временных данных, которыми могут являться одноразовые ссылки и тп.

Что можно рекомендовать при использовании перечисленных метаданных?

  1. указывайте всегда content_type — иначе потребитель может неправильно интерпретировать данные;

  2. используйте delivery_mode=2 только для критически важных данных, не злоупотребляйте этим параметром, так как запись на диск замедляет работу;

  3. если вы передаете больше данные в формате JSON/XML, то сжимайте эти данные через content_encoding — это сократит размер сообщения в 2–3 раза;

  4. будьте осторожны с приоритетами — высокий приоритет может «забить» очередь, если низкоприоритетные сообщения не обрабатываются.

Routing Key

И третья часть сообщения это ключ, который используется для маршрутизации сообщения через exchange. Эта часть сообщения обязательна, если обменник (exchange) его использует в своей работе.

Например, обменники типа direct или topic используют Routing Key, а для fanout и headers ключ маршрутизации игнорируется.

2. Пример отправки объекта Person в сообщении RabbitMQ

И традиционно после небольшой теории у нас всегда идет практический пример. Давайте, в лучших традициях курсов, создадим класс Person и на примере покажем что и как настраивается в сообщении.

Для начала нам нужно будет подключить зависимости. Мы будем использовать стартер Spring framework:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Теперь определим сам класс, экземпляр которого у нас будет являться полезной нагрузкой в сообщении RabbitMQ. Для этого будем использовать проект Lombok, который позволяет сократить часть шаблонного кода в Java:

import lombok.*;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class Person {
    private String name;
    private int age;
}

И давайте приведем пример конфигурационного класса, в котором определим основные настройки RabbitMQ:

@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("person_exchange");
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("person_queue")
                .withArgument("x-dead-letter-exchange", "dlq_exchange")
                .withArgument("x-max-priority", 10)
                .withArgument("x-message-ttl", 10000) 
                .build();
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder
                      .bind(queue)
                      .to(exchange)
                      .with("person.key");
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

В этом классе мы создаем обменник типа Direct с именем person_exchange, который маршрутизирует сообщения в очередь по точному совпадению routing key.

Затем мы создаем «устойчивую очередь» person_queue в которой устойчивость заключается в том, что все сообщения будут сохраняться в случае перезагрузки RabbitMQ.

Далее добавляем аргумент x‑dead‑letter‑exchange, который определяет то, что если есть сообщения, которые не удалось обработать (например, после N попыток), будут отправлены в очередь Dead Letter через обменник dlq_exchange.

Также в настройки очереди присутствует аргумент x‑max‑priority со значением 10 — его назначение мы разберем чуть позже. И указываем параметр x‑message‑ttl, который задаёт максимальное время жизни сообщений в очереди (в миллисекундах). Если сообщение не будет обработано за это время, оно будет автоматически удалено.

После этого мы связываем очередь person_queue с обменником person_exchange, используя ключ маршрутизации person.key. И после этого сообщения, отправленные в person_exchange с ключом person.key, будут попадать в person_queue.

И наконец мы регистрируем конвертер, который автоматически сериализует объекты класса Person в JSON при отправке сообщений и десериализует JSON обратно в объекты при получении.

На третьем шаге мы создадим сервис, который будет отправлять экземпляры класса Person в сконфигурированную очередь RabbitMQ:

@Service
public class PersonProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendPerson(Person person) {


        MessageProperties props = new MessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        props.setPriority(5);
        props.setExpiration("10000");
        props.setHeader("user_id", "admin");

        Message message = rabbitTemplate.getMessageConverter()
                .toMessage(person, props); 
        
        rabbitTemplate.send("person_exchange", "person.key", message);
    }
}

Класс PersonProducer отправляет сообщения в RabbitMQ с тонкой настройкой метаданных. В нем используется RabbitTemplate который является ключевым классом в Spring AMQP для отправки и получения сообщений в RabbitMQ. Чтобы автоматически сериализовать объекты в JSON мы используем Jackson2JsonMessageConverter, который ранее был настроен в RabbitConfig.

И давайте подробнее остановимся на настройках:

1) setDeliveryMode(MessageDeliveryMode.PERSISTENT) — задает параметры режима доставки. В данном случае при MessageDeliveryMode.PERSISTENT сообщение будет сохранено на диске RabbitMQ. Это гарантирует, что данные не потеряются при перезагрузке сервера. Еще можно выбрать значение MessageDeliveryMode.NON_PERSISTENT — в этом случае сообщение хранится только в памяти брокера и если сервер RabbitMQ перезагрузится, сообщение будет потеряно. В случае MessageDeliveryMode.NON_PERSISTENT отправка сообщения будет быстрее, так как не используется сохранение на диск, поэтому этот режим можно использовать для сценариев, где скорость важнее надежности.

2) priority: 5 — сообщение получит приоритет 5 из диапазона значении от 0 до 255. И для того, чтобы это работало очередь должна поддерживать приоритеты и для этого мы использовали аргумент x‑max‑priority в классе конфигурации.

Сам приоритет определяет, в каком порядке сообщения будут доставлены потребителям. Чем выше priority, тем раньше сообщение будет обработано. Например: сообщение с приоритетом 5 будет обработано раньше сообщения с приоритетом 3. Если приоритет не указан, используется значение по умолчанию (0).

3) expiration: 10 000 — TTL отвечает за параметр Time‑To‑Live — это задает время жизни сообщения и в нашем случае сообщение удалится через 10 секунд, если не будет обработано. Если мы задаем TTL, то в настройках очередь мы должны указать то, что очередь поддерживает это (аргумент x‑message‑ttl).

4) user_id: admin — пример пользовательского заголовка, который можно использовать для обработки сообщения. В данном случае из этого заголовка мы поймем, что экземпляр класса Person, передаваемый в этом сообщении очевидно является администратором.

Если не требуется полный контроль над созданием Message — можно использовать метод convertAndSend для сокращения кода:

@Service
public class PersonProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendPerson(Person person) {
        rabbitTemplate.convertAndSend(
            "person_exchange", 
            "person.key", 
            person, 
            message -> {
                message.getMessageProperties()
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setPriority(5);
                message.getMessageProperties().setExpiration("10000");
                message.getMessageProperties()
                    .setHeader("user_id", "admin");
                return message;
            }
        );
    }
}

В этом примере мы используем метод convertAndSend вместо send и этот метод автоматически конвертирует объект Person в сообщение через Jackson2JsonMessageConverter и при этом используем те же настройки для отправки нашего сообщения.

Заключение

Умение тонко настраивать отправку сообщений в RabbitMQ — это не просто технический нюанс, а основа надежной и эффективной распределенной системы. Каждая опция — будь то установка delivery_mode для сохранения сообщений на диске, задание TTL для автоматического удаления устаревших данных или использование приоритетов для критичных задач — превращает хаотичный поток информации в предсказуемый и управляемый процесс.


Если вы работаете с распределёнными системами и хотите разобраться в тонкостях настройки RabbitMQ, приглашаем вас пройти небольшое тестирование. Оно поможет оценить ваше понимание ключевых механизмов работы брокера сообщений.

А также приглашаем на открытый урок «Оптимальные решения на RabbitMQ, или как Кролик превосходит Kafka», который пройдет 23 июля в 20:00. Обсудим архитектурные подходы, сравним популярные решения и разберём практические кейсы.

Чтобы оставаться в курсе самых актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.

Комментарии (0)