Привет, Хабр!

Сегодня рассмотрим библиотеку Lapin в Rust. Lapin — это библиотека, реализующая протокол AMQP 0.9.1, она помогает взаимодействовать с RabbitMQ.

  1. Многоканальная работа: один TCP‑соединение поддерживает множество каналов.

  2. Поддержка подтверждений: безопасная обработка сообщений (ack/nack).

  3. Интеграция с async: асинхронный API, который вписывается в экосистему Rust.

  4. TLS: поддержка защищённых соединений через native-tls, openssl или rustls.


Основные сущности Lapin

Connection

Соединение с RabbitMQ устанавливается один раз и может использоваться для работы с несколькими каналами. Это основа любого взаимодействия.

Пример:

use lapin::{Connection, ConnectionProperties};

let addr = "amqp://user:password@localhost:5672/%2f";
let connection = Connection::connect(&addr, ConnectionProperties::default())
    .await
    .expect("Ошибка подключения");

Поддерживаются как TCP, так и TLS‑соединения.

Channel

Каналы — это основа взаимодействия с RabbitMQ. Через них создаются очереди, подписки и отправляются сообщения.

Пример:

let channel = connection.create_channel()
    .await
    .expect("Ошибка создания канала");

RabbitMQ рекомендует использовать отдельные каналы для отправки и получения сообщений. В одном соединении можно создавать множество каналов.

Queue

Очереди — это хранилища сообщений. Они бывают:

  • Durable: сохраняются при перезапуске сервера.

  • Exclusive: доступны только для текущего соединения.

  • Auto‑delete: удаляются, когда больше не используются.

Пример:

use lapin::options::QueueDeclareOptions;
use lapin::types::FieldTable;

let queue = channel
    .queue_declare(
        "task_queue",
        QueueDeclareOptions {
            durable: true, // Устойчивая очередь
            ..Default::default()
        },
        FieldTable::default(),
    )
    .await
    .expect("Ошибка объявления очереди");

Используйте FieldTable, чтобы настроить TTL сообщений, ограничение размера очереди и т. п.

Exchange

Exchange — это маршрутизатор, который направляет сообщения в очереди на основе типов и правил маршрутизации:

  1. Direct: сообщения отправляются в конкретную очередь по ключу маршрутизации.

  2. Fanout: сообщение отправляется во все очереди, привязанные к exchange.

  3. Topic: сложная маршрутизация по шаблонам.

  4. Headers: маршрутизация на основе заголовков сообщения.

Пример настройки exchange и привязки к очереди:

use lapin::options::{ExchangeDeclareOptions, QueueBindOptions};

channel
    .exchange_declare(
        "my_exchange",
        lapin::ExchangeKind::Direct, // Тип exchange
        ExchangeDeclareOptions {
            durable: true,
            ..Default::default()
        },
        FieldTable::default(),
    )
    .await
    .expect("Ошибка создания exchange");

channel
    .queue_bind(
        "task_queue",
        "my_exchange",
        "routing_key", // Ключ маршрутизации
        QueueBindOptions::default(),
        FieldTable::default(),
    )
    .await
    .expect("Ошибка привязки очереди");

TLS:

Для продакшен‑окружения требуется защищённое соединение. Настроим соединение через rustls:

use lapin::{Connection, ConnectionProperties};
use lapin::tcp::{OwnedTLSConfig, OwnedTLSStream};

let addr = "amqps://user:password@rabbitmq.example.com:5671/";
let tls_config = OwnedTLSConfig::default();

let connection = Connection::connect(
    addr,
    ConnectionProperties::default().with_tls(tls_config),
)
.await
.expect("Ошибка подключения через TLS");

Обработка ошибок

Ошибки неизбежны.

  1. Используйте retry для повторного подключения или обработки сообщения.

  2. Обрабатывайте nack, если сообщение нельзя обработать.

Пример обработки ошибок:

while let Some(delivery) = consumer.next().await {
    match delivery {
        Ok(delivery) => {
            if let Err(err) = process_message(&delivery).await {
                error!("Ошибка обработки сообщения: {:?}", err);
                delivery
                    .nack(Default::default())
                    .await
                    .expect("Ошибка отправки nack");
            } else {
                delivery
                    .ack(Default::default())
                    .await
                    .expect("Ошибка отправки ack");
            }
        }
        Err(err) => {
            error!("Ошибка получения сообщения: {:?}", err);
        }
    }
}

Пример применения

Напишем приложение, которое:

  • Создаёт exchange и очередь.

  • Маршрутизирует сообщения.

  • Обрабатывает входящие сообщения с QoS и подтверждениями.

use lapin::{
    options::{BasicPublishOptions, QueueBindOptions, QueueDeclareOptions},
    types::FieldTable,
    BasicProperties, Connection, ConnectionProperties,
};
use tokio;
use tracing::info;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let addr = "amqp://user:password@localhost:5672/%2f";
    let connection = Connection::connect(&addr, ConnectionProperties::default())
        .await
        .expect("Ошибка подключения");

    let channel = connection.create_channel().await.expect("Ошибка создания канала");

    channel
        .exchange_declare(
            "logs",
            lapin::ExchangeKind::Fanout,
            Default::default(),
            FieldTable::default(),
        )
        .await
        .expect("Ошибка создания exchange");

    let queue = channel
        .queue_declare(
            "",
            QueueDeclareOptions {
                exclusive: true,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .expect("Ошибка объявления очереди");

    channel
        .queue_bind(
            &queue.name(),
            "logs",
            "",
            QueueBindOptions::default(),
            FieldTable::default(),
        )
        .await
        .expect("Ошибка привязки очереди");

    info!("Очередь привязана к exchange");

    tokio::spawn(async move {
        for i in 0..5 {
            let message = format!("Log message {}", i);
            channel
                .basic_publish(
                    "logs",
                    "",
                    BasicPublishOptions::default(),
                    message.as_bytes(),
                    BasicProperties::default(),
                )
                .await
                .expect("Ошибка отправки сообщения");
        }
    });

    info!("Сообщения отправлены");
}

Создаём exchange типа Fanout, объявляем временную очередь и связываем её с exchange. После этого отправляем несколько сообщений в exchange, которые автоматически маршрутизируются во все привязанные очереди.


Подробнее с библиотекой можно ознакомиться здесь.

Рекомендую обратить внимание на открытые уроки, которые в феврале проведут в Otus преподаватели-практики:

  • 11 февраля: «Разбираем анатомию парсера на Rust».
    Разберём устройство игрушечного парсера на Rust, его ключевые компоненты и архитектурные принципы, обеспечивающие надёжность и производительность кода. Записаться

  • 17 февраля: «Инцидент-менеджмент в SRE — как быстро найти, устранить и предотвратить сбои в системе».
    Практическое руководство по эффективному управлению аварийными ситуациями в рамках Site Reliability Engineering (SRE). Записаться

Комментарии (0)