Настоятельно рекомендую ознакомиться перед прочтением: https://ru.wikipedia.org/wiki/AMQP
Если все ещё не установили RabbitMQ, то вот установка через Docker для Linux:
$ docker pull rabbitmq
$ docker run --restart always -d --network host rabbitmq
RabbitMQ это брокер сообщений. Он отвечает за получение и отправку сообщений. Вы можете думать об этом как об обычной почтовой системе. Когда вы кладете ваше письмо в почтовый ящик, то можете быть уверены, что позже курьер заберет его и в конечном счете доставит получателю. Если следовать аналогии, то RabbitMQ это почтовый ящик, почтовое отделение и почтальон в одном лице.
Ключевое отличие между RabbitMQ и почтой в реальной жизни – первый не работает с реальной бумагой, вместо этого он получает, хранит и передает бинарные блоки данных – messages.
1) Producing – означает не что иное, как производство/отправку. Тот, кто отправляет сообщения, называют producer.
2) Queue – означает то самое почтовое отделение внутри RabbitMQ. Хоть сообщения протекают через RabbitMQ в ваши приложения, они могут оставаться внутри брокера. Queue ограничена только вашими дисками и оперативной памятью, по сути это просто огромный буфер. Множество producers могут отсылать messages в одну и ту же queue, также, как и множество consumers могут вычитывать messages из одной queue.
3) Consuming – означает потребление/получение. Это просто программный код, который ожидает messages. Потребитель/получатель называется Consumer.
Стоит обратить внимание, что producer, consumer и queue могут располагаться не на одном компьютере. Это совсем не обязательно. В большинстве систем приложения распределены по нескольким серверам, виртуальным машинам и т.д. Эти приложения могут быть как producer, так и consumer.
"Hello World"
(Используем Go RabbitMQ клиент)
В этой части мы напишем две маленькие программы на языке программирования Go. Producer будет отсылать одиночные message, а consumer будет принимать их и печатать в командной строке. Многие детали Go RabbitMQ API мы специально опустим, сосредоточившись только на базовых вещах.
На рисунке ниже P – producer, C – consumer. Красный прямоугольник означает queue (буфер, где RabbitMQ хранит данные).
RabbitMQ поддерживает несколько протоколов. В этом руководстве мы будем рассматривать AMQP 0-9-1 – открытый протокол общего назначения для обмена сообщениями. RabbitMQ поддерживает еще много клиентов для других языков.
Сначала нужно установить пакет, через go get:
go get github.com/rabbitmq/amqp091-go
Когда все установилось, мы можем начать написание кода.
Отправка
Первым делом создадим подключение к серверу RabbitMQ.
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go" // Делаем удобное имя для импорта в нашем коде
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // Создаем подключение к RabbitMQ
if err != nil {
log.Fatalf("unable to open connect to RabbitMQ server. Error: %s", err)
}
defer func() {
_ = conn.Close() // Закрываем подключение в случае удачной попытки
}()
}
Соединение является абстракцией над socket и служит для согласования версии протокола между сервером и клиентом, отвечает за аутентификацию и другие важные вещи. Затем создадим канал, в котором находится большая часть логики для выполнения задач.
ch, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open channel. Error: %s", err)
}
defer func() {
_ = ch.Close() // Закрываем канал в случае удачной попытки открытия
}()
Далее нужно объявить queue для публикации сообщений.
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("failed to declare a queue. Error: %s", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("failed to publish a message. Error: %s", err)
}
log.Printf(" [x] Sent %s\n", body)
Объявление queue произойдет только в том случае, если она еще не создана. Messages представляют из себя массив байтов. Это позволяет помещать в очередь все, что можно превратить в данный формат.
Если вы работаете с RabbitMQ в первый раз и не получаете сообщение об успешной отправке, то придется посмотреть, что не так с вашей инсталляцией брокера. Возможно не хватает свободного места на диске (RabbitMQ требует не менее 200Мб свободного места на диске). Первым делом стоит проверить журнал и убедиться, что это действительно так. Документация по конфигурации RabbitMQ поможем вам найти, как изменить disk_free_limit.
Получение
В отличии от producer, наш consumer должен постоянно работать, чтобы не пропускать сообщения.
У consumer аналогичные настройки. Сначала мы открываем коннект, затем канал и объявляем queue из которой будет читать messages. Если такой queue не существует, то она будет создана.
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("unable to open connect to RabbitMQ server. Error: %s", err)
}
defer func() {
_ = conn.Close() // Закрываем подключение в случае удачной попытки подключения
}()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel. Error: %s", err)
}
defer func() {
_ = ch.Close() // Закрываем подключение в случае удачной попытки подключения
}()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("failed to declare a queue. Error: %s", err)
}
}
Обратите внимание, что QueueDeclare присутствует и в этом коде. Поскольку мы можем запустить consumer раньше, чем producer, то нам следует проверить, что queue с таким именем существует, прежде чем осуществлять операцию чтения (как указано выше, если такой queue не существовало, то она будет создана).
Далее мы говорим серверу RabbitMQ, что собираемся читать данные из queue с именем, которое мы передали в QueueDeclare. Поскольку сообщения будут доставляться асинхронно, то операцию чтения мы запустим в отдельной горутине.
messages, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("failed to register a consumer. Error: %s", err)
}
var forever chan struct{}
go func() {
for message := range messages {
log.Printf("received a message: %s", message.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
Пожалуйста, имейте в виду, что это всего лишь демонстрация базовых концепций RabbitMQ. Тут не рассматриваются многие важные темы, а примеры специально являются упрощениями для лучшего понимания. Обязательно нужно изучить такие темы, как управление соединением, обработка ошибок, восстановление соединения, параллелизм и сбор метрик, в основном опущены для краткости. Такой упрощенный код нельзя считать готовым к эксплуатации.
Рекомендуемые темы для прочтения: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring.
kosbic
Статья полезная, как для новичка, но вот этого не хватает. Было бы неплохое, если будут затронуты эти темы в будущих статьях.
kenshin_himura Автор
Я лишь делаю перевод документации, если руки дойдут перевести эти темы на русский, то обязательно приложу ссылки.