Привет, Хабр! Я Артем Чаадаев, Golang-разработчик в МТС Digital. Традиционно считается, что главная фича Go – это «многопоточность из коробки». Этому посвящено немало статей, авторы которых показывают, какие есть примитивы конкурентности в рамках языка. В процессе изучения этих статей я понял, что на начальном этапе изучения языка мне тяжело начать применять конкурентный код не имея каких-то простых наглядных примеров. 

Поэтому я собрал в одной статье наиболее распространенные примеры использования конкурентного кода в Go на основе личного опыта.

Примитивы конкурентности

Основной фокус статьи – не на разборе примитивов, а на их использовании. Поэтому рекомендую перед прочтением ознакомиться со следующими примитивами конкурентности в Go:

  • горутины;

  • каналы;

  • мьютексы (объекты Mutex, RWMutex);

  • оператор select … case;

  • объекты waitGroup, errGroup.

Во многих статьях показано, как работать с вышеописанными примитивами, но как их применить на практике? Разберем пять конкретных кейсов:

Кейс 1 – Простой ограничитель скорости выполнения

Описание проблемы

Допустим, у нас есть сервис отправки сообщений по e-mail, это весьма распространенная корпоративная задача, примеры подобных сервисов есть и у нас в МТС. Стоит отметить, что в этом примере показано упрощенное решение проблемы. 

Мы не хотим перегружать сторонний SMTP-сервер параллельными запросами на отправку. Как минимум, можно использовать последовательное чтение сообщений для рассылки из брокера. Но у нас есть ограничение на скорость отправки. Скажем, не более какого-то количества запросов в период времени. Например, в секунду. Простейшим решением данного кейса был было бы использование ограничителя скорости выполнения.

Решение

В нашем случае решение будет выглядеть так, как показано в функции HandleMessages() по этой ссылке.

В конфиге можно указать период в микросекундах, относительно которого считается количество запросов. Если же в конфиге указан период – мы создадим экземпляр обработчика, у которого задействован тип time.Ticker. Период тиков мы как раз и берем из конфига. Подробнее о нем прочитать можно здесь. По тику считываются сообщения из очереди в количестве не больше того, сколько мы можем отправить за период.

Можно, конечно, по-разному организовать обработку сообщений, используя пулы обработчиков, комбинируя несколько очередей в зависимости от требований задач. Примером может быть работа с очередями с неудавшейся отправкой сообщений параллельно с обработкой текущих сообщений. Но в этой статье показан простейший пример работы с одной очередью.

Ключевые моменты

  • код заблокируется, пока канал пуст. Выполнение продолжится, когда в канал ticker.C будет передано значение при последующем тике, то есть не ранее, чем после указанного значения в микросекундах;

  • при получении значения достается сообщение из очереди (если, конечно, оно там есть) и передается SMTP-серверу, если количество запросов в период не превысило максимального значения. После отправки счетчик инкрементируется;

  • то, что происходит в первом и втором пунктах, повторяется каждый раз, когда появляется сообщение в очереди.

Кейс 2 – Пул обработчиков

Описание проблемы

Еще одна задача, которую приходилось решать в рамках работы в МТС Digital. Допустим, у нас есть большой json-документ и есть ресурсы, чтобы считать его полностью. Но для последующей обработки нам нужно получить данные из стороннего сервиса. К сожалению, название сервиса я упомянуть не могу, но работа с ним осложнена медленной обработкой единичных записей в последовательном режиме и при отправке всех записей из файла разом.

Решение проблемы

Поможет паттерн worker-pool, так называемый пул обработчиков. У нас есть фиксированное количество горутин, каждая из которых читает последовательно передаваемые записи из одного канала. Каждый обработчик в конкурентном режиме отправляет запрос к данным в сторонний сервис. К счастью, сторонний сервис позволяет организовать такую обработку. Мы получили весомый выигрыш по времени. К сожалению, результаты замеров по этому кейсу у меня не сохранились, так как дело было давно. Тем не менее, вот здесь показан упрощенный пример реализации пула обработчиков:

Рекомендую посмотреть и такой пример оформления пула обработчиков, где лучше прослеживается паттерн:

В этом кейсе лаконичнее организовать обработку так, как показано в моем примере выше.

