Queue (очередь) — структура данных на диске или в оперативной памяти, которая хранит ссылки на сообщения и отдает их копии consumers (потребителям). Queue представляет собой Erlang-процесс с состоянием (где могут кэшироваться и сами сообщения). 1 тысяча очередей может занимать порядка 80Mb.
Binding (привязка) — правило, которое сообщает обменнику в какую из очередей должны попадать сообщения.
Оглавление
- RabbitMQ. Часть 1. Introduction. Erlang, AMQP и RPC
- RabbitMQ. Часть 2. Разбираемся с Exchanges
- RabbitMQ. Часть 3. Разбираемся с Queues и Bindings
- RabbitMQ. Часть 4. Разбираемся с тем что-такое сообщения и фреймы
- RabbitMQ. Часть 5. Производительность публикации и потребления сообщений
- RabbitMQ. Часть 6. Обзор модулей Federation и Shovel
- RabbitMQ. Часть 7. Подробно про Connection и Chanel
- RabbitMQ. Часть 8. RabbitMQ в .NET
- RabbitMQ. Часть 9. Мониторинг
Временные очереди
Если создание очереди происходит с установленным параметром autoDelete, то такая очередь обретает способность автоматически удалять себя. Такие очереди обычно создаются в момент подключения первого клиента и удаляются в момент, когда все клиенты отсоединились.
Если создание очереди происходит с установленным параметром exclusive, то такая очередь разрешает подключаться только одному потребителю и удаляется если закроется канал. До тех пор пока канал не закроется, клиент может отключаться/подключаться, но только в рамках того же самого соединения. Если параметр exclusive установлен, то параметр autoDelete не имеет никакого эффекта.
Особенности:
- при кратковременном разрыве связи мы будем терять сообщения, которые ещё не успели дойти до потребителя
- можно поймать феномен
binding churn. Феномен возникает, когда количество операций по созданию/удалению очередей и привязок достигает очень больших значений. В кластерном режиме такой поток операций будет расползаться по всем узлам и создаст большую нагрузку. Данный процесс можно оптимизировать за счет контроля количества подписок
Постоянные очереди
Если создание очереди происходит с установленным параметром durable, то такая очередь сохраняет свое состояние и восстанавливается после перезапуска сервера/брокера. Данная очередь будет существовать до тех пор пока не будет вызвана команду Queue.Delete.
Highly Available очереди
Очереди HA требуют кластерной среды RabbitMQ. В кластерном режиме вся информация об обменниках, очередях, привязках и потребителях будет скопирована на все узлы.
Когда сообщение публикуется в какую-то HA очередь, оно хранится на каждом узле, относящемуся к HA очереди. После того как сообщение потребляется на каком-то из узлов, все копии этого сообщения будут удалены на других узлах.
Очереди HA могут распространяться на все узлы в некотором кластере или только на индивидуальные.

