Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool
Паттерны конкурентности в Go. Подробный разбор. Часть 2. Fan-Out/Fan-In
Pub/Sub
Pub/sub, сокращение от «publish-subscribe» (публикация-подписка), — это паттерн обмена сообщениями между различными частями приложения. Прелесть такого подхода в том, что отправитель сообщения не знает о получателе, а получатель не знает об отправителе. Оба они знают только об одном посреднике — брокере сообщений. Отправитель публикует сообщение, а получатель подписывается на получение сообщений. В итоге это кардинально уменьшает связанность между частями приложения и неминуемую головную боль ею вызванную.
Пример:
В приложении нужно по расписанию выполнять некие действия - например рекламные рассылки или какое-нибудь закрытие дня раз в сутки. Вы реализуете сервис плнировщика задач, при старте задаете ему расписание, по которому он публикует некие уведомления. А потом в любом месте приложения просто подписываетесь на выбранные уведомления и выполняете нужные действия при их получении. Это просто, это удобно, это гибко, это легко масштабировать. Это Pub/Sub.
В Go паттерн Pub/Sub можно реализовать несколькими способами. Один из самых простых и эффективных — использование каналов. Каналы в Go являются потокобезопасными и позволяют передавать данные между горутинами максимально простым способом. Собственно, известные брокеры сообщений (RabbitMQ, Kafka и т.д.) - это предельная реализация паттерна Pub/Sub для обмена между приложениями (микросервисами). Мы же в данной статье рассмотрим, как реализовать Pub/Sub внутри приложения с использованием каналов.
Реализация
Как правило, сообщение издателя имеют два основных поля: тема (topic) и сообщение (message). Подписчики могут подписываться на определенные темы и получать все сообщения, опубликованные в этих темах. В нашем примере структура сообщения представлены типом topics map[string][]chan any. Мапа в данном случае идеально подходит. Ключом мапы является тема, а значением - срез каналов (подписчиков), по которым будем рассылать произвольные данные. В момент подписки для нового подписчика создается канал, который добавляется в срез каналов для соответствующей темы.
Notifier
Начнем с минимальной струтуры брокера сообщений:
type Notifier struct { topics map[string][]chan any } func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.topics[topic] = append(n.topics[topic], ch) go func() { for msg := range ch { handler(msg) } }() } func (n *Notifier) Publish(topic string, message any) { for _, ch := range n.topics[topic] { ch <- message } }
При подписке мы передаем топик, на который подписываемся и функцию-хэндлер, которая будет вызвана при получении сообщения. У функций есть параметр any, позволяющий передавать произвольные данные сообщения. Внутри метода создается канал, который добавляется в срез каналов для соответствующей темы. Если тема еще не существует, она будет создана мапой автоматически. Затем запускается горутина, которая будет ждать сообщения из канала и вызывать обработчик для каждого сообщения. Горутина завершится, когда канал будет закрыт.
При публикации, мы сначала получаем срез подписчиков (каналов) для темы, а затем отправляем сообщение во все каналы.
Что тут не так, чего не хватает? Ну как минимум - нет инициализации мапы. После объявления карты ее значение по умолчанию равно nil. Чтение из такой карты возвращает пустой результат, но попытка записи в такую карту вызывает ошибку времени выполнения. Добавим:
func NewNotifier() *Notifier { return &Notifier{ topics: make(map[string][]chan any), } }
Далее, в коде есть доступ к общему ресурсу из нескольких горутин без синхронизации, что приводит к data race и непредсказуемому поведению. Общий ресурс в данном случае - это мапа topics.
Добавим мьютекс:
type Notifier struct { topics map[string][]chan any mu sync.Mutex } func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.mu.Lock() n.topics[topic] = append(n.topics[topic], ch) n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() } func (n *Notifier) Publish(topic string, message any) { n.mu.Lock() chnls := n.topics[topic] n.mu.Unlock() for _, ch := range chnls { ch <- message } }
Выглядит неплохо на первый взляд. При подписке мы захватываем мьютекс, добавляем канал в мапу, и сразу же отпускаем. При публикации мы перед получаением среза подписчиков (каналов) для темы опять захватываем мьютекс, после получения среза - отпускаем. Пока мьютекс захвачен, код может выполняться только одной горутиной. Таким образом, наш код гарантирует, что мапа не будет одновременно читаться и записываться разными горутинами. Будто бы мы сделали всё правильно. И тем не менее, в этом коде зарыта потенциальная проблема. Чтобы понять, в чем она состоит, сначала вспомним, как устроены срезы в Go:
type slice struct { ptr *array len int cap int }
Когда выполняется chnls := n.topics[topic] копируется заголовок среза, но не сам массив. Это значит, что chnls и n.topics[topic] указывают на один и тот же массив. Что тут может пойти не так? Пример:
Горутина A (Publish) читает chnls в цикле.
Горутина B (Subscribe) одновременно делает append в тот же массив.
Если при добавлении capasity среза не хватает, то append создаёт новый массив, копирует в него данные из старого массива, добавляет новый элемент и возвращает новый срез. При этом старый массив не модифицируется, поэтому конкурентного доступа к одной и той же памяти не происходит.
Если же при добавлении срезу хватает capacity, то append модифицирует тот же массив, который в данный момент читается другой горутиной, что может привести к неопределённому поведению при итерации - data race. Читать и изменять не атомарно одни и те же данные из разных потоков нельзя.
Самое простое решение - держать мютекс захваченным на время итерации по срезу в методе Publish. Но это существенно снизит производительность, Publish будет блокировать Subscribe и наоборот.
Давайте это поправим иначе:
func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.mu.Lock() oldChnls := n.topics[topic] // copy-on-write to avoid data races with concurrent readers newChnls := make([]chan any, len(oldChnls)+1) copy(newChnls, oldChnls) newChnls[len(oldChnls)] = ch n.topics[topic] = newChnls n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() }
Теперь при добавлении элемента в срез всегда создаётся новый массив, который копируется из старого, и только после этого новый массив присваивается n.topics[topic]. Это гарантирует, что читающие горутины не будут читать массив, который в данный момент модифицируется, поскольку это уже разные массивы. Такой прием называется copy-on-write.
Для начала полноценного использования нашего кода нам необходим метод корректной финализации нашего Notifier, который будет закрывать все каналы и освобождать ресурсы. Добавим метод Close. Метод должен вызываться в конце работы приложения. После его вызова, использовать Notifier уже будет нельзя.
type Notifier struct { topics map[string][]chan any mu sync.Mutex closed bool } func (n *Notifier) Close() { n.mu.Lock() defer n.mu.Unlock() if n.closed { return } for _, chs := range n.topics { for _, ch := range chs { close(ch) } } n.topics = nil n.closed = true } func (n *Notifier) Subscribe(topic string, handler func(any)) error { if n.closed { return fmt.Errorf("notifier is closed") } ch := make(chan any) n.mu.Lock() oldChnls := n.topics[topic] // copy-on-write to avoid data races with concurrent readers newChnls := make([]chan any, len(oldChnls)+1) copy(newChnls, oldChnls) newChnls[len(oldChnls)] = ch n.topics[topic] = newChnls n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() return nil } func (n *Notifier) Publish(topic string, message any) error { n.mu.Lock() if n.closed { n.mu.Unlock() return fmt.Errorf("notifier is closed") } chnls := n.topics[topic] n.mu.Unlock() for _, ch := range chnls { ch <- message } return nil }
В принципе, текущую реализацию уже можно считать неким минимальным MVP, иллюстрирующим использование паттерна Pub/Sub.
Что можно было бы добавить к коду Notifier, чтобы получить из него полноценную библиотеку?
В первую очередь не хватает метода
Unsubscribe, который позволит отписаться от топика. Хотя в реальных кейсах отписка используется редко, но для полноценной библиотеки это было бы логично.-
Главным недостатком данной реализации является использование одной простейшей стратегии отправки сообщений подписчикам.
for _, ch := range chnls { ch <- message }При публикации, мы последовательно отправляем сообщение в каждый канал. Поскольку мы используем небуферизированные каналы, нам приходится каждый раз ждать, пока подписчик не прочитает сообщение. Это может привести к задержкам в обработке сообщений, если хэндлер подписчика будет обрабатывать сообщение долго. Универсального способа решения этой проблемы нет. Есть несколько стратегий, у каждой из которых есть свои преимущества и недостатки. Первое и самое эффективное решение - использовать буферизированные каналы. Второе - ограничить время обработки сообщения подписчиком таймаутом. Вариантов много и их можно комбинировать для каждого конкретного случая. Подобные стратегии используются в работе и “взрослых” брокеров сообщений (NATS, Kafka и пр.).
Не будет лишним добавить
recoveryв функцию хэндлера подписчика. Если хэндлер подписчика упадет с паникой, это приведет к падению горутины, которая читает канал.
Полный код готовой библиотеки, пригодной для полноценного использования доступен здесь.
Комментарии (11)

stoi Автор
09.04.2026 15:06Подписчик не может не вычитывать - это часть Notifiyer. Но вот если хэндлер подписчика будет долго тупить - заблокируется. В конце статьи есть об этом. В боевом коде конечно так лучше не делать. И в библиотеке я предусмотрел несколько стратегий на этот счет

kukymbr
09.04.2026 15:06Главная проблема такой реализации, мне кажется, в том, что при любом перезапуске сервиса потеряются все недополученные сообщения. Фикс этой проблемы приведет к изобретению своей версии редиски, кролика и им подобным :)

