Брокер сообщений RabbitMQ уже давно активно используется в микросервисах. Он используется, когда требуется асинхронная обработка сообщений от клиентов или при аналогичном межсервисном взаимодействии.

Практически нет языка, под который не была бы сделана соответствующая клиентская библиотека. Для Go такой библиотекой является github.com/streadway/amqp (далее по тексту библиотека amqp). Она имеет широкий функционал, можно подключаться к RabbitMQ, создавать каналы, настраивать очереди и exchange. Не хватает только самой малости – реконнектов. А именно автоматических реконнектов при потери связи.

Поиск в Google показывает, что есть много различных решений на базе библиотеки amqp. На проекте, где я работаю, мы создали ещё парочку. Но ни найденные в сети, ни уже созданные не устраивали по ряду причин:

  • раздельное обслуживание консьюмера и продюсера – под каждого свой коннект, а документация на RabbitMQ настойчиво не рекомендует плодить подключения и вместо этого использовать каналы (каналы в amqp это легковесные соединения поверх TCP-подключения) поверх одного подключения;

  • сложные конструкции пула каналов, а то и вовсе их отсутствие – с точки зрения потокобезопасности, как минимум для консьюмера и продюсера, нужно разделять каналы;

  • отсутствие поддержки backoffPolicy;

  • отсутствие graceful shutdown.

Сформулируем требования к желаемому решению:

  • возможность создать общее подключение для консьюмера и продюсера;

  • простой и прозрачный пул каналов;

  • поддержка backoffPolicy;

  • автоматический реконнект при потере соединения;

  • поддержка graceful shutdown.

Требования появились, можно приступать к реализации. Сразу оговорюсь, статья ориентирована на тех, кто уже хорошо знаком с библиотекой amqp и не ставит своей целью перевести документацию.

"Новый велосипед с треугольными колёсами"

Первый пункт из озвученных потребностей самый простой, просто создаём одно подключение и используем его для всех последующих манипуляций.

С пулом каналов тоже было решено пойти по простому пути и создать map с ключом в виде следующего объекта:

type ChannelPoolItemKey struct {
    Queue    string
    Consumer string
    Exchange string
    Key      string
}

Такой ключ можно использовать сразу и для консьюмеров и паблишеров. Как было сказано выше, каналы между ними не должны пересекаться для повышения потокобезопасности.

Реализовать backoffPolicy тоже не сложно:

for _, timeout := range c.backoffPolicy {
  if connErr := c.connect(ctx); connErr != nil {
    logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
    time.Sleep(timeout)
    continue
  }
  break
}

где backoffPolicy это массив типа time.Duration.

Остаётся самое интересное, реконнект и graceful shutdown. Здесь нам поможет пакет golang.org/x/sync/errgroup. Он специально предназначен для управления группами рутин.

При подключении создаётся TCP-подключение и служебный канал. Последний нужен для создания exchange, очередей и биндинга очереди к exchange. Больше он ни для чего не используется, но такая логика упрощает построение пула каналов. Например, при создании exchange не известен ключ роутинга, а при декларировании очереди не известно, с каким exchange она будет связана.

Публичный метод Connect будет по совместительству контролировать подключение. А приватный метод connect будет создавать само подключение и пул каналов. Ниже приведён код подключения.

func (c *Connection) connect(_ context.Context) error {
	var err error
	if c.conn, err = amqp.Dial(c.dsn); err != nil {
		return errors.Wrap(err, "connect to rabbitMQ")
	}

	if c.serviceChannel, err = c.conn.Channel(); err != nil {
		return errors.Wrap(err, "create service rabbitMQ channel")
	}

	c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)

	return nil
}