Особенности:
- использование HA очередей приводит к наказаниям в производительности. При помещении сообщения в некую HA очередь или при потреблении сообщения из HA очереди RabbitMQ должен выполнять координацию по всем узлам (2-3 узла обычно достаточно)
Создание очереди
Создание очереди происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Declare, вызываемого с параметрами:
- название очереди
- другие параметры
Пример создания очереди при помощи RabbitMQ.Client:
// ...
channel.QueueDeclare(
queue: "my_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// ...queue— название очереди, которую мы хотим создать. Название должно быть уникальным и не может совпадать с системным именем очередиdurable— если true, то очередь будет сохранять свое состояние и восстанавливается после перезапуска сервера/брокераexclusive— если true, то очередь будет разрешать подключаться только одному потребителюautoDelete— если true, то очередь обретает способность автоматически удалять себяarguments— необязательные аргументы. Ниже разберем подробнее.
arguments
x-message-ttl(x-message-time-to-live) — позволяет установить время истечения срока жизни сообщения в миллисекундах. Если создание очереди происходит с установленным значением аргументаx-message-ttl, то такая очередь будет автоматически исключать сообщения, у которых истек срок действия. Установка значения аргументаx-message-ttlзадает максимальный возраст для всех сообщений в данной очереди. Создание такой очереди позволяет предотвратить получение устаревшей информации. Это можно использовать в системах реального времени. Если у очереди для которой задан обменник для отклоненных сообщений установить значение аргументаx-message-ttl, то отклоненные сообщения в данной очереди начнут обладать сроком жизни.x-expires— задает значение в миллисекундах по истечению которого происходит удаление очереди. Очередь может израсходовать срок своего действия только если она не имеет никаких подписчиков. Если к очереди подключены подписчики, она сможет автоматически удалиться только тогда, когда все подписчики вызовутBasic.Cancelили отсоединятся. Срок жизни очереди может завершиться только в том случае, если к ней не было запросаBasic.Get. Иначе текущее значение установки времени жизни обнуляется, и очередь больше не будет автоматически удаляться. Также нет гарантий того, насколько быстро происходит удаление очереди после истечения её срока жизни.x-max-length— задает максимальное число сообщений в очереди. Если число сообщений в очереди начинает превышать макимальное чило, то начинают удаляться самые старые

x-max-lenght-bytes— задает максимально допустимый суммарный размер полезной нагрузки сообщений в очереди. При превышении установленного значения (возникло переполнение очереди при очередной публикации сообщения) самые старые сообщения начнут удалятьсяx-overflow— данный аргумент используется для настройки поведения в результате переполнения очереди. Доступны два значения:drop-head(значение по умолчанию) иreject-publish. Если выбратьdrop-head, то самые старые сообщения будут удаляться. Если выбратьreject-publish, то прием сообщений будет приостановленx-dead-letter-exchange— задает exchange, в который направляются отвергнутые сообщения, которые не поставлены повторно в очередьx-dead-letter-routing-key— задает не обязательный ключ маршрутизации для отвергнутых сообщенийx-max-priority— разрешает сортировку по приоритетам в очереди с максимальным значением приоритета 255 (RabbitMQ версий 3.5.0 и выше). Число указывает максимальный приоритет, который будет поддерживать очередь. Если аргумент не установлен, очередь не будет поддерживать приоритет сообщенийx-queue-mode— позволяет перевести очередь в ленивый режим. В таком режиме как можно больше сообщений будет храниться на диске. Использование оперативной памяти будет минимально. В случае, если он не установлен, очередь будет хранить сообщения в памяти, чтобы доставлять сообщения максимально быстроx-queue-master-locator— если у нас кластер, то можно задать мастер очередьx-ha-policy— используется при создании HA очередей и определяет как сообщение будет распространяться по узлам. Если установлено значениеall, то сообщение будет сохраняться на всех узлах. Если установлено значениеnodes, то сообщение будет сохраняться на определенных узлах кластераx-ha-nodes— задает узлы, к которым будет относиться некая очередьHA

Если создание очереди возможно, то сервер отправит клиенту синхронный RPC ответ Queue.DeclareOk. Если создание очереди невозможно (произошел отказ по запросу Queue.Declare), то канал закроется сервером при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.
Повторный вызов Queue.Declare с аналогичными параметрами вернет полезную информацию об этой очереди. Например, общее число сообщений, находящихся в ожидании в данной очереди, и общее число подписанных на неё потребителей.
Вызов Queue.Declare под учетными данными пользователя, которому не назначены необходимые права закроет канал при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки 403 и ее описание.
После того, как очередь простаивает в течении >= 10 секунд, она впадает в спящий режим, вызывая GC в очереди, что приводит к значительному сокращению памяти, необходимой для этой очереди.
Создание Queue через графический интерфейс
Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и нажмем на Add a new queue. Заполняем свойства:

Как только мы введем все необходимые данные и нажмете на Add queues, очередь появится в общем списке.

Щелчок по имени очереди покажет ее детальную информацию. Здесь можно настроить привязку между обменом и очередью, посмотреть список consumers, опубликовать/получить сообщения, удалить очередь и посмотреть статистику.
Создание Binding
Создание привязки происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Bind, вызываемого с параметрами:
- название очереди
- название точки обмена
- другие параметры
Пример создания привязки при помощи RabbitMQ.Client:
//...
channel.QueueBind(
queue: queueName,
exchange: "my_exchange",
routingKey: "my_key",
arguments: null
);
//...queue— имя очередиexchange— имя обменникаroutingKey— ключ маршрутизацииarguments— необязательные аргументы

Если создание привязки возможно, то сервер отправит клиенту синхронный RPC ответ Queue.BindOk.
Создание Binding через графический интерфейс
Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и жмем на очередь my_queue. Заполняем поля раздела bindings:

Как только мы введем все необходимые данные и нажмем на Bind, привязка отобразится в общем списке:

Code
В данном разделе опишем очередь и привязку кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.
public interface IQueue
{
string Name { get; }
/// <summary>
/// Если установить true, то queue будет являться постоянным.
/// Она будет храниться на диске и сможет
/// пережить перезапуск сервера/брокера.
/// Если значение false, то queue является временной и будет удаляться,
/// когда сервер/брокер будет перезагружен
/// </summary>
bool IsDurable { get; }
/// <summary>
/// Если значение равно true, то
/// такая очередь будет разрешать подключаться
/// только одному consumer-у
/// </summary>
bool IsExclusive { get; }
/// <summary>
/// Автоматическое удаление.
/// Очередь будет удалена, когда все клиенты отсоединятся.
/// </summary>
bool IsAutoDelete { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}public class Queue : IQueue
{
public Queue(
string name,
bool isDurable = true,
bool isExclusive = false,
bool isAutoDelete = false,
IDictionary<string, object> arguments = null)
{
Name = name ??
throw new ArgumentNullException(name, $"{name} must not be null");
IsDurable = isDurable;
IsExclusive = isExclusive;
IsAutoDelete = isAutoDelete;
Arguments = arguments ?? new Dictionary<string, object>();
}
public string Name { get; }
public bool IsDurable { get; }
public bool IsExclusive { get; }
public bool IsAutoDelete { get; }
public IDictionary<string, object> Arguments { get; }
}public static class QueueMode
{
public const string Default = "default";
/// <summary>
/// Ленивый режим. Ленивый режим заставит сохранять
/// как можно больше сообщений на диске, чтобы уменьшить
/// использование оперативной памяти
/// </summary>
public const string Lazy = "lazy";
}public interface IBinding
{
/// <summary>
/// Обменник, который будет связываться привязкой
/// </summary>
IExchange Exchange { get; }
/// <summary>
/// Ключ маршрутизации
/// </summary>
string RoutingKey { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}public class Binding : IBinding
{
public Binding(
IExchange exchange,
string routingKey,
IDictionary<string, object> arguments)
{
Exchange = exchange;
RoutingKey = routingKey;
Arguments = arguments;
}
public IExchange Exchange { get; }
public string RoutingKey { get; }
public IDictionary<string, object> Arguments { get; }
}
1210mk2
Это, пожалуй, самое полное описание rabbitmq на русском, что я встречал. Такой опыт можно получить только прочитав полный туториал с английском (это не 6ти страничный, есть другой) и протестировав поведение лично.
AgentFire
и при этом равно 0 описания того, зачем это всё нужно. вроде как статья для непрошаренных, но как я был непрошаренным, так и остался, только на красивые слова посмотрел.