stoi Автор
09.04.2026 15:06Это не проблема, ИМХО. Это же внутрисервисные сообщения - между компонентами одного приложения. Завершилась работа сервиса, завершились и внутренние сообщения и компоненты уничтожились. Это не замена Кролику )

kukymbr
09.04.2026 15:06так он же может завершиться нештатно? сервис перезапустится, publisher останется доволен, что всё отправил (до перезапуска), а ни один subscriber ничего не получит и потеряется какая-нибудь важная запись в бд например)

stoi Автор
09.04.2026 15:06Ну если нештатно, тогда всё что угодно можно предполагать ). Тут и редиски с кроликами не помогут ))). Безусловно, в описанной тобой ситуации нужно не просто пулять сообщения, а что-то придумывать. Я когда писал либу и статью, ориентировался на System.Messaging в Delphi https://docwiki.embarcadero.com/CodeExamples/Athens/en/System.Messaging_(Delphi). По опыту знаю - очень удобная штука для сложных проектов. Кардинально уменьшает связанность и хаос в коде.

apevzner
09.04.2026 15:06А ещё то, что поздноподписавшиеся могут пропустить что-то интересное, что случилось до их прихода
В итоге, такая реализация накладывает определенные ограничения на порядок инициализации, вводя неочевидные зависимости.
В системе с большим количеством компонент это может больно стукнуть в какой-то момент

