Разработчики языка Go подбросили программистам занятную дилемму - в виде фиксированного размера буфера у каналов (тех что используются для передачи сообщений между потоками - точнее "go-рутинами" приложения). На днях на ревью увидел у коллеги код в духе:

  • хотим оповещать "подписчиков" об изменениях в БД, асинхронно

  • используем для этого канал с буфером какой-то вместимости (например, 50)

  • когда вызывается коллбэк от БД, мы создаём информационное сообщение и пытаемся затолкать его в канал

  • если канал оказался заполнен, просто пишем ошибку в лог и нормально завершаем коллбэк

Стоит ли так делать? А какие ещё варианты? Был бы канал безразмерным (как в erlang или некоторые из очередей в java) - программист и не задумался бы (может семантически лучше чтобы отправка в канал удавалась всегда - как запись в массив или мэпу) - но в Go нам предоставили "более полный" контроль над ситуацией, только не сказали что с ним делать :-)

Мне неизвестен ответ в духе "серебрянной пули" - ниже я лишь подытожил варианты которые обычно приходят в голову - и буду рад если вы поделитесь соображениями и опытом (а может и какими-то полезными библиотечками).

Вариант 1 - блокироваться

Предусмотренное в самом Go поведение достаточно естественно - блокироваться на операции записи в канал пока в буфере не появится место. В этом смысле никакой ошибки при записи и не возникает. Код создающий отправляемые события начинает "самотормозиться" - и во многих случаях этого достаточно (особенно если нагрузка пиковая а не постоянная).

Когда это может быть нехорошо?

Ну, понятно - если вдруг у нас вызывающий код прямо очень критичный ко времени выполнения и блокироваться ему не положено. При этом сама логика работы должна подразумевать "отправил и забыл" - впрочем если у канала буфер ненулевого размера то речь именно о такой логике обычно и идёт.

Вариант 2 - долой ограничения

Чтобы не отставать от "других языков" можно наверняка обернуть канал в структурку и сделать его авто-разрастающимся. Скорее всего такие реализации уже есть, стоит только копнуть в гитхабе.

Хорошо ли это?

Смотря из-за чего он "разрастается". Как и выше вскользь говорилось - если это короткий пик нагрузки (ну вот в нашем случае может какая-то мудрёная операция на БД выполнилась) который лишь временно превысил вместимость канала - то вроде бы нормально. Это получается просто канал который сам подстроится под нужный размер не заставляя программиста пытаться "угадать заранее".

Это защитит и от неожиданностей при эволюции приложения. Быть может сейчас мы по тестированию определили что достаточно буфера на 50 сообщений, а год-два спустя из-за усложнения запросов в БД там пики до 150 будут - (но не миллиард и не миллион даже). Ради таких незначительных изменений в константах пересобирать код (заводить тикеты и пр) кажется глупо.

Если же переполнение из-за того что нагрузка постоянно превышает возможности "получателя" на выходе канала, то бишь нам не хватает производительности - то безразмерный канал будет просто "маскировать" проблему - вполне возможно что из-за этого приложение "бумкнет" не на тестовом стенде а в проде :)

Вариант 3 - внешняя очередь

В последнем случае - когда мы чувствуем что нам пригодился бы "безразмерный" канал, но подозреваем что проблемы могут возникать из-за производительности консьюмера, а не из-за "пиков" - на мысль обычно должна прийти реализация с внешней очередью. Всё-таки каналы, работающие "внутри инстанса" это вещь с ограниченной применимостью. Внешние очереди - другое дело:

  • обычно у нас есть возможность масштабировать количество консьюмеров и это даёт большую гибкость

  • плюс существует немало готовых способов мониторить состояние большинства популярных мессадж-брокеров и получать ворнинги и алармы централизованно не когда "оно уже бумкнуло" а когда в очередях скапливается подозрительно много сообщений

  • можно даже второе с первым связать, запускать консьюмеры в "лямбдах" и т.п. (эластичное решение - если у нас есть эластичное облако)

Тут основными противопоказаниями является разве что некоторое усложнение системы (вот уж ошибки записи во внешнюю очередь всегда могут произойти, проверять их необходимо и т.п.) - ну и зоопарк этих самых очередей и даже "баталии" разработчиков (ортодоксы с кафкой и рэббитом против сектантов с натсом и бинстолкд, шутка).

В целом этот вариант "корректнее" ложится на "микросервисную" идею - а встроенные каналы можно использовать лишь для каких-то самых легковесных операций.

Вариант 4 - фоллбэк с синхронным вызовом

В некоторых случаях (в частности вот этот с нотификациями по операциям в БД) предлагается рассмотреть использование "фоллбэка" - если не удалось отправить сообщение через канал, не дёрнуть ли нам "консьюмера" напрямую?

Ведь для чего в данном случае канал нужен - чтобы не тормозить коллбэк вызванный БД при совершении операции. Но если канал заполнен - ну чёрт с ним, пусть попробует выполниться "напрямую" - протормозит так и ладно - но всё же операция выполняется.

Это похоже на то как сверхпроводящий кабель заключают в "изоляцию" из обычной меди например. Если температура повышается и сверхпроводимость исчезает - ток продолжает идти по медной оболочке, хотя и с потерями - но это лучше чем резко разрывать цепь.

Правда при ближайшем рассмотрении этот вариант не сильно отличается от блокирования на операции записи. Только мы нарушаем порядок эвентов (если это важно). Да и логика консьюмера должна позволять такой "вызов извне" (нужно соблюсти в нём потокобезопасность и пр).

Зачем такой вариант может быть нужен? Мне трудно придумать юзкейс :) может вы знаете ситуацию с каким-то особенным консьюмером который изредка (на каких-то типах сообщений?) сильно тормозит... Что-то странное в общем.