// Connect auto reconnect to rabbitmq when we lost connection.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
	if !c.isClosed {
		if err := c.connect(ctx); err != nil {
			return errors.Wrap(err, "connect")
		}
	}

	c.errorGroup = errorGroup
	c.chanCtx = ctx

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(ctx)
		logger.Info().Msg("starting connection watcher")

		for {
			select {
			case <-ctx.Done():
				logger.Info().Msg("connection watcher stopped")
				return ctx.Err()
			default:
				reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ connection unexpected closed")

					c.mu.Lock()
					for _, timeout := range c.backoffPolicy {
						if connErr := c.connect(ctx); connErr != nil {
							logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
							time.Sleep(timeout)
							continue
						}
						break
					}
					c.mu.Unlock()
				}
			}
		}
	})

	return nil
}

Как видно, основная идея была связана с мьютексом mu, который будет блокировать возможность получить доступ к оригинальному коннекту (из библиотеки amqp). Т.е. если происходит какая-то ошибка, консьюмер и продюсер должны попытаться переподключиться, они наткнуться на блокировку и будут ждать восстановления подключения. Как только блокировка будет снята, они смогут заново полностью инициализироваться.

Не забываем, что на стороне сервера может закрыться не только подключение, но и каналы. Для этого по аналогии с подключением используется метод NotifyClose, который регистрирует слушателя для событий о закрытии канала или подключения. Если канал закрывается, то он удаляется из пула и соотвественно ошибка, которая долетит до продюсера/консьюмера вызовет повторное создание канала.

func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
	c.channelPoolMu.Lock()
	defer c.channelPoolMu.Unlock()
	var err error
	poolKey := ChannelPoolItemKey{
		Exchange: exchange,
		Key:      key,
		Queue:    queue,
		Consumer: consumer,
	}
	ch, ok := c.channelPool[poolKey]
	if !ok {
		ch, err = c.conn.Channel()
		if err != nil {
			return nil, errors.Wrap(err, "create channel")
		}
		c.channelPool[poolKey] = ch
		c.chanWatcher(poolKey)
	}

	return ch, nil
}

func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
	ch := c.channelPool[poolKey]

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(c.chanCtx)
		logger.Info().Msg("starting channel watcher")

		for {
			select {
			case <-c.chanCtx.Done():
				logger.Info().Msg("channel watcher stopped")
				return c.chanCtx.Err()
			default:
				reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ channel unexpected closed")
					c.channelPoolMu.Lock()
					delete(c.channelPool, poolKey)
					c.channelPoolMu.Unlock()
					return nil
				}
			}
		}
	})
}

После создания подключения переходим к его закрытию и отображению состояния:

func (c *Connection) Close(_ context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.isClosed = true

	for _, ch := range c.channelPool {
		if err := ch.Close(); err != nil {
			return errors.Wrap(err, "close rabbitMQ channel")
		}
	}

	if err := c.conn.Close(); err != nil {
		return errors.Wrap(err, "close rabbitMQ connection")
	}

	return nil
}

func (c *Connection) IsClosed() bool {
	return c.isClosed
}

Сам Connection, который реализует всё вышеописанное, представлен ниже.

type Connection struct {
	dsn            string
	backoffPolicy  []time.Duration
	conn           *amqp.Connection
	serviceChannel *amqp.Channel
	mu             sync.RWMutex
	channelPool    map[ChannelPoolItemKey]*amqp.Channel
	channelPoolMu  sync.RWMutex
	isClosed       bool
	errorGroup     *errgroup.Group
	chanCtx        context.Context
}

Конечно не хорошо передавать контекст в структуру. Но это было сделано сознательно, чтобы обёртки над стандартными методами библиотеки amqp были взаимозаменяемы с ними.

Ниже код обёрток над стандартными методами библиотеки amqp:

func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}

func (c *Connection) Consume(
	queue, consumer string,
	autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table) (<-chan amqp.Delivery, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool("", "", queue, consumer)
	if err != nil {
		return nil, errors.Wrap(err, "get channel from pool")
	}

	return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool(exchange, key, "", "")
	if err != nil {
		return errors.Wrap(err, "get channel from pool")
	}

	return ch.Publish(exchange, key, mandatory, immediate, msg)
}

Consumer

В конструктор консьюмера передаётся созданное подключение, а далее запускается подписка на события из очереди. Подписка запускается в отдельной рутине, если происходит ошибка, текущая рутина закрывается и создаётся новая.