stoi Автор
09.04.2026 15:06Предполагается, что все подписки инициализируются при старте приложения. Как любые другие зависимости. Вы же не говорите, что, допустим, накладываются ограничения на HTTP сервер из-за того, что БД не инициализирована до его старта? Это не полноценный брокер сообщений для микросервисов и хранить сообщения он не должен, ИМХО. Впрочем, если нужен именно такой сценарий - нужно сочинять что-то другое, не спорю.

apevzner
09.04.2026 15:06Это понятно, да. Но возникают сложности, если, к примеру, подписка меняется динамически.
Делать очередь - тоже такое себе. Или она должна быть конечной, и тогда в какой-то момент она всё равно должна начать терять. Или бесконечной, но тогда она может выжрать всю память. Или блокировать отправителя, но есть риск навсегда его заблокировать.
Есть интересный паттерн, я применял его несколько раз в разных проектах.
Представим себе, что есть некоторое состояние, и смысл потока событий уведомить заинтересованных получателей в изменении этого состояния.
Если допустить, что нам достаточно, чтобы состояние с точки зрения отправителя и с точки зрения получателей достаточно, чтобы было eventually consistent, то на стороне отправителя можно хранить его представление о состоянии получателей и присылать только обновления.
Сложность тут растет пропорционально числу подписчиков и нет очереди, которая может переполниться.
Как частный случай, отправитель может просто уведомлять заинтересованные лица, что состояние изменилось, а они могут перечитывать его целиком. Очередь сообщений тут тоже не нужна, нет разницы, сколько таких уведомлений получит получатель, лишь бы последнее получил.

stoi Автор
09.04.2026 15:06Ну вообще, я использовал раньше в проектах на Delphi подобный Notifiyer для сложных проектов, где один компонент должен уведомлять о чем-то в процессе работы другой компонент. Раньше для этого в Delphi использовали TNotifyEvent и подписывались, передавая хэндлер (коллбэк) - OnEvent. И всё это делало код очень связанным и запутанным. Так написан весь VCL в Delphi. Начиная с Delphi XE5 в нем появилась либа System.Messaging подобная тому, что я описал в статье. Использовать её после TNotifyEvent - просто кайф )). Собственно, глядя на неё и статью писал. И эти библиотеки не подразумевают хранение и передачу истории сообщений, динамическое подключение подписчиков. Это уже, на мой взгляд - другое. Это уже - брокер сообщений полноценный. Это гораздо больше. Как-то так )
uvelichitel
Метод
Publishв цикле отправляет сообщения в каналы подписчиков. Если подписчик по каким либо причинам не вычитывает сообщение из канала, то цикл блокируется??for _, ch := range chnls {// может быть здесь хотя бы select залепить от блокировкиch <- message}stoi Автор
Вы правы. Всё есть тут: https://github.com/istovpets/messaging/blob/master/messaging.go#L237 А в данной статье - нарочито упрощенный учебный пример.