Некоторый профит "на уровне языка" в том что мы при этом точно можем писать ворнинг в лог или сигнализировать другим способом что наблюдается нештатная ситуация. При простой блокировке на записи нам это по-простому не сделать.

Вариант 5 - с обратной связью

Мы можем занять "проактивную" позицию и проверять что буфер полон (или лучше - полон наполовину) - и если так, то не просто сыпать сообщения в лог - а принимать какие-то меры - например, сигнализировать вызывающему коду чтобы поумерил свои аппетиты. Послать сигнал "горшочек, не вари!"

Очевидно этот подход сильно зависит от задачи. В случае с нотификациями по операциям в БД - куда мы пойдём? Попросим "писать пореже"? Включим задержки в пользовательском интерфейсе? :) Наверняка область применения найдётся - наиболее явно если это какая-то балк-обработка (но ей не так часто нужны мессаджи-каналы-очереди).

Заключение

Как всегда приходим к выводу что асинхронная обработка - дело деликатное, и прежде чем написать в коде обработку ситуации тем или иным способом стоит подумать, какого поведения мы хотим достигнуть.

Варианты с мониторингом состояния канала и оповещениями или даже "саморегуляцией" кажутся наиболее прогрессивными, но требуют дополнительных трудозатрат. Известны ли вам какие-то готовые обёртки подобного рода над каналами в Go?

С другой стороны - а если это очередь для отправки алармов по проблемам при выполнении. ошибкам и т.п. - то куда же мы будем отсылать ошибки о том что очередь ошибок переполнилась? :-)

Комментарии (7)


  1. DimNS
    14.01.2025 05:54

    А как такой вариант: код запускает горутину отправки в канал и блокируется уже горутина, тем самым мы не блокируем основной поток, из минусов можно наплодить слишком много горутин (но если предположить что это временный пик и очередь постоянно вычитывается, то вроде и нет в этом проблемы)


    1. mrobespierre
      14.01.2025 05:54

      Горутина это вам не тред ОС. Если много тредов - это где-то 10000, то 1000000 горутин это ещё не много. Кроме того, можно же запилить token bucket и не запускать больше n. А вообще, конечно этот вариант гораздо адекватнее всего того, что в статье.


  1. yellow79
    14.01.2025 05:54

    Тут возможны, как мне кажется, всего два варианта. Либо сообщения из канала вам важны и вы блокируетесь на записи, либо сообщения не важны и вы их выкидываете при заполненном буфере.

    Варианты с временным "расширением" вместимости канала кажутся странными и как минимум это переусложнение на ровном месте, можно просто сделать канал больше, но это путь в никуда. Можно "выкидывать" данные в другой канал, заполнение которого будет явно свидетельствовать о проблеме и тут уже можно будет "бить в колокол".

    Размер канала в 50 кажется странным решением. Зачем вам ещё одна очередь? Что станет с данными в канале, если сервис вдруг "упадёт"? Вероятно стоит поднять количество обработчиков, если очерёдность не важна, ну и канал с нулевым буфером тоже напрашивается сам собой. Ну и конечно же добавить метрику по тому, на какой промежуток времени блокируетесь при записи в канал, либо сколько выкидываете данных и после на данную метрику повесить алерты.


  1. Sabirman
    14.01.2025 05:54

    Похоже, вы используете каналы go не по назначению. Если вам нужна очередь, то лучше использовать нормальную очередь, например, RabbitMQ, у которого есть нормальная админка и он управляем и расширяем. А каналы в go - это замена межпотокового взаимодействия и это довольно низкоуровневый механизм. Поэтому (имхо) в каналах go можно только блокироваться.


  1. Sanchous98
    14.01.2025 05:54

    Не стоит использовать каналы как очереди. Их конечно можно использовать так, но это плохая идея. Каналы - это в первую очередь способ синхронизации горутин с возможностью обмена сообщениями, и размер буфера канала должен определяться не логикой функции(нужно ли вам отправить и забыть или заблокироваться), а нагрузкой на этот канал(вы должны четко понимать, сколько у вас будет чтений из канала и сколько записей и исходя из этого подобрать оптимальный размер буфера). А вы при помощи канала пытаетесь решить задачу, которую решают очереди. В таком случае лучше обернуть слайс в структуру и защитить ее sync.Cond, таким образом получив подобие каналов, которые с одной стороны блокируют поток, если буфер пустой, расширяют буфер если его недостаточно и обеспечивают потокобезопасность. Или использовать службы очередей вроде Kafka или RabbitMQ


  1. Revertis
    14.01.2025 05:54

    Так если через канал пересылаются сообщения о том, что что-то в базе изменилось, то закономерно предположить, что в заполненном канале такие сообщения уже есть. И консюмер всё равно обработает ситуацию "база обновилась" когда прочитает такое сообщение из канала. Значит можно спокойно дропать дублирующие сообщения.


  1. evgeniy_kudinov
    14.01.2025 05:54

    Выше уже озвучили мнения, и я дополню.
    Если рассматривать каналы как один из вариантов Inter process communication (IPC), станет более понятно, зачем каналы нужны.

    В описании модели памяти Go:

    Программы, которые изменяют данные, к которым одновременно обращаются несколько горутин, должны сериализовать такой доступ. Чтобы упорядочить доступ, защитите данные с помощью операций с каналами или других примитивов синхронизации, таких как в пакетах sync и sync/atomic.

    P. S. Кажется, что использовать каналы в Go как очередь можно и используют, но надо понимать при этом про ограничения и риски. В вашем случае можно попробовать применить паттерн Fanout с автоскейлингом.