Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool

Паттерны конкурентности в Go. Подробный разбор. Часть 2. Fan-Out/Fan-In

Pub/Sub

Pub/sub, сокращение от «publish-subscribe» (публикация-подписка), — это паттерн обмена сообщениями между различными частями приложения. Прелесть такого подхода в том, что отправитель сообщения не знает о получателе, а получатель не знает об отправителе. Оба они знают только об одном посреднике — брокере сообщений. Отправитель публикует сообщение, а получатель подписывается на получение сообщений. В итоге это кардинально уменьшает связанность между частями приложения и неминуемую головную боль ею вызванную.

Пример:
В приложении нужно по расписанию выполнять некие действия - например рекламные рассылки или какое-нибудь закрытие дня раз в сутки. Вы реализуете сервис плнировщика задач, при старте задаете ему расписание, по которому он публикует некие уведомления. А потом в любом месте приложения просто подписываетесь на выбранные уведомления и выполняете нужные действия при их получении. Это просто, это удобно, это гибко, это легко масштабировать. Это Pub/Sub.

В Go паттерн Pub/Sub можно реализовать несколькими способами. Один из самых простых и эффективных — использование каналов. Каналы в Go являются потокобезопасными и позволяют передавать данные между горутинами максимально простым способом. Собственно, известные брокеры сообщений (RabbitMQ, Kafka и т.д.) - это предельная реализация паттерна Pub/Sub для обмена между приложениями (микросервисами). Мы же в данной статье рассмотрим, как реализовать Pub/Sub внутри приложения с использованием каналов.

Реализация

Как правило, сообщение издателя имеют два основных поля: тема (topic) и сообщение (message). Подписчики могут подписываться на определенные темы и получать все сообщения, опубликованные в этих темах. В нашем примере структура сообщения представлены типом topics map[string][]chan any. Мапа в данном случае идеально подходит. Ключом мапы является тема, а значением - срез каналов (подписчиков), по которым будем рассылать произвольные данные. В момент подписки для нового подписчика создается канал, который добавляется в срез каналов для соответствующей темы.

Notifier

Начнем с минимальной струтуры брокера сообщений:

type Notifier struct {
	topics map[string][]chan any
}

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
    n.topics[topic] = append(n.topics[topic], ch)

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

func (n *Notifier) Publish(topic string, message any) {
	for _, ch := range n.topics[topic] {
		ch <- message
	}	
}

При подписке мы передаем топик, на который подписываемся и функцию-хэндлер, которая будет вызвана при получении сообщения. У функций есть параметр any, позволяющий передавать произвольные данные сообщения. Внутри метода создается канал, который добавляется в срез каналов для соответствующей темы. Если тема еще не существует, она будет создана мапой автоматически. Затем запускается горутина, которая будет ждать сообщения из канала и вызывать обработчик для каждого сообщения. Горутина завершится, когда канал будет закрыт.

При публикации, мы сначала получаем срез подписчиков (каналов) для темы, а затем отправляем сообщение во все каналы.

Что тут не так, чего не хватает? Ну как минимум - нет инициализации мапы. После объявления карты ее значение по умолчанию равно nil. Чтение из такой карты возвращает пустой результат, но попытка записи в такую карту вызывает ошибку времени выполнения. Добавим:

func NewNotifier() *Notifier {
	return &Notifier{
		topics: make(map[string][]chan any),
	}
}

Далее, в коде есть доступ к общему ресурсу из нескольких горутин без синхронизации, что приводит к data race и непредсказуемому поведению. Общий ресурс в данном случае - это мапа topics.
Добавим мьютекс:

type Notifier struct {
	topics map[string][]chan any
	mu     sync.Mutex
}

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
	
	n.mu.Lock()
    n.topics[topic] = append(n.topics[topic], ch)
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

func (n *Notifier) Publish(topic string, message any) {
	n.mu.Lock()
	chnls := n.topics[topic]
	n.mu.Unlock()

	for _, ch := range chnls {
		ch <- message
	}	
}

Выглядит неплохо на первый взляд. При подписке мы захватываем мьютекс, добавляем канал в мапу, и сразу же отпускаем. При публикации мы перед получаением среза подписчиков (каналов) для темы опять захватываем мьютекс, после получения среза - отпускаем. Пока мьютекс захвачен, код может выполняться только одной горутиной. Таким образом, наш код гарантирует, что мапа не будет одновременно читаться и записываться разными горутинами. Будто бы мы сделали всё правильно. И тем не менее, в этом коде зарыта потенциальная проблема. Чтобы понять, в чем она состоит, сначала вспомним, как устроены срезы в Go:

type slice struct {
    ptr *array
    len int
    cap int
}

Когда выполняется chnls := n.topics[topic] копируется заголовок среза, но не сам массив. Это значит, что chnls и n.topics[topic] указывают на один и тот же массив. Что тут может пойти не так? Пример:

  1. Горутина A (Publish) читает chnls в цикле.

  2. Горутина B (Subscribe) одновременно делает append в тот же массив.

