Учимся применять Semaphore и Worker Pool на Go

Привет, Хабр! Я Артём Чаадаев из команды ассортимента размещения в Туту и занимаюсь разработкой на языке Go.

Большое количество статей посвящено простоте конкурентной разработки на Go, однако без практических примеров для начинающих разработчиков бывает трудно понять, как можно это применить. Более того, задачи на конкурентную разработку постоянно фигурируют на собеседованиях.

Поэтому в рамках данной статьи решим задачу, приближенную к реальным условиям, например, как деактивировать большое количество пользователей в стороннем API (обычно это бывает после применения бизнес-логики). Для решения используем два паттерна конкурентной разработки на Go: семафор (Semaphore) и пул обработчиков (Worker pool).

Также выясним плюсы и минусы обоих подходов с помощью небольших бенчмарк-тестов с точки зрения времени выполнения и затрат по памяти.

Постановка задачи

Есть некое количество пользователей, например от 1 до 100 000, по каждому надо выполнить функцию Deactivate. Как правило, это происходит путём отправки одного запроса в сторонний API (который не позволяет обновлять пачками) по данному пользователю с его ID и какими-нибудь данными. Каждый запрос даёт временные затраты. При последовательной отправке это будет приблизительно время тайм-аута запроса * количество пользователей. Это даст серьёзный рост временных затрат.

Для того чтобы ускорить время обработки пользователей, можем воспользоваться преимуществами языка Go, а именно возможностью обработки данных в конкурентном режиме. Однако при этом нужно иметь возможность ограничивать конкурентное выполнение.

В рамках данной статьи мы не будем рассматривать основы конкурентности, поэтому рекомендую к этому моменту ознакомиться с примитивами конкурентности: каналами, горутинами, WaitGroup, мьютексами. Цель данной статьи — познакомить вас с тем, как пользоваться примитивами конкурентности в рамках аналогичных задач.

Для решения данного кейса можем воспользоваться двумя подходами:

  • семафор (Semaphore);

  • пул обработчиков (Worker pool).

Для простоты не учтено завершение процессов по контексту, хотя это нужно сделать в реальных условиях.

Но при этом для наглядности (в своё время, когда я разбирался с подходами в аналогичных задачах, это меня интересовало) я оставил завершение процесса при первом же получении ошибки.

Семафор (Semaphore)

В рамках данного подхода создаётся, по сути, неограниченное количество горутин. Их выполнение блокируется с помощью объекта Semaphore, который ограничивает количество одновременно выполняемых горутин с помощью буферизированного канала. В данном подходе на каждую задачу создаётся горутина. Однако если буфер канала семафора переполнен, то при операции Acquire горутина с задачей блокируется до тех пор, пока буфер не освободится с помощью операции Release.

Для сбора результатов используется канал, из которого происходит чтение в основной горутине (здесь, конечно, речь идёт о той, в которой выполняется функция, это совсем не обязательно может быть горутина, в которой выполняется функция main).

Реализация в рамках примера показана ниже:

type Semaphore struct {
	C chan struct{}
}

func (s *Semaphore) Acquire() {
	s.C <- struct{}{}
}

func (s *Semaphore) Release() {
	<-s.C
}

type resultWithError struct {
	User users.User
	Err  error
}

func DeactivateUsers(usrs []users.User, gCount int) ([]users.User, error) {
	// создаем семафор и передаем ему канал с размером буфера равным ограничению на количество одновременно выполняемых горутин
	sem := Semaphore{
		C: make(chan struct{}, gCount),
	}

	// канал для сбора результатов
	outputCh := make(chan resultWithError, len(usrs))
	// канал для оповещения горутин, что мы больше не ждем их выполнения
	sgnlCh := make(chan struct{})

	output := make([]users.User, 0, len(usrs))

	for _, v := range usrs {
		go func(usr users.User) {
			sem.Acquire()
			defer sem.Release()

			err := usr.Deactivate()

			// если ловим закрытие сигнального канала, то завершаем функцию
			select {
			case outputCh <- resultWithError{
				User: usr,
				Err:  err,
			}:
			case <-sgnlCh:
				return
			}
		}(v)
	}

	// ждем и собираем результаты
	// либо мы получим все, либо выйдет ошибка, по которой мы перестанем читать
	for i := len(usrs); i > 0; i-- {
		res := <-outputCh
		if res.Err != nil {
			close(sgnlCh)
			return nil, fmt.Errorf("an error occurred: %w", res.Err)
		}

		output = append(output, res.User)
	}

	return output, nil
}