Ключевые моменты

  • создаем каналы: для подачи данных на вход обработчику и для сбора выходных значений. Выходные значения представлены структурой в виде полей: для обработанных данных и для ошибок. Это сделано для того, чтобы иметь возможность обработать все ошибки. Если это не критично, можно и передавать только структуру только с результатом, а ошибки просто логгировать. Также создаем объект sync.WaitGroup для ожидания завершения работы всех обработчиков;

  • передаем в отдельной горутине данные на вход пула обработчиков через inputCh. Как только все данные будут переданы – канал inputCh закрывается, чтобы оповестить все обработчики, что данных больше не будет;

  • сам пул представлен в виде горутины, в которой  при проходе циклом по количеству обработчиков выполняем функцию processUsers(), она же и является обработчиком в данном кейсе;

  • получение значений обработчиками происходит до тех пор, пока не закроется канал inputCh, после этого обработчик выполнит wg.Done(), что означает, что нужно ожидать завершения выполнения на одного обработчика меньше;

  • каждый обработчик после изменения данных о пользователе кладет результат в outputCh, результаты из которого собираются в слайс в основной горутине. Сбор завершится тогда, когда пул обработчиков закроет канал outputCh. А он закроется, когда в горутине с пулом обработчиков разблокируется wg.Wait() после завершения работы каждого обработчика;

  • В этом примере на выходной канал подается структура с ошибкой и результатом. Это сделано для того, чтобы была возможность обработать ошибки позднее. Например, пул может вызываться в функции, которая должна вернуть обработанные данные и error .

Не обязательно делать пул обработчиков с помощью объекта WaitGroup. Можно использовать паттерн «Семафор». Но это немного сложнее, подробнее можно прочитать здесь.

Кейс 3 – Состояние гонки

Описание проблемы

Рассмотрим еще одна задачу. Допустим, что мы синхронизируем данные пользователей в стороннем API, но для обновления данных нужны id на другие сущности. В частности лимиты, которые нужно назначить пользователю в зависимости от бизнес-логики. Данные по этим лимитам, а именно их id, нужно получить из того же стороннего API. Данные эти весьма статичны, по ходу работы приложения они вряд ли поменяются, а запрос на их получение даст дополнительные временные затраты. Это может сыграть свою роль, если по лимитам надо отправить десятки и сотни запросов. 

Поэтому в данной ситуации их можно как-нибудь закешировать. Для начала используем самый примитивный вариант: в рамках сервиса создадим мапу, которая хранит лимиты по всем кабинетам.

При работе с мапой в конкурентном режиме, в частности, при записи, возникнет состояние гонки. Это состояние, когда несколько потоков пытаются получить и обновить одни и те же данные, в данном случае это мапа с лимитами по всем кабинетам. Как известно, мапа является потоконебезопасной, подробнее о причинах этого можно прочитать в других статьях, например здесь.

Решение проблемы

Чтобы предотвратить состояние гонки, нужно заблокировать общие данные так, чтобы только одна горутина имела возможность обновлять данные в данный момент времени, а при чтении горутинами ни одна не могла ничего записать в мапу в текущий момент.

Для решения используется объект RWMutex, который при записи блокирует код для остальных горутин на время выполнения операций, затем в конце функции (с помощью defer) разблокирует. При чтении он блокирует только то место, где происходит запись.

Пример вот здесь.

Ключевые моменты

  • внутри функции RefillLimitsData() мы достаем данные по лимитам на все кабинеты в конкурентном режиме в нескольких горутинах. Поскольку кабинетов не предполагается больше пары десятков – можно это делать в горутинах по количеству кабинетов;

  • данное действие подразумевает сохранение данных в общую мапу в таком же конкурентном режиме. При записи в мапу в конкурентном режиме мы получим панику concurrent map writes;

  • проще всего защитить данный код с помощью мьютекса. А точнее, с помощью объекта RWMutex. Мьютекс подразумевает блокировку кода от Lock() до Unlock() для других горутин, выполняющих этот код в общем случае. Однако RWMutex позволяет блокировать и разблокировать участок кода для всех горутин, если вызываются функции Lock() и Unlock(). Если же вызываются функции RLock() и RUnlock(), то код блокируется для пишущих горутин, но не для читающих. Соответственно, в данном случае в RefillLimitsData() будут вызваны функции Lock() и Unlock() при записи в мапу;

  • Если же есть какие-то горутины, которые читают из этой мапы, то там следует вызывать функции RLock() и RUnlock(), чтобы все горутины могли прочитать в любой момент времени эту мапу.

Обратите внимание: бывают ситуации, когда нужно получить данные, применить какую-то логику, в зависимости от полученных данных, и положить обратно. В этой ситуации между Lock() и Unlock() нужно заносить весь процесс работы с мапой при выполнении сценария записи данных (здесь под записью подразумевается и чтение с применением какой-либо логики). Если такой логики немало – не забывайте положить Unlock() в defer, чтобы не потерять.