func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	logger := zerolog.Ctx(ctx)
	var msg <-chan amqp.Delivery
	var err error

	for {
		if msg, err = c.connect(ctx); err != nil {
			logger.Err(err).Msg("connect consumer to rabbitMQ")
			time.Sleep(10 * time.Second)
			continue
		}
		break
	}

	logger.Info().Msg("consumer connected")

	for {
		select {
		case <-ctx.Done():
			logger.Info().Msg("connection watcher stopped")
			if err := subscriber.Shutdown(ctx); err != nil {
				logger.Err(err).Msg("shutdown handler")
			}
			return ctx.Err()
		case d, ok := <-msg:
			if ok {
				logger.Debug().Msgf("got new event %+v", string(d.Body))
				if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
					logger.Err(errConsume).Msg("consume message")
				}
				if err := d.Ack(true); err != nil {
					logger.Err(err).Msg("ack")
				}
			} else {
				if c.conn.IsClosed() {
					return nil
				}

				logger.Info().Msg("try to reconnect consumer")
				errorGroup.Go(func() error {
					return c.subscribe(ctx, errorGroup, subscriber)
				})
				return nil
			}
		}
	}
}

// Subscribe to channel for receiving message
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	errorGroup.Go(func() error {
		return c.subscribe(ctx, errorGroup, subscriber)
	})

	return nil
}

Обработка полученного сообщения выполняется в методе Consume переданного в консьюмер подписчика, реализующего интерфейс Subscriber.

type Subscriber interface {
	Consume(ctx context.Context, data []byte) error
	Shutdown(ctx context.Context) error
}

В этом интерфейсе также есть метод Shutdown для действий при штатной остановки консьюмера.

В приватном методе connect выполняется создание exchange, очереди, биндиг очереди к exchange и создание канала на прослушивание событий.

func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
	if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return nil, errors.Wrap(err, "declare a exchange")
	}

	if _, err := c.conn.QueueDeclare(
		c.config.RabbitQueue, // name
		true,                 // durable
		false,                // delete when unused
		false,                // exclusive
		false,                // no-wait
		nil,                  // arguments
	); err != nil {
		return nil, errors.Wrap(err, "declare a queue")
	}

	if err := c.conn.QueueBind(
		c.config.RabbitQueue,  // queue name
		c.config.RoutingKey,   // routing key
		c.config.ExchangeName, // exchange
		false,
		nil,
	); err != nil {
		return nil, errors.Wrap(err, "bind to queue")
	}

	msg, err := c.conn.Consume(
		c.config.RabbitQueue,   // queue
		c.config.RabbitConsume, // consume
		false,                  // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // args
	)
	if err != nil {
		return nil, errors.Wrap(err, "consume message")
	}

	return msg, nil
}

Publisher

Также как и при создании консьюмера в конструктор паблишера передаётся созданное подключение. При первой попытке опубликовать создаётся exchange для публикаций. Если при публикации возникает ошибка, то пытаемся ещё раз. Если вторая попытка не удалась, то тогда возвращаем ошибку вызвавшему методу.

func (p *Publisher) connect(_ context.Context) error {
	p.muConn.Lock()
	defer p.muConn.Unlock()
	if p.isConnected {
		return nil
	}

	if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return errors.Wrap(err, "declare a exchange")
	}

	p.isConnected = true

	return nil
}

// SendMessage publish message to exchange
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
	logger := zerolog.Ctx(ctx)

	body, err := json.Marshal(message)
	if err != nil {
		return errors.Wrap(err, "marshal message")
	}

	ampqMsg := buildMessage(body)

	logger.Debug().Msgf("send message: %s", string(body))

	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	// We try to send message twice. Between attempts we try to reconnect.
	if err := p.sendMessage(ctx, ampqMsg); err != nil {
		if errRetryPub := p.sendMessage(ctx, ampqMsg); err != nil {
			if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
				return errors.Wrap(errBadMsg, "count bad messages")
			}
			return errors.Wrap(errRetryPub, "retry publish a message")
		}
	}

	if err := p.okMessages(ctx); err != nil {
		return errors.Wrap(err, "count ok messages")
	}

	return nil
}

