Брокер сообщений RabbitMQ уже давно активно используется в микросервисах. Он используется, когда требуется асинхронная обработка сообщений от клиентов или при аналогичном межсервисном взаимодействии.
Практически нет языка, под который не была бы сделана соответствующая клиентская библиотека. Для Go такой библиотекой является github.com/streadway/amqp (далее по тексту библиотека amqp). Она имеет широкий функционал, можно подключаться к RabbitMQ, создавать каналы, настраивать очереди и exchange. Не хватает только самой малости – реконнектов. А именно автоматических реконнектов при потери связи.
Поиск в Google показывает, что есть много различных решений на базе библиотеки amqp. На проекте, где я работаю, мы создали ещё парочку. Но ни найденные в сети, ни уже созданные не устраивали по ряду причин:
раздельное обслуживание консьюмера и продюсера – под каждого свой коннект, а документация на RabbitMQ настойчиво не рекомендует плодить подключения и вместо этого использовать каналы (каналы в amqp это легковесные соединения поверх TCP-подключения) поверх одного подключения;
сложные конструкции пула каналов, а то и вовсе их отсутствие – с точки зрения потокобезопасности, как минимум для консьюмера и продюсера, нужно разделять каналы;
отсутствие поддержки backoffPolicy;
отсутствие graceful shutdown.
Сформулируем требования к желаемому решению:
возможность создать общее подключение для консьюмера и продюсера;
простой и прозрачный пул каналов;
поддержка backoffPolicy;
автоматический реконнект при потере соединения;
поддержка graceful shutdown.
Требования появились, можно приступать к реализации. Сразу оговорюсь, статья ориентирована на тех, кто уже хорошо знаком с библиотекой amqp и не ставит своей целью перевести документацию.
"Новый велосипед с треугольными колёсами"
Первый пункт из озвученных потребностей самый простой, просто создаём одно подключение и используем его для всех последующих манипуляций.
С пулом каналов тоже было решено пойти по простому пути и создать map
с ключом в виде следующего объекта:
type ChannelPoolItemKey struct {
Queue string
Consumer string
Exchange string
Key string
}
Такой ключ можно использовать сразу и для консьюмеров и паблишеров. Как было сказано выше, каналы между ними не должны пересекаться для повышения потокобезопасности.
Реализовать backoffPolicy тоже не сложно:
for _, timeout := range c.backoffPolicy {
if connErr := c.connect(ctx); connErr != nil {
logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
time.Sleep(timeout)
continue
}
break
}
где backoffPolicy это массив типа time.Duration
.
Остаётся самое интересное, реконнект и graceful shutdown. Здесь нам поможет пакет golang.org/x/sync/errgroup. Он специально предназначен для управления группами рутин.
При подключении создаётся TCP-подключение и служебный канал. Последний нужен для создания exchange, очередей и биндинга очереди к exchange. Больше он ни для чего не используется, но такая логика упрощает построение пула каналов. Например, при создании exchange не известен ключ роутинга, а при декларировании очереди не известно, с каким exchange она будет связана.
Публичный метод Connect
будет по совместительству контролировать подключение. А приватный метод connect
будет создавать само подключение и пул каналов. Ниже приведён код подключения.
func (c *Connection) connect(_ context.Context) error {
var err error
if c.conn, err = amqp.Dial(c.dsn); err != nil {
return errors.Wrap(err, "connect to rabbitMQ")
}
if c.serviceChannel, err = c.conn.Channel(); err != nil {
return errors.Wrap(err, "create service rabbitMQ channel")
}
c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)
return nil
}
// Connect auto reconnect to rabbitmq when we lost connection.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
if !c.isClosed {
if err := c.connect(ctx); err != nil {
return errors.Wrap(err, "connect")
}
}
c.errorGroup = errorGroup
c.chanCtx = ctx
c.errorGroup.Go(func() error {
logger := zerolog.Ctx(ctx)
logger.Info().Msg("starting connection watcher")
for {
select {
case <-ctx.Done():
logger.Info().Msg("connection watcher stopped")
return ctx.Err()
default:
reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
if !ok {
if c.isClosed {
return nil
}
logger.Err(reason).Msg("rabbitMQ connection unexpected closed")
c.mu.Lock()
for _, timeout := range c.backoffPolicy {
if connErr := c.connect(ctx); connErr != nil {
logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
time.Sleep(timeout)
continue
}
break
}
c.mu.Unlock()
}
}
}
})
return nil
}
Как видно, основная идея была связана с мьютексом mu
, который будет блокировать возможность получить доступ к оригинальному коннекту (из библиотеки amqp). Т.е. если происходит какая-то ошибка, консьюмер и продюсер должны попытаться переподключиться, они наткнуться на блокировку и будут ждать восстановления подключения. Как только блокировка будет снята, они смогут заново полностью инициализироваться.
Не забываем, что на стороне сервера может закрыться не только подключение, но и каналы. Для этого по аналогии с подключением используется метод NotifyClose, который регистрирует слушателя для событий о закрытии канала или подключения. Если канал закрывается, то он удаляется из пула и соотвественно ошибка, которая долетит до продюсера/консьюмера вызовет повторное создание канала.
func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
c.channelPoolMu.Lock()
defer c.channelPoolMu.Unlock()
var err error
poolKey := ChannelPoolItemKey{
Exchange: exchange,
Key: key,
Queue: queue,
Consumer: consumer,
}
ch, ok := c.channelPool[poolKey]
if !ok {
ch, err = c.conn.Channel()
if err != nil {
return nil, errors.Wrap(err, "create channel")
}
c.channelPool[poolKey] = ch
c.chanWatcher(poolKey)
}
return ch, nil
}
func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
ch := c.channelPool[poolKey]
c.errorGroup.Go(func() error {
logger := zerolog.Ctx(c.chanCtx)
logger.Info().Msg("starting channel watcher")
for {
select {
case <-c.chanCtx.Done():
logger.Info().Msg("channel watcher stopped")
return c.chanCtx.Err()
default:
reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
if !ok {
if c.isClosed {
return nil
}
logger.Err(reason).Msg("rabbitMQ channel unexpected closed")
c.channelPoolMu.Lock()
delete(c.channelPool, poolKey)
c.channelPoolMu.Unlock()
return nil
}
}
}
})
}
После создания подключения переходим к его закрытию и отображению состояния:
func (c *Connection) Close(_ context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
c.isClosed = true
for _, ch := range c.channelPool {
if err := ch.Close(); err != nil {
return errors.Wrap(err, "close rabbitMQ channel")
}
}
if err := c.conn.Close(); err != nil {
return errors.Wrap(err, "close rabbitMQ connection")
}
return nil
}
func (c *Connection) IsClosed() bool {
return c.isClosed
}
Сам Connection
, который реализует всё вышеописанное, представлен ниже.
type Connection struct {
dsn string
backoffPolicy []time.Duration
conn *amqp.Connection
serviceChannel *amqp.Channel
mu sync.RWMutex
channelPool map[ChannelPoolItemKey]*amqp.Channel
channelPoolMu sync.RWMutex
isClosed bool
errorGroup *errgroup.Group
chanCtx context.Context
}
Конечно не хорошо передавать контекст в структуру. Но это было сделано сознательно, чтобы обёртки над стандартными методами библиотеки amqp были взаимозаменяемы с ними.
Ниже код обёрток над стандартными методами библиотеки amqp:
func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}
func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}
func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}
func (c *Connection) Consume(
queue, consumer string,
autoAck, exclusive, noLocal, noWait bool,
args amqp.Table) (<-chan amqp.Delivery, error) {
c.mu.RLock()
defer c.mu.RUnlock()
ch, err := c.GetChannelFromPool("", "", queue, consumer)
if err != nil {
return nil, errors.Wrap(err, "get channel from pool")
}
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}
// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
c.mu.RLock()
defer c.mu.RUnlock()
ch, err := c.GetChannelFromPool(exchange, key, "", "")
if err != nil {
return errors.Wrap(err, "get channel from pool")
}
return ch.Publish(exchange, key, mandatory, immediate, msg)
}
Consumer
В конструктор консьюмера передаётся созданное подключение, а далее запускается подписка на события из очереди. Подписка запускается в отдельной рутине, если происходит ошибка, текущая рутина закрывается и создаётся новая.
func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
logger := zerolog.Ctx(ctx)
var msg <-chan amqp.Delivery
var err error
for {
if msg, err = c.connect(ctx); err != nil {
logger.Err(err).Msg("connect consumer to rabbitMQ")
time.Sleep(10 * time.Second)
continue
}
break
}
logger.Info().Msg("consumer connected")
for {
select {
case <-ctx.Done():
logger.Info().Msg("connection watcher stopped")
if err := subscriber.Shutdown(ctx); err != nil {
logger.Err(err).Msg("shutdown handler")
}
return ctx.Err()
case d, ok := <-msg:
if ok {
logger.Debug().Msgf("got new event %+v", string(d.Body))
if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
logger.Err(errConsume).Msg("consume message")
}
if err := d.Ack(true); err != nil {
logger.Err(err).Msg("ack")
}
} else {
if c.conn.IsClosed() {
return nil
}
logger.Info().Msg("try to reconnect consumer")
errorGroup.Go(func() error {
return c.subscribe(ctx, errorGroup, subscriber)
})
return nil
}
}
}
}
// Subscribe to channel for receiving message
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
errorGroup.Go(func() error {
return c.subscribe(ctx, errorGroup, subscriber)
})
return nil
}
Обработка полученного сообщения выполняется в методе Consume
переданного в консьюмер подписчика, реализующего интерфейс Subscriber
.
type Subscriber interface {
Consume(ctx context.Context, data []byte) error
Shutdown(ctx context.Context) error
}
В этом интерфейсе также есть метод Shutdown
для действий при штатной остановки консьюмера.
В приватном методе connect
выполняется создание exchange, очереди, биндиг очереди к exchange и создание канала на прослушивание событий.
func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
false, false,
false, nil); err != nil {
return nil, errors.Wrap(err, "declare a exchange")
}
if _, err := c.conn.QueueDeclare(
c.config.RabbitQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
); err != nil {
return nil, errors.Wrap(err, "declare a queue")
}
if err := c.conn.QueueBind(
c.config.RabbitQueue, // queue name
c.config.RoutingKey, // routing key
c.config.ExchangeName, // exchange
false,
nil,
); err != nil {
return nil, errors.Wrap(err, "bind to queue")
}
msg, err := c.conn.Consume(
c.config.RabbitQueue, // queue
c.config.RabbitConsume, // consume
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, errors.Wrap(err, "consume message")
}
return msg, nil
}
Publisher
Также как и при создании консьюмера в конструктор паблишера передаётся созданное подключение. При первой попытке опубликовать создаётся exchange для публикаций. Если при публикации возникает ошибка, то пытаемся ещё раз. Если вторая попытка не удалась, то тогда возвращаем ошибку вызвавшему методу.
func (p *Publisher) connect(_ context.Context) error {
p.muConn.Lock()
defer p.muConn.Unlock()
if p.isConnected {
return nil
}
if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
false, false,
false, nil); err != nil {
return errors.Wrap(err, "declare a exchange")
}
p.isConnected = true
return nil
}
// SendMessage publish message to exchange
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
logger := zerolog.Ctx(ctx)
body, err := json.Marshal(message)
if err != nil {
return errors.Wrap(err, "marshal message")
}
ampqMsg := buildMessage(body)
logger.Debug().Msgf("send message: %s", string(body))
if !p.isConnected {
if err := p.connect(ctx); err != nil {
logger.Err(err).Msg("connect publisher to rabbitMQ")
}
}
// We try to send message twice. Between attempts we try to reconnect.
if err := p.sendMessage(ctx, ampqMsg); err != nil {
if errRetryPub := p.sendMessage(ctx, ampqMsg); err != nil {
if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
return errors.Wrap(errBadMsg, "count bad messages")
}
return errors.Wrap(errRetryPub, "retry publish a message")
}
}
if err := p.okMessages(ctx); err != nil {
return errors.Wrap(err, "count ok messages")
}
return nil
}
func (p *Publisher) sendMessage(ctx context.Context, ampqMsg *amqp.Publishing) error {
logger := zerolog.Ctx(ctx)
if !p.isConnected {
if err := p.connect(ctx); err != nil {
logger.Err(err).Msg("connect publisher to rabbitMQ")
}
}
if err := p.conn.Publish(
p.config.ExchangeName,
p.config.RoutingKey,
false,
false,
*ampqMsg,
); err != nil {
p.muConn.Lock()
p.isConnected = false
p.muConn.Unlock()
return errors.Wrap(err, "publish a message")
}
return nil
}
Методы badMessages
и okMessages
используются для подсчёта статистки успеха отправки сообщений. buildMessage
небольшой хелпер для подготовки сообщения для отправки.
Заключение
Написанный код ещё плохо покрыт тестами. Планируется использовать тесты с использованием докера, чтобы проверять функционал на реальном RabbitMQ. Но есть тестовый микросервис, который использует данный функционал. При его запуске можно отправить в очередь консьюмера событие, которое будет обработано сервисом и приведёт к отправке сообщения паблишером. При перезапуске RabbitMQ микросервис автоматически переподключается. Остановка тестового микросервиса также выполняется штатно.
gBear
Вот всегда мне была интересна эта наша, скажем там, чисто-ITшная "особенность"... Ну есть же "спека", ну возьми и сделай все "по черчежу". Но - нет... "треугольные колеса" :-)
Хм... а что этот "пул" должен решать? Каналы же (если речь о каналах amqp) не шарятся. Вы не можете, грубо говоря, сделать consume в одном канале, а cancel сделать в другом. Какой смысл в этом "пуле"?
Ну и с точки зрения прикладной логики, кланал - это *сессия*. Она начинается с открытия канала, и заканачивается его закрытием. Мигрировать на другой канал она ("просто так") не может.
То, что каналы для консумеров и паблишеров "не должны пересекаться" - это, мягко говоря, бред. Каким образом - при таком "непересечении" - вы собераетесь обеспечивать, например, транзакционность?!
За такое, в приличном обществе, я извиняюсь, "бьют канделябрами". Каким образом, при таком подходе, вы собираетесь обеспечивать, например, экслюзивность деклараций?
Дальше просто уже не смог читать :-(
Ребят... ну реально, ну вы сначала хотяб разберитесь с "как оно работает". Ну "спеки" все открытые - это не какой-то там секрет. И "спеки" нулевой серии amqp - это именно, что *спецификации* - в инженерном смысле... а не в нашем... ITшном, так сказать. Там же даже придумывать ничего не надо. Все описано/расписано - бери и делай.
ssa-company Автор
Смысл, чтобы отслеживать созданные каналы и если попросят тот же самый, например, для паблишера в тот же самый exchange, в который уже отправляли сообщение, не создавать канал заново, а переиспользовать тот же самый. Так же он позволяет пройтись по всем каналам при закрытии соединения с RabbitMQ и аккуратно закрыть сначала каналы, а потом и подключение.
Простите, но причём тут транзакционность? Отправка и получение сообщение это одна операция, а не десять последовательных шагов. И без разницы, на каком канале я это делаю.
Да, консумер и паблишер могут прекрасно работать на одном канале. Ничего не сломается, наверное. Но лучше же их наверно разнести?
Вы точно знакомы с RabbitMQ? Ниже кусок из документации на github.com/streadway/amqp
gBear
Сдается мне, вы совсем не понимаете, что такое каналы в amqp :-) При чем тут, вообще exchange?! Еще раз... что со стороны сервера, что со стороны клиента - канал - это сессия. Более того... до 0.10 для сервера, конкретный канал - это не просто сессия... это ещё и "привязка" конкретного экземпляра клиента... со всеми его qos, таймингами, фреймингами и т.п.
А если же вы "шарите" канал по прикладной логике (что само по себе не сильно хорошо... ибо не понятно, как recovery делать, например... но не суть) - то, имхо, сильно проще делать это явно... не привязываясь к каким-то exchange... а то можно ненароком подумать, что паблишер не может в разные exchange сообщения посылать. Или, что ему для этого разные каналы нужны :-)
?! Оно, насколько я помню, всю жизнь работало так: посылаешь серверу connection.close, он тебе в ответ засылает пачку channel.close, ты их обрабатываешь, в конце концов получая от сервера connection.close-ok... после чего закрываешь сокет.
В любом случае, хранить каналы в отрыве от родительского подключения - это такое. Ну не умеет "девятка" в нормальный детач. "Десятка" - умеет. Но "кролики" - не умеют в "десятку" :-)
Если прием "запроса" (грубо, basic.ack, как ответ на basic.delivery), и отправка ответа (грубо, basic.publish) происходят в разных каналах, то вы не сможете связать эти две команды через tx.commit. Они обе в одном канале с tx.commit должны быть, который должен быть в том же канале, из которого получен basic.delivery. А это - в свою очередь - будет тот канал в котором был basic.consume.
Так понятнее?
Вот как раз с точностью до наоброт. Если этого одна "логика" - например, какой-нибудь сервис типа запрос/ответ - то прием запроса, и отправка ответа могут конечно "жить" в разных каналах. Но это, скажем так, сильно сужает использование возможностей amqp.
А причем тут exchange?! У него просто нет экслюзивности. В отличие, например, от queue... а их, как я понял, вы тож через свой "служебный канал" объявляете. Т.е. - если на пальцах - очереди, внезапно, могут быть и "приватными"... консумить которые можно только через тот же канал, через который они были объявлены. И это не просто так придумано :-)
ssa-company Автор
Мне кажется, проблема в том, что я явно не подсветил в начале статьи - я не делаю ещё одну реализацию протокола amqp. Я использую готовую библиотеку, где всё уже сделано за меня. Но там нет автореконнекта.
Да, канал это сессия. Под exchange в моём ответе имелось ввиду его имя, чтобы по имени в пуле на один и тот же exchange вернуть один и тот же канал. Т.е. какдый раз, когда будут отправлять сообщение в один и тот же exchange, будет использоваться одна и таже сессия. Причём у меня не только коннект описан, но и паблишер, который получит строго один раз сессию и будет её использовать до получения ошибки или до его удаления. При получении ошибки канал/сессия будут пересозданы и опять будут единственные для этого паблишера.
Также и с косьюмером, он строго один раз получает сессию по имени очереди и не меняет её до ошибки или удаления консьюмера.
Я никак не подменяю логику работы с каналами в github.com/streadway/amqp. Сделана просто обёртка над ней для реконнекта. И пока косьюмер/паблишер жив, он использует один и тот же канал, который он получит при запуске.
Как это возможно в моей реализации? Здесь указывается конкретный exchange для паблишера при его создании.
Повторю, я не выдумываю свою реализацию взаимодействия по протоколу amqp, используется готовая библиотека github.com/streadway/amqp, я сделал только реконнект поверх неё.
Вы расписываете про то, как на уровне протокола работает, я это знаю. Но я не пишу свою реализацию, зачем, если есть хорошая библиотека.
Закрытие каналов в github.com/streadway/amqp это не только отправить сообщения по протоколу, это ещё и грохнуть созданные структуры. И для прозрачности сделано по примеру github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go, создали подключение, потом канал, при завершении выполняем в обратном порядке.
Поясните, в чём сужение? Напомню, я не делаю свою реализацию протокола.
Exclusive (used by only one connection and the queue will be deleted when that connection closes)
An exclusive queue can only be used (consumed from, purged, deleted, etc) by its declaring connection
Из документации на RabbitMQ, где здесь речь про канал? А соединение у меня одно.
mayorovp
Ну вот в том и сужение возможностей, что протокол позволяет слать сообщения в разные exchange в рамках одной сессии, а вы не позволяете.
ssa-company Автор
Ну так у меня сам паблишер ограничен только конкретным exchange, ему он передаётся при создании. Был бы в моей реализации паблишер, который шлёт в разные exchange, тогда да, это было серьёзное сужение, т.к. в текущей реализации я бы городил отдельные сессии. Я подумаю в эту сторону.
Сейчас можно выдернуть сессию через GetChannelFromPool и послать через неё в другой Exchange.