Если при добавлении capasity среза не хватает, то append создаёт новый массив, копирует в него данные из старого массива, добавляет новый элемент и возвращает новый срез. При этом старый массив не модифицируется, поэтому конкурентного доступа к одной и той же памяти не происходит.

Если же при добавлении срезу хватает capacity, то append модифицирует тот же массив, который в данный момент читается другой горутиной, что может привести к неопределённому поведению при итерации - data race. Читать и изменять не атомарно одни и те же данные из разных потоков нельзя.

Самое простое решение - держать мютекс захваченным на время итерации по срезу в методе Publish. Но это существенно снизит производительность, Publish будет блокировать Subscribe и наоборот.

Давайте это поправим иначе:

func (n *Notifier) Subscribe(topic string, handler func(any)) {
	ch := make(chan any)
	
	n.mu.Lock()
	oldChnls := n.topics[topic]

	// copy-on-write to avoid data races with concurrent readers
	newChnls := make([]chan any, len(oldChnls)+1)
	copy(newChnls, oldChnls)
	newChnls[len(oldChnls)] = ch

	n.topics[topic] = newChnls
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()
}

Теперь при добавлении элемента в срез всегда создаётся новый массив, который копируется из старого, и только после этого новый массив присваивается n.topics[topic]. Это гарантирует, что читающие горутины не будут читать массив, который в данный момент модифицируется, поскольку это уже разные массивы. Такой прием называется copy-on-write.

Для начала полноценного использования нашего кода нам необходим метод корректной финализации нашего Notifier, который будет закрывать все каналы и освобождать ресурсы. Добавим метод Close. Метод должен вызываться в конце работы приложения. После его вызова, использовать Notifier уже будет нельзя.

type Notifier struct {
	topics map[string][]chan any
	mu     sync.Mutex
	closed     bool
}

func (n *Notifier) Close() {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.closed {
		return
	}

	for _, chs := range n.topics {
		for _, ch := range chs {
			close(ch)
		}
	}

	n.topics = nil
	n.closed = true
}

func (n *Notifier) Subscribe(topic string, handler func(any)) error {
	if n.closed {
		return fmt.Errorf("notifier is closed")
	}

	ch := make(chan any)
	
	n.mu.Lock()
	oldChnls := n.topics[topic]

	// copy-on-write to avoid data races with concurrent readers
	newChnls := make([]chan any, len(oldChnls)+1)
	copy(newChnls, oldChnls)
	newChnls[len(oldChnls)] = ch

	n.topics[topic] = newChnls
	n.mu.Unlock()

	go func() {
		for msg := range ch {
			handler(msg)
		}
	}()

	return nil
}

func (n *Notifier) Publish(topic string, message any) error {
	n.mu.Lock()
	if n.closed {
		n.mu.Unlock()
		return fmt.Errorf("notifier is closed")
	}

	chnls := n.topics[topic]
	n.mu.Unlock()

	for _, ch := range chnls {
		ch <- message
	}
	
	return nil
}

В принципе, текущую реализацию уже можно считать неким минимальным MVP, иллюстрирующим использование паттерна Pub/Sub.

Что можно было бы добавить к коду Notifier, чтобы получить из него полноценную библиотеку?

  1. В первую очередь не хватает метода Unsubscribe, который позволит отписаться от топика. Хотя в реальных кейсах отписка используется редко, но для полноценной библиотеки это было бы логично.

  2. Главным недостатком данной реализации является использование одной простейшей стратегии отправки сообщений подписчикам.

     for _, ch := range chnls {
     	ch <- message
     }
    

    При публикации, мы последовательно отправляем сообщение в каждый канал. Поскольку мы используем небуферизированные каналы, нам приходится каждый раз ждать, пока подписчик не прочитает сообщение. Это может привести к задержкам в обработке сообщений, если хэндлер подписчика будет обрабатывать сообщение долго. Универсального способа решения этой проблемы нет. Есть несколько стратегий, у каждой из которых есть свои преимущества и недостатки. Первое и самое эффективное решение - использовать буферизированные каналы. Второе - ограничить время обработки сообщения подписчиком таймаутом. Вариантов много и их можно комбинировать для каждого конкретного случая. Подобные стратегии используются в работе и “взрослых” брокеров сообщений (NATS, Kafka и пр.).

  3. Не будет лишним добавить recovery в функцию хэндлера подписчика. Если хэндлер подписчика упадет с паникой, это приведет к падению горутины, которая читает канал.

Полный код готовой библиотеки, пригодной для полноценного использования доступен здесь.