func (p *Publisher) sendMessage(ctx context.Context, ampqMsg *amqp.Publishing) error {
	logger := zerolog.Ctx(ctx)
	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	if err := p.conn.Publish(
		p.config.ExchangeName,
		p.config.RoutingKey,
		false,
		false,
		*ampqMsg,
	); err != nil {
		p.muConn.Lock()
		p.isConnected = false
		p.muConn.Unlock()
		return errors.Wrap(err, "publish a message")
	}
	return nil
}

Методы badMessages и okMessages используются для подсчёта статистки успеха отправки сообщений. buildMessage небольшой хелпер для подготовки сообщения для отправки.

Заключение

Написанный код ещё плохо покрыт тестами. Планируется использовать тесты с использованием докера, чтобы проверять функционал на реальном RabbitMQ. Но есть тестовый микросервис, который использует данный функционал. При его запуске можно отправить в очередь консьюмера событие, которое будет обработано сервисом и приведёт к отправке сообщения паблишером. При перезапуске RabbitMQ микросервис автоматически переподключается. Остановка тестового микросервиса также выполняется штатно.

Ссылки

Комментарии (6)


  1. gBear
    05.01.2022 23:11
    +1

    Вот всегда мне была интересна эта наша, скажем там, чисто-ITшная "особенность"... Ну есть же "спека", ну возьми и сделай все "по черчежу". Но - нет... "треугольные колеса" :-)

    простой и прозрачный пул каналов;

    Хм... а что этот "пул" должен решать? Каналы же (если речь о каналах amqp) не шарятся. Вы не можете, грубо говоря, сделать consume в одном канале, а cancel сделать в другом. Какой смысл в этом "пуле"?

    Ну и с точки зрения прикладной логики, кланал - это *сессия*. Она начинается с открытия канала, и заканачивается его закрытием. Мигрировать на другой канал она ("просто так") не может.

    Такой ключ можно использовать сразу и для консумеров и паблишеров. Как было сказано выше, каналы между ними не должны пересекаться для повышения потокобезопасности.

    То, что каналы для консумеров и паблишеров "не должны пересекаться" - это, мягко говоря, бред. Каким образом - при таком "непересечении" - вы собераетесь обеспечивать, например, транзакционность?!

    При подключении создаётся TCP-подключение и служебный канал. Последний нужен для создания exchange, очередей и биндинга очереди к exchange. 

    За такое, в приличном обществе, я извиняюсь, "бьют канделябрами". Каким образом, при таком подходе, вы собираетесь обеспечивать, например, экслюзивность деклараций?

    Дальше просто уже не смог читать :-(

    Ребят... ну реально, ну вы сначала хотяб разберитесь с "как оно работает". Ну "спеки" все открытые - это не какой-то там секрет. И "спеки" нулевой серии amqp - это именно, что *спецификации* - в инженерном смысле... а не в нашем... ITшном, так сказать. Там же даже придумывать ничего не надо. Все описано/расписано - бери и делай.


    1. ssa-company Автор
      06.01.2022 00:17
      -1

      Хм... а что этот "пул" должен решать? Каналы же (если речь о каналах amqp) не шарятся. Вы не можете, грубо говоря, сделать consume в одном канале, а cancel сделать в другом. Какой смысл в этом "пуле"?

      Смысл, чтобы отслеживать созданные каналы и если попросят тот же самый, например, для паблишера в тот же самый exchange, в который уже отправляли сообщение, не создавать канал заново, а переиспользовать тот же самый. Так же он позволяет пройтись по всем каналам при закрытии соединения с RabbitMQ и аккуратно закрыть сначала каналы, а потом и подключение.

      То, что каналы для консумеров и паблишеров "не должны пересекаться" - это, мягко говоря, бред. Каким образом - при таком "непересечении" - вы собераетесь обеспечивать, например, транзакционность?!

      Простите, но причём тут транзакционность? Отправка и получение сообщение это одна операция, а не десять последовательных шагов. И без разницы, на каком канале я это делаю.

      Да, консумер и паблишер могут прекрасно работать на одном канале. Ничего не сломается, наверное. Но лучше же их наверно разнести?

      За такое, в приличном обществе, я извиняюсь, "бьют канделябрами". Каким образом, при таком подходе, вы собираетесь обеспечивать, например, экслюзивность деклараций?

      Вы точно знакомы с RabbitMQ? Ниже кусок из документации на github.com/streadway/amqp

      ExchangeDeclare declares an exchange on the server. If the exchange does notalready exist, the server will create it.  If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.


      1. gBear
        06.01.2022 02:08

        Смысл, чтобы отслеживать созданные каналы и если попросят тот же самый, например, для паблишера в тот же самый exchange, в который уже отправляли сообщение, не создавать канал заново, а переиспользовать тот же самый.

        Сдается мне, вы совсем не понимаете, что такое каналы в amqp :-) При чем тут, вообще exchange?! Еще раз... что со стороны сервера, что со стороны клиента - канал - это сессия. Более того... до 0.10 для сервера, конкретный канал - это не просто сессия... это ещё и "привязка" конкретного экземпляра клиента... со всеми его qos, таймингами, фреймингами и т.п.

        А если же вы "шарите" канал по прикладной логике (что само по себе не сильно хорошо... ибо не понятно, как recovery делать, например... но не суть) - то, имхо, сильно проще делать это явно... не привязываясь к каким-то exchange... а то можно ненароком подумать, что паблишер не может в разные exchange сообщения посылать. Или, что ему для этого разные каналы нужны :-)

        Так же он позволяет пройтись по всем каналам при закрытии соединения с RabbitMQ и аккуратно закрыть сначала каналы, а потом и подключение.

        ?! Оно, насколько я помню, всю жизнь работало так: посылаешь серверу connection.close, он тебе в ответ засылает пачку channel.close, ты их обрабатываешь, в конце концов получая от сервера connection.close-ok... после чего закрываешь сокет.

        В любом случае, хранить каналы в отрыве от родительского подключения - это такое. Ну не умеет "девятка" в нормальный детач. "Десятка" - умеет. Но "кролики" - не умеют в "десятку" :-)

        Простите, но причём тут транзакционность? Отправка и получение сообщение это одна операция, а не десять последовательных шагов. И без разницы, на каком канале я это делаю.

        Если прием "запроса" (грубо, basic.ack, как ответ на basic.delivery), и отправка ответа (грубо, basic.publish) происходят в разных каналах, то вы не сможете связать эти две команды через tx.commit. Они обе в одном канале с tx.commit должны быть, который должен быть в том же канале, из которого получен basic.delivery. А это - в свою очередь - будет тот канал в котором был basic.consume.

        Так понятнее?

        Да, консумер и паблишер могут прекрасно работать на одном канале. Ничего не сломается, наверное. Но лучше же их наверно разнести?

        Вот как раз с точностью до наоброт. Если этого одна "логика" - например, какой-нибудь сервис типа запрос/ответ - то прием запроса, и отправка ответа могут конечно "жить" в разных каналах. Но это, скажем так, сильно сужает использование возможностей amqp.

        Вы точно знакомы с RabbitMQ?

        А причем тут exchange?! У него просто нет экслюзивности. В отличие, например, от queue... а их, как я понял, вы тож через свой "служебный канал" объявляете. Т.е. - если на пальцах - очереди, внезапно, могут быть и "приватными"... консумить которые можно только через тот же канал, через который они были объявлены. И это не просто так придумано :-)


        1. ssa-company Автор
          06.01.2022 13:37
          -1

          Мне кажется, проблема в том, что я явно не подсветил в начале статьи - я не делаю ещё одну реализацию протокола amqp. Я использую готовую библиотеку, где всё уже сделано за меня. Но там нет автореконнекта.

          Сдается мне, вы совсем не понимаете, что такое каналы в amqp :-) При чем тут, вообще exchange?! Еще раз... что со стороны сервера, что со стороны клиента - канал - это сессия. Более того... до 0.10 для сервера, конкретный канал - это не просто сессия... это ещё и "привязка" конкретного экземпляра клиента... со всеми его qos, таймингами, фреймингами и т.п.

          Да, канал это сессия. Под exchange в моём ответе имелось ввиду его имя, чтобы по имени в пуле на один и тот же exchange вернуть один и тот же канал. Т.е. какдый раз, когда будут отправлять сообщение в один и тот же exchange, будет использоваться одна и таже сессия. Причём у меня не только коннект описан, но и паблишер, который получит строго один раз сессию и будет её использовать до получения ошибки или до его удаления. При получении ошибки канал/сессия будут пересозданы и опять будут единственные для этого паблишера.

          Также и с косьюмером, он строго один раз получает сессию по имени очереди и не меняет её до ошибки или удаления консьюмера.

          Я никак не подменяю логику работы с каналами в github.com/streadway/amqp. Сделана просто обёртка над ней для реконнекта. И пока косьюмер/паблишер жив, он использует один и тот же канал, который он получит при запуске.

          а то можно ненароком подумать, что паблишер не может в разные exchange сообщения посылать. Или, что ему для этого разные каналы нужны :-)

          Как это возможно в моей реализации? Здесь указывается конкретный exchange для паблишера при его создании.

          ?! Оно, насколько я помню, всю жизнь работало так: посылаешь серверу connection.close, он тебе в ответ засылает пачку channel.close, ты их обрабатываешь, в конце концов получая от сервера connection.close-ok... после чего закрываешь сокет.

          Повторю, я не выдумываю свою реализацию взаимодействия по протоколу amqp, используется готовая библиотека github.com/streadway/amqp, я сделал только реконнект поверх неё.

          Вы расписываете про то, как на уровне протокола работает, я это знаю. Но я не пишу свою реализацию, зачем, если есть хорошая библиотека.

          Закрытие каналов в github.com/streadway/amqp это не только отправить сообщения по протоколу, это ещё и грохнуть созданные структуры. И для прозрачности сделано по примеру github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go, создали подключение, потом канал, при завершении выполняем в обратном порядке.

          Вот как раз с точностью до наоброт. Если этого одна "логика" - например, какой-нибудь сервис типа запрос/ответ - то прием запроса, и отправка ответа могут конечно "жить" в разных каналах. Но это, скажем так, сильно сужает использование возможностей amqp.

          Поясните, в чём сужение? Напомню, я не делаю свою реализацию протокола.

          А причем тут exchange?! У него просто нет экслюзивности. В отличие, например, от queue... а их, как я понял, вы тож через свой "служебный канал" объявляете. Т.е. - если на пальцах - очереди, внезапно, могут быть и "приватными"... консумить которые можно только через тот же канал, через который они были объявлены. И это не просто так придумано :-)

          • Exclusive (used by only one connection and the queue will be deleted when that connection closes)

          • An exclusive queue can only be used (consumed from, purged, deleted, etc) by its declaring connection

          Из документации на RabbitMQ, где здесь речь про канал? А соединение у меня одно.


          1. mayorovp
            06.01.2022 13:44

            Как это возможно в моей реализации? Здесь указывается конкретный exchange для паблишера при его создании.

            Поясните, в чём сужение? Напомню, я не делаю свою реализацию протокола.

            Ну вот в том и сужение возможностей, что протокол позволяет слать сообщения в разные exchange в рамках одной сессии, а вы не позволяете.


            1. ssa-company Автор
              06.01.2022 15:15
              -1

              Ну так у меня сам паблишер ограничен только конкретным exchange, ему он передаётся при создании. Был бы в моей реализации паблишер, который шлёт в разные exchange, тогда да, это было серьёзное сужение, т.к. в текущей реализации я бы городил отдельные сессии. Я подумаю в эту сторону.

              Сейчас можно выдернуть сессию через GetChannelFromPool и послать через неё в другой Exchange.