Привет, Хабр!
Сегодня рассмотрим библиотеку Lapin в Rust. Lapin — это библиотека, реализующая протокол AMQP 0.9.1, она помогает взаимодействовать с RabbitMQ.
Многоканальная работа: один TCP‑соединение поддерживает множество каналов.
Поддержка подтверждений: безопасная обработка сообщений (ack/nack).
Интеграция с async: асинхронный API, который вписывается в экосистему Rust.
TLS: поддержка защищённых соединений через
native-tls
,openssl
илиrustls
.
Основные сущности Lapin
Connection
Соединение с RabbitMQ устанавливается один раз и может использоваться для работы с несколькими каналами. Это основа любого взаимодействия.
Пример:
use lapin::{Connection, ConnectionProperties};
let addr = "amqp://user:password@localhost:5672/%2f";
let connection = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("Ошибка подключения");
Поддерживаются как TCP, так и TLS‑соединения.
Channel
Каналы — это основа взаимодействия с RabbitMQ. Через них создаются очереди, подписки и отправляются сообщения.
Пример:
let channel = connection.create_channel()
.await
.expect("Ошибка создания канала");
RabbitMQ рекомендует использовать отдельные каналы для отправки и получения сообщений. В одном соединении можно создавать множество каналов.
Queue
Очереди — это хранилища сообщений. Они бывают:
Durable: сохраняются при перезапуске сервера.
Exclusive: доступны только для текущего соединения.
Auto‑delete: удаляются, когда больше не используются.
Пример:
use lapin::options::QueueDeclareOptions;
use lapin::types::FieldTable;
let queue = channel
.queue_declare(
"task_queue",
QueueDeclareOptions {
durable: true, // Устойчивая очередь
..Default::default()
},
FieldTable::default(),
)
.await
.expect("Ошибка объявления очереди");
Используйте FieldTable
, чтобы настроить TTL сообщений, ограничение размера очереди и т. п.
Exchange
Exchange — это маршрутизатор, который направляет сообщения в очереди на основе типов и правил маршрутизации:
Direct: сообщения отправляются в конкретную очередь по ключу маршрутизации.
Fanout: сообщение отправляется во все очереди, привязанные к exchange.
Topic: сложная маршрутизация по шаблонам.
Headers: маршрутизация на основе заголовков сообщения.
Пример настройки exchange и привязки к очереди:
use lapin::options::{ExchangeDeclareOptions, QueueBindOptions};
channel
.exchange_declare(
"my_exchange",
lapin::ExchangeKind::Direct, // Тип exchange
ExchangeDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await
.expect("Ошибка создания exchange");
channel
.queue_bind(
"task_queue",
"my_exchange",
"routing_key", // Ключ маршрутизации
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.expect("Ошибка привязки очереди");
TLS:
Для продакшен‑окружения требуется защищённое соединение. Настроим соединение через rustls
:
use lapin::{Connection, ConnectionProperties};
use lapin::tcp::{OwnedTLSConfig, OwnedTLSStream};
let addr = "amqps://user:password@rabbitmq.example.com:5671/";
let tls_config = OwnedTLSConfig::default();
let connection = Connection::connect(
addr,
ConnectionProperties::default().with_tls(tls_config),
)
.await
.expect("Ошибка подключения через TLS");
Обработка ошибок
Ошибки неизбежны.
Используйте
retry
для повторного подключения или обработки сообщения.Обрабатывайте
nack
, если сообщение нельзя обработать.
Пример обработки ошибок:
while let Some(delivery) = consumer.next().await {
match delivery {
Ok(delivery) => {
if let Err(err) = process_message(&delivery).await {
error!("Ошибка обработки сообщения: {:?}", err);
delivery
.nack(Default::default())
.await
.expect("Ошибка отправки nack");
} else {
delivery
.ack(Default::default())
.await
.expect("Ошибка отправки ack");
}
}
Err(err) => {
error!("Ошибка получения сообщения: {:?}", err);
}
}
}
Пример применения
Напишем приложение, которое:
Создаёт exchange и очередь.
Маршрутизирует сообщения.
Обрабатывает входящие сообщения с QoS и подтверждениями.
use lapin::{
options::{BasicPublishOptions, QueueBindOptions, QueueDeclareOptions},
types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use tokio;
use tracing::info;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let addr = "amqp://user:password@localhost:5672/%2f";
let connection = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("Ошибка подключения");
let channel = connection.create_channel().await.expect("Ошибка создания канала");
channel
.exchange_declare(
"logs",
lapin::ExchangeKind::Fanout,
Default::default(),
FieldTable::default(),
)
.await
.expect("Ошибка создания exchange");
let queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await
.expect("Ошибка объявления очереди");
channel
.queue_bind(
&queue.name(),
"logs",
"",
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.expect("Ошибка привязки очереди");
info!("Очередь привязана к exchange");
tokio::spawn(async move {
for i in 0..5 {
let message = format!("Log message {}", i);
channel
.basic_publish(
"logs",
"",
BasicPublishOptions::default(),
message.as_bytes(),
BasicProperties::default(),
)
.await
.expect("Ошибка отправки сообщения");
}
});
info!("Сообщения отправлены");
}
Создаём exchange типа Fanout
, объявляем временную очередь и связываем её с exchange. После этого отправляем несколько сообщений в exchange, которые автоматически маршрутизируются во все привязанные очереди.
Подробнее с библиотекой можно ознакомиться здесь.
Рекомендую обратить внимание на открытые уроки, которые в феврале проведут в Otus преподаватели-практики:
11 февраля: «Разбираем анатомию парсера на Rust».
Разберём устройство игрушечного парсера на Rust, его ключевые компоненты и архитектурные принципы, обеспечивающие надёжность и производительность кода. Записаться17 февраля: «Инцидент-менеджмент в SRE — как быстро найти, устранить и предотвратить сбои в системе».
Практическое руководство по эффективному управлению аварийными ситуациями в рамках Site Reliability Engineering (SRE). Записаться