Пул обработчиков (Worker pool)

Здесь же мы создаём настраиваемое заранее фиксированное количество горутин, которое не зависит от того, сколько задач на входе. Таким образом, не нужно создавать сотни тысяч горутин при соответствующем наборе пользователей.

Каждая из горутин читает общий канал с входными данными, куда отправляются задачи, пока он не закроется.

Аналогично предыдущему способу в основной горутине (здесь, конечно, речь идёт о той, в которой выполняется функция) собираются результаты.

Реализация в рамках примера показана ниже:


type resultWithError struct {
	User users.User
	Err  error
}

func deactivateUser(wg *sync.WaitGroup, inCh <-chan users.User, outCh chan<- resultWithError) {
	defer wg.Done()

	for usr := range inCh {
		err := usr.Deactivate()
		outCh <- resultWithError{
			User: usr,
			Err:  err,
		}
	}
}

func DeactivateUsers(usrs []users.User, wgCount int) ([]users.User, error) {
	// канал для передачи входных данных горутинам
	inputCh := make(chan users.User)
	// канал для сбора результатов
	outputCh := make(chan resultWithError)
	// для ожидания завершения всех горутин
	wg := &sync.WaitGroup{}

	output := make([]users.User, 0, len(usrs))

	go func() {
		defer close(inputCh)

		for i := range usrs {
			inputCh <- usrs[i]
		}
	}()

	go func() {
		for i := 0; i < wgCount; i++ {
			wg.Add(1)

			go deactivateUser(wg, inputCh, outputCh)
		}
		wg.Wait()
		close(outputCh)
	}()

	// собираем результаты
	for res := range outputCh {
		if res.Err != nil {
			return nil, fmt.Errorf("an error occurred: %w", res.Err)
		}

		output = append(output, res.User)
	}

	return output, nil
}

Бенчим

В рамках работы над статьёй мне было интересно сравнить оба этих подхода с помощью бенчмарков.

Условия для бенчмаркинга были следующими:

  • количество пользователей — от 1 до 100 000;

  • количество горутин — от 10 до 1000;

  • количество запусков каждых кейсов —10.

Для этого использовалась утилита benchstat. Она есть тут: https://cs.opensource.google/go/x/perf.

В рамках бенчмаркинга рассмотрены следующие параметры:

  • затраты по времени (sec/op — секунд на операцию);

  • затраты по памяти (B/op — байт на операцию);

  • количество аллокаций (allocs/op — количество операций выделения памяти на операцию).

Сравнение затрат по времени

                                                           │ semaphore.txt │               workerpool.txt               │
                                                           │    sec/op     │     sec/op       vs base                   │