Хочу уточнить, что в рамках этой статьи я хотел бы показать работу с мьютексом, однако для данного кейса можно предложить и решение без мьютексов, с помощью каналов. В этом случае можно сделать worker pool, который принимает кабинеты, заполняет лимиты в виде слайса, а потом на выходе значения кладутся в мапу. Более того, в effective Go есть рекомендация Do not communicate by sharing memory; instead, share memory by communicating. Однако если в горутине происходит ситуация чтения из кэшированных данных, а затем применяется какая-то логика относительно них и только после этого запись, лучше все же использовать мьютекс, чтобы не попасть в ситуацию, когда решение для записи принято на основе тех данных, что уже поменялись к моменту записи. В этом случае все другие горутины прочитают уже только измененные данные для применения своей логики.

Кейс 4 – Ограничение времени выполнения какого-либо процесса

Описание проблемы

В отличие от предыдущих кейсов, этот основан на примере из англоязычной статьи, о которой я упоминал во вступлении. Допустим, есть какой-то сервис обработки видео, с ограничением времени просмотра – 10 секунд на все просмотры со стороны обычного пользователя, либо на каждый запрос. Для премиум-пользователя просмотр не ограничен. 

К сожалению, я не имею опыта работы с видео в Golang, однако этот кейс мне интересен с точки зрения работы с конкурентностью. Если немного абстрагироваться, то задача состоит в том, чтобы ограничить выполнение какого-то процесса больше, чем на заданное время. То есть если прошел лимит времени для, например, обычного пользователя, то процесс обрывается. 

Есть два пути решения данного кейса:

  • ограничение на запросы со стороны пользователя. То есть каждый наш абстрактный процесс может выполняться не более 10 секунд при запросе пользователем;

  • ограничение всех запросов пользователя, то есть суммарное время всех абстрактных процессов ограничено 10 секундами.

Решение 1 – Десять секунд на каждый запрос пользователя

В первом случае рассмотрим ограничение на каждый запрос со стороны пользователя. Это значит, что пользователь, у которого нет премиум-аккаунта, не может производить какое-то действие больше 10 секунд. Поэтому код завершит обработку какого-то действия через 10 секунд. Но при этом само действие пользователь может инициировать неограниченное количество раз.

Пример показан по ссылке.

Ключевые моменты решения 1

  • здесь у нас есть абстрактный обработчик HandleRequest(), которому передается какой-то длительный процесс process(). Эта функция выполняется в отдельной горутине. По завершении процесса отправляется значение в сигнальный канал done, чтобы оповестить об успешном завершении процесса. Успешное завершение означает, что процесс не нужно останавливать;

  • в основной горутине работает оператор select, оба case из которого будут заблокированы, пока либо не пройдет процесс, либо не пройдет бесплатный период. Второй case подразумевает чтение из канала, возвращаемого функцией time.After(). Туда пишется значение через заданное количество времени. Если срабатывает этот case, включается проверка на премиум-пользователя и соответственно функция вернет false, если пользователь обычный.

Обратите внимание: можно также ограничивать длительность процесса с помощью чтения из канала, возвращаемого функцией context.Done() внутри оператора select. В этом случае нужно создать контекст с тайм-аутом с помощью функции context.WithTimeout().

Решение 2 – Десять секунд на все запросы пользователя

Во втором варианте решения ограничение на обработку ограничено десятью секундами суммарно на все запросы одного пользователя. То есть при каждом последующем запросе мы отнимаем потраченное время с предыдущего запроса. Как только пользователь превышает это время, мы не можем дальше проводить обработку любых запросов.

Пример показан по ссылке.

Ключевые моменты решения 2

  • здесь у нас есть какой-то абстрактный обработчик HandleRequest(), которой передается какой-то длительный процесс process(). Эта функция выполняется в отдельной горутине. По завершении процесса отправляется значение в сигнальный канал done, чтобы оповестить об успешном завершении процесса. Успешное завершение значит, что процесс не нужно останавливать;

  • вместо time.After()time.Tick(), которая каждую секунду отправляет в канал, возвращаемый этой функцией, значение. Каждый раз при чтении значения на выходе этой функции происходит инкрементирование времени, которое есть у пользователя, а также проверка, не истекло ли общее время просмотра у пользователя. Если истекло, то работа функции завершается с результатом false;

  • Чтобы правильно учитывать потраченное пользователем время, используется объект sync.Mutex. Мьютекс блокирует доступ к счетчику секунд пользователя, чтобы только один процесс обработки имел одновременный доступ нему. Иначе может возникнуть ситуация, когда несколько обработчиков запроса получили одну и ту же копию счетчика и каждый из них будет пытаться инкрементировать старое значение не зная о том, какое является текущим.

Кейс 5 – Изящное завершение процесса по сигналу ОС, так называемый Graceful shutdown

Описание проблемы

В кейсе есть какой-то неопределенный процесс, блокирующий горутину main и единственный способ остановки программы – послать сигнал ОС.