Комментарии (11)


  1. uvelichitel
    09.04.2026 15:06

    Метод Publish в цикле отправляет сообщения в каналы подписчиков. Если подписчик по каким либо причинам не вычитывает сообщение из канала, то цикл блокируется??

    for _, ch := range chnls {

    // может быть здесь хотя бы select залепить от блокировки

    ch <- message

    }


    1. stoi Автор
      09.04.2026 15:06

      Вы правы. Всё есть тут: https://github.com/istovpets/messaging/blob/master/messaging.go#L237 А в данной статье - нарочито упрощенный учебный пример.


  1. stoi Автор
    09.04.2026 15:06

    Подписчик не может не вычитывать - это часть Notifiyer. Но вот если хэндлер подписчика будет долго тупить - заблокируется. В конце статьи есть об этом. В боевом коде конечно так лучше не делать. И в библиотеке я предусмотрел несколько стратегий на этот счет


  1. kukymbr
    09.04.2026 15:06

    Главная проблема такой реализации, мне кажется, в том, что при любом перезапуске сервиса потеряются все недополученные сообщения. Фикс этой проблемы приведет к изобретению своей версии редиски, кролика и им подобным :)


    1. stoi Автор
      09.04.2026 15:06

      Это не проблема, ИМХО. Это же внутрисервисные сообщения - между компонентами одного приложения. Завершилась работа сервиса, завершились и внутренние сообщения и компоненты уничтожились. Это не замена Кролику )


      1. kukymbr
        09.04.2026 15:06

        так он же может завершиться нештатно? сервис перезапустится, publisher останется доволен, что всё отправил (до перезапуска), а ни один subscriber ничего не получит и потеряется какая-нибудь важная запись в бд например)


        1. stoi Автор
          09.04.2026 15:06

          Ну если нештатно, тогда всё что угодно можно предполагать ). Тут и редиски с кроликами не помогут ))). Безусловно, в описанной тобой ситуации нужно не просто пулять сообщения, а что-то придумывать. Я когда писал либу и статью, ориентировался на System.Messaging в Delphi https://docwiki.embarcadero.com/CodeExamples/Athens/en/System.Messaging_(Delphi). По опыту знаю - очень удобная штука для сложных проектов. Кардинально уменьшает связанность и хаос в коде.


    1. apevzner
      09.04.2026 15:06

      А ещё то, что поздноподписавшиеся могут пропустить что-то интересное, что случилось до их прихода

      В итоге, такая реализация накладывает определенные ограничения на порядок инициализации, вводя неочевидные зависимости.

      В системе с большим количеством компонент это может больно стукнуть в какой-то момент


      1. stoi Автор
        09.04.2026 15:06

        Предполагается, что все подписки инициализируются при старте приложения. Как любые другие зависимости. Вы же не говорите, что, допустим, накладываются ограничения на HTTP сервер из-за того, что БД не инициализирована до его старта? Это не полноценный брокер сообщений для микросервисов и хранить сообщения он не должен, ИМХО. Впрочем, если нужен именно такой сценарий - нужно сочинять что-то другое, не спорю.


        1. apevzner
          09.04.2026 15:06

          Это понятно, да. Но возникают сложности, если, к примеру, подписка меняется динамически.

          Делать очередь - тоже такое себе. Или она должна быть конечной, и тогда в какой-то момент она всё равно должна начать терять. Или бесконечной, но тогда она может выжрать всю память. Или блокировать отправителя, но есть риск навсегда его заблокировать.

          Есть интересный паттерн, я применял его несколько раз в разных проектах.

          Представим себе, что есть некоторое состояние, и смысл потока событий уведомить заинтересованных получателей в изменении этого состояния.

          Если допустить, что нам достаточно, чтобы состояние с точки зрения отправителя и с точки зрения получателей достаточно, чтобы было eventually consistent, то на стороне отправителя можно хранить его представление о состоянии получателей и присылать только обновления.

          Сложность тут растет пропорционально числу подписчиков и нет очереди, которая может переполниться.

          Как частный случай, отправитель может просто уведомлять заинтересованные лица, что состояние изменилось, а они могут перечитывать его целиком. Очередь сообщений тут тоже не нужна, нет разницы, сколько таких уведомлений получит получатель, лишь бы последнее получил.


          1. stoi Автор
            09.04.2026 15:06

            Ну вообще, я использовал раньше в проектах на Delphi подобный Notifiyer для сложных проектов, где один компонент должен уведомлять о чем-то в процессе работы другой компонент. Раньше для этого в Delphi использовали TNotifyEvent и подписывались, передавая хэндлер (коллбэк) - OnEvent. И всё это делало код очень связанным и запутанным. Так написан весь VCL в Delphi. Начиная с Delphi XE5 в нем появилась либа System.Messaging подобная тому, что я описал в статье. Использовать её после TNotifyEvent - просто кайф )). Собственно, глядя на неё и статью писал. И эти библиотеки не подразумевают хранение и передачу истории сообщений, динамическое подключение подписчиков. Это уже, на мой взгляд - другое. Это уже - брокер сообщений полноценный. Это гораздо больше. Как-то так )