DeactivateUsers/input_size_1_goroutines_count_10-10           713.2n ±  0%     4030.5n ±  1%    +465.09% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10          5.752µ ±  1%      7.236µ ±  0%     +25.80% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10         47.49µ ±  4%      41.69µ ±  0%     -12.21% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10        496.2µ ± 20%      386.6µ ±  1%     -22.09% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10       4.567m ± 17%      3.979m ±  1%     -12.86% (p=0.023 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10      67.03m ±  2%      39.95m ±  1%     -40.41% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10          708.9n ±  0%    19625.0n ±  2%   +2668.18% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10         5.763µ ±  1%     20.340µ ±  2%    +252.96% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10        47.32µ ±  2%      58.72µ ±  1%     +24.10% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10       490.4µ ±  8%      466.2µ ± 26%           ~ (p=0.436 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10      4.998m ± 19%      4.812m ±  6%           ~ (p=0.218 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10     66.58m ±  2%      58.71m ±  5%     -11.81% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10         707.2n ±  0%   317225.0n ±  4%  +44756.48% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10        5.760µ ±  1%    309.622µ ±  4%   +5275.85% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10       48.93µ ±  4%     324.29µ ± 14%    +562.82% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10      518.3µ ± 12%      636.7µ ± 14%     +22.84% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10     4.301m ± 14%      4.728m ± 16%           ~ (p=0.529 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10    67.48m ±  3%      69.69m ± 19%           ~ (p=1.000 n=10)

Видим:

  • огда максимальное количество одновременно выполняемых горутин больше, чем задач, то временные затраты у семафора меньше на 1–3 порядка;

  • когда максимальное количество одновременно выполняемых горутин меньше, чем задач, то временные затраты в принципе держатся в тех же порядках, хотя и Worker pool быстрее;

  • при количестве горутин от 100 и больше незначительно растут временные затраты у Worker pool, однако не особо меняются у семафора.

Сравнение затрат по памяти

                                                           │ semaphore.txt │             workerpool.txt              │
                                                           │     B/op      │     B/op       vs base                  │
DeactivateUsers/input_size_1_goroutines_count_10-10             552.0 ± 0%      688.0 ± 0%    +24.64% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10          2.695Ki ± 0%    1.172Ki ± 0%    -56.52% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10        25.227Ki ± 0%    6.609Ki ± 0%    -73.80% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10       237.66Ki ± 0%    56.61Ki ± 0%    -76.18% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10      2350.1Ki ± 0%    552.6Ki ± 0%    -76.48% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10     22.892Mi ± 0%    5.344Mi ± 0%    -76.65% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10            552.0 ± 0%     3568.0 ± 0%   +546.38% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10         2.695Ki ± 0%    3.984Ki ± 0%    +47.83% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10       25.227Ki ± 0%    9.424Ki ± 0%    -62.64% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10      237.66Ki ± 0%    59.43Ki ± 0%    -74.99% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10     2350.0Ki ± 0%    555.5Ki ± 0%    -76.36% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10    22.892Mi ± 0%    5.347Mi ± 0%    -76.64% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10           552.0 ± 0%    32371.5 ± 0%  +5764.40% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10        2.695Ki ± 0%   32.115Ki ± 0%  +1091.52% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10       25.23Ki ± 0%    37.58Ki ± 0%    +48.95% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10     237.66Ki ± 0%    87.60Ki ± 0%    -63.14% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10    2350.0Ki ± 0%    583.9Ki ± 0%    -75.15% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10   22.892Mi ± 0%    5.378Mi ± 0%    -76.51% (p=0.000 n=10)

Видим:

  • когда максимальное количество одновременно выполняемых горутин больше, чем задач, то затраты по памяти у семафора меньше, разница вышла на 1–2 порядка;

  • когда максимальное количество одновременно выполняемых горутин меньше, чем задач, то затраты по памяти у Worker pool меньше, но разница примерно на 1 порядок.

Сравнение количества аллокаций

                                                           │ semaphore.txt  │              workerpool.txt              │
                                                           │   allocs/op    │   allocs/op    vs base                   │
DeactivateUsers/input_size_1_goroutines_count_10-10              8.000 ± 0%     16.000 ± 0%    +100.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_10-10             26.00 ± 0%      16.00 ± 0%     -38.46% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_10-10           206.00 ± 0%      16.00 ± 0%     -92.23% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_10-10         2006.00 ± 0%      16.00 ± 0%     -99.20% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_10-10       20006.00 ± 0%      16.00 ± 0%     -99.92% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_10-10     200006.00 ± 0%      16.00 ± 0%     -99.99% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_100-10             8.000 ± 0%    106.000 ± 0%   +1225.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_100-10            26.00 ± 0%     106.00 ± 0%    +307.69% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_100-10           206.0 ± 0%      106.0 ± 0%     -48.54% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_100-10         2006.0 ± 0%      106.0 ± 0%     -94.72% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_100-10       20006.0 ± 0%      106.0 ± 1%     -99.47% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_100-10     200006.0 ± 0%      108.5 ± 2%     -99.95% (p=0.000 n=10)
DeactivateUsers/input_size_1_goroutines_count_1000-10            8.000 ± 0%   1006.000 ± 0%  +12475.00% (p=0.000 n=10)
DeactivateUsers/input_size_10_goroutines_count_1000-10           26.00 ± 0%    1006.00 ± 0%   +3769.23% (p=0.000 n=10)
DeactivateUsers/input_size_100_goroutines_count_1000-10          206.0 ± 0%     1006.0 ± 0%    +388.35% (p=0.000 n=10)
DeactivateUsers/input_size_1000_goroutines_count_1000-10        2.006k ± 0%     1.006k ± 0%     -49.85% (p=0.000 n=10)
DeactivateUsers/input_size_10000_goroutines_count_1000-10      20.006k ± 0%     1.010k ± 0%     -94.95% (p=0.000 n=10)
DeactivateUsers/input_size_100000_goroutines_count_1000-10    200.006k ± 0%     1.040k ± 1%     -99.48% (p=0.000 n=10)

Видим:

  • при изменении нагрузки количество аллокаций растёт пропорционально количеству задач (и это логично, на каждую задачу создаём горутину) в семафоре;

  • при изменении нагрузки количество аллокаций константно для Worker pool и может поменяться, только если поменять заданное количество горутин.

Выводы

По ходу статьи мы познакомились с двумя паттернами конкурентности: семафор (Semaphore) и пул обработчиков (Worker pool).

При решении примера, указанного выше, можно использовать оба этих паттерна на выбор. Исходя из результатов бенчмаркинга, расхождения в затратах по времени и памяти не такие большие (в районе одного порядка). Разница ощутима только для ситуации, когда задач меньше чем горутин и, соответственно, наоборот.

Возможно у кого-то уже были практические примеры, где можно было бы чётко проследить, что конкретно лучше использовать, буду рад, если поделитесь в комментариях.

Подробнее просмотреть репозиторий вместе с тестами и бенчмарками можно здесь.

Комментарии (19)


  1. dsh2dsh
    17.08.2023 14:18
    +3

    Не стоит изобретать новые велосипеды. Уже есть https://pkg.go.dev/golang.org/x/sync/errgroup


    1. Sild
      17.08.2023 14:18
      +3

      Не то чтобы семафоры были изобретены только что https://pkg.go.dev/golang.org/x/sync/semaphore

      Вообще по стечению обстоятельств вот только что обсуждали мой код, где коллега убедительно убедил что использовать семафоры - хорошая затея. Код действительно стал проще.

      Правда в моем кейсе на производительность наплевать, так что бонус лично для меня чисто эстетический


      1. perfectgentlemande Автор
        17.08.2023 14:18

        Спасибо!
        А если это не NDA, то можно абстрактно расписать кейс?


        1. Sild
          17.08.2023 14:18
          +1

          Джсоны перекладываем...

          Достаем из сервис1 список ресурсов, и для каждого ресурса выполняем пайплайн в сервис2.
          Пайплайн синхронный и состоит из нескольких api-вызовов (условно, resource1/check, resource1/init, resource1/update)

          Чтобы не насиловать api сервис2, запускаем пайплайн на N ресурсов параллельно, если для какого-то ресурса пайплайн завершился - может запускать для следующего


          1. perfectgentlemande Автор
            17.08.2023 14:18

            интересный кейс, спасибо!


      1. dsh2dsh
        17.08.2023 14:18
        +1

        Я не точно выразился, думал сразу будет понятно. Я не семафоры имел ввиду, а worker pool. Хотя, истины ради, и я писал свою реализацию.


    1. perfectgentlemande Автор
      17.08.2023 14:18

      Спасибо! Видел этот пакет, даже пользовался пару раз)


      1. xakep666
        17.08.2023 14:18
        +3

        Порекомендую также посмотреть на https://github.com/sourcegraph/conc
        субъективно, оказалось удобнее, чем errgroup и semaphore


        1. micronull
          17.08.2023 14:18
          +1

          А мы пользуемся https://github.com/alitto/pond
          Группировка, таймауты и много ещё чего.
          Различные стратегии для наращивания воркеров.


  1. MihaTeam
    17.08.2023 14:18
    +1

    Отличная статья! Есть пример для семафора, к сожалению он не затрагивает воркер пул, но думаю тк статья про семафор, то имеет место быть. Плюс, если кто-то захочет найти еще примеры использования семафора, то может посмотреть исходники pgx (а точнее pgxpool).

    Когда я писал тесты для слоя базы данных(тестировал без мока бд). То была проблема с тем, что тесты ломали друг друга. Решений было несколько:
    1. Ограничить количество одновременных тестов до 1 (т.к у нас не только бд тесты, то их выполнение на 1 потоке занимает много времени)
    2. Создавать отдельные схемы для каждого тесты (усложнение логики)
    3. Запускать для каждого теста отдельный контейнер
    4. Использовать семафор с размером 1 для лока 1 теста бд за раз (плюсы: простая логика. Минусы, при большом количество тестов для базы данных скорее всего это станет узким горлом).
    Собственно 4 вариант и был выбран. К слову, если кто-то знает еще варианты решения этого вопроса, то буду рад узнать, как минимум для понимания того, что я мог упустить.

    Также было бы интересно посмотреть использование паттернов fan-in и fan-out на практике.


    1. dsh2dsh
      17.08.2023 14:18

      1. Использовать семафор с размером 1 для лока 1 теста бд за раз

      Что-то я не уловил. Разве этим самым вы не указываете, что тесты с БД должны выполняться последовательно? Но ведь тоже самое можно получить, просто не указывая для таких тестов

      t.Parallel()

      Или я чего-то не понял?


      1. MihaTeam
        17.08.2023 14:18
        +1

        Тесты из разных пакетов буду запускаться в любом случае параллельно(только если не ограничить количество одновременных задач, см первый вариант), t.Parallel() указывается для параллельного запуска тестов из одного пакета или сабтестов. В этом и была проблема, у меня несколько пакетов, 1 за каждую таблицу. Если я где-то ошибаюсь, то буду рад если кто-то меня поправит.


  1. NuclearMurderer
    17.08.2023 14:18
    +1

    Автору большое спасибо за примеры!


  1. Fenogik
    17.08.2023 14:18
    +1

    А зачем плодить миллион висящих горутин?

    for _, v := range usrs {
      sem.Acquire()
      go func(usr users.User) {
        defer sem.Release()
        ...
      }
    }


    1. perfectgentlemande Автор
      17.08.2023 14:18

      Вот меня мучил тот же вопрос, когда я гуглил реализации семафоров когда-то. Поэтому когда я решал аналогичную задачу, я делал это именно вторым способом. Горутины нет смысла плодить бесконечно, это забьет ресурсы, поэтому для меня в тот момент было логичным ограничить как раз количество созданных горутин.

      Но я не отрицаю, что первый способ нужен в рамках каких-нибудь других примеров (было бы интересно узнать для каких). И из плюсов я заметил, что не будет ситуации что ты воркерпулу из большого числа заранее созданных висящих горутин скормишь пару задач. Но это условный плюс.


  1. nskforward
    17.08.2023 14:18

           select {
    			case outputCh <- resultWithError{
    				User: usr,
    				Err:  err,
    			}:
    			case <-sgnlCh:
    				return
    		}

    В этой конструкции sgnlCh должен идти первым в селекте, а не последним


    1. Anexroid
      17.08.2023 14:18
      +1

      А какая разница? Порядок обработки select-case не определен, если в обоих каналах есть данные


      1. nskforward
        17.08.2023 14:18

        Вы правы, перемещение одного case вверх не изменит ситуацию. Мне изначально резануло глаз то, что первый case во всех случаях готов исполниться. А значит, даже если у нас есть сигнал на выход, нет никакой гарантии сразу завершить работу. Изначально я почему то думал, что первый case немного приоритетнее, но был не прав, извиняюсь.


        1. perfectgentlemande Автор
          17.08.2023 14:18

          при закрытии sgnlCh по факту не происходит уже чтения из outputCh. Это значит, что первый кейс (case outputCh <-) к моменту закрытия sgnlCh будет в блокировке всегда в данном примере.