Но, прежде чем завершить работу приложения, нужно завершить все процессы. Например, сервер, получивший команду на завершение, не принимает больше соединений и ждет завершение обработки запросов. К сожалению, сам продукт (МТС Compliance Hub, там это используется в бэкенде), где мы это реализовали, подпадает под NDA, поэтому в решении воспроизведу аналогичный кейс на своем примере. К слову, это самый распространенный из перечисленных кейсов в разработке микросервисов.

Цель изящного завершения – бесшовная выкатка. Нужно дать возможность завершить недообработанные соединения, но не начинать новые в момент завершения.

Решение проблемы

Рассмотрим пример по ссылке.

Ключевые моменты кейса

  • на базе пустого контекста context.Background() с помощью функции NotifyContext() создается дочерний контекст, который завершается по сигналу ОС. Список ожидаемых сигналов передается параметром;

  • для работы различных процессов создается объект errgroup, который работает аналогично waitGroup: мы ждем завершения всех горутин (rungroup.Go()) в рамках этого объекта. В данном случае мы ждем завершения горутины с сервером и горутины, где выполняется srv.Shutdown();

  • в момент работы сервера обе горутины блокируются. Как только одна из горутин получит завершенный по сигналу ОС контекст, она вызовет функцию завершения сервера. После этого разблокируется горутина с srv.ListenAndServe() и выполнение программы завершится.

Заключение

Конкурентность позволяет увеличить скорость обработки данных при наличии ресурсов, если выполнять те же задачи, которые можно выполнять последовательно. В этом можно убедиться, если посмотреть кейс с пулом обработчиков. Этот кейс позволил на порядок увеличить скорость обработки данных. 

Есть также ряд кейсов, где без конкурентного программирования не обойтись. Например, изящное завершение по сигналу ОС, когда у нас запущен какой-то сервер. Важно также понимать, что при конкурентной обработке может возникнуть состояние гонки за общими ресурсами. Это нужно предусматривать, если есть какой-то обмен общими данными. 

Также рассмотрен пример, когда нужно ограничить по времени какой-либо процесс обработки пользователем. Разбор этих решений даст представление о том, как именно пользоваться примитивами конкурентности при решении практических задач. И я уверен, что вкупе с прочтением статей о работе примитивов конкурентности, это поможет в дальнейшем принимать решения и в других возможных кейсах.

Не поленитесь пройти по ссылкам в статье, любой дополнительный материал – это плюс в освоении конкурентного программирования.

Надеюсь, статья помогла вам. Вопросы, мнения, ваши примеры на тему конкурентности в Go я готов обсудить в комментариях. Спасибо за уделенное время!

Советую прочитать

База: различные статьи о конкурентности в Go (а их много) гуглятся по запросам Concurrency in Go и «Конкурентное программирование Go». Меня, например, больше всего вдохновила публикация, ссылку на которую приведу ниже. Скрывать не буду, идею с кейсами и даже один пример взял из нее. Однако другие примеры я встречал уже в рамках своей работы над продуктами.

Серия статей Go concurrency through problem solving Series' Articles за авторством Joash Xu.

Статья Race Conditions In Golang.

Статья Are Go Maps Sensitive To Data Races.

Также я рекомендую прочитать главы о конкурентности из книги: «Golang для профи: работа с сетью, многопоточность, структуры данных и машинное обучение с Go» Михалиса Цукалоса

А еще рекомендую посетить курс Golang-разработчик от МТС Тета, где рассматривается большой блок о конкурентности в Go и о том, как она реализована. Есть примеры кода и домашнее задание на написание конкурентного кода в рамках создания сервиса.

Комментарии (4)


  1. grSereger
    02.08.2022 15:23
    +2

    для кейса "Ограничение времени выполнения какого-либо процесса" я бы старался не городить горутины, а сделать контекст с таймаутом и внутри process обрабатывался бы отмену контекста.

    Бывают конечно кейсы, когда мы используем внешнюю либу, которая не умеет работать с контекстом, но в таком случае текущие решения я бы не позиционировал как best practices.


    1. perfectgentlemande Автор
      02.08.2022 16:28
      +1

      совершенно верно, поэтому и указал в примечании, что можно можно обрабатывать отмену контекста чтением из context.Done()


  1. RPG18
    02.08.2022 16:10

    В телекоме же придумали алгоритмы типа Leaky bucket для ограничения пропускной способности. Есть https://github.com/uber-go/ratelimit и https://pkg.go.dev/golang.org/x/time/rate


    1. perfectgentlemande Автор
      02.08.2022 16:29
      +1

      Хорошая идея! На одном из проектов (если конечно правильно помню, но ощущение, что правильно) у нас даже видел использование https://github.com/uber-go/ratelimit