Как правило, в Go для безопасного доступа к общим данным используются мьютексы. Да, каналы тоже можно приспособить для изменения общих данных, так как они потокобезопасны, но это усложняет и замедляет логику.

Но в этой статье мы поговорим о другом. Современные процессоры имеют поддержку атомарных операций, что позволяет на основе них организовывать работу с общими данными до нескольких раз быстрее, чем с помощью общепринятых вариантов. Так как мьютексы реализованы на основе ОС, каналы сделаны на основе внутреннего кода Go с использованием тех же мьютексов из ОС под капотом, а атомарные операции делает сам процессор аппаратно за существенно меньшее количество тактов.

Пример кода, который при многопоточности неожиданно начинает глючить


Начнём с основ. Вначале будет код, который параллельно к переменной прибавляет и убавляет по миллиону единиц. В типе A лежит счётчик, тип B добавляет 1 (метод B.II), тип С убавляет 1 (метод C.DD). И всё это параллельно. Казалось бы, тут не нужны никакие примитивы синхронизации, так как общее число прибавлений и убавлений равно миллиону, а поэтому результат будет равен нулю.

package main

import (
	"fmt"
	"sync"
)

type A struct {
	i int
}

func (a *A) Inc(delta int) {
	a.i += delta
}

func (a *A) Get() int {
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C

var wg sync.WaitGroup

func main() {
	var nCount = 1000000
        a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Однако если мы запустим этот код, то получим что угодно, но только не ноль. Пробуем!

Объяснить это можно только тем, что процессы параллельно лезут к одним и тем же ячейкам памяти, их действия накладываются друг на друга (хотя чему тут накладываться? инкремент/декремент — одна из самых примитивных операций), но компилятор не догадывается их использовать, а использует несколько регистров, вступают в действие кеши L1, L2 и L3, и на выходе вместо нуля белиберда.

Классика: мьютексы решают вопрос


Хорошо, теперь перепишем этот код классически, на базе мьютексов:

package main

import (
	"fmt"
	"sync"
)

type A struct {
	i int
	m sync.Mutex
}

func (a *A) Inc(delta int) {
	a.m.Lock()
	defer a.m.Unlock()

	a.i += delta
}

func (a *A) Get() int {
	a.m.Lock()
	defer a.m.Unlock()
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C
var wg sync.WaitGroup

func main() {
	var nCount = 1000000
	a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Мьютекс ожидаемо делает своё дело, и на выходе мы получаем ноль. Проверить самому. Кстати, мьютекс можно разместить в любом объекте, не только в защищаемом, и он сделает своё благородное дело. Только из соображений понятности и поддержки кода мьютексы обычно размещают в защищаемых объектах.

На сцену выходят примы: атомарные операции


Теперь же решим эту задачу неблокирующим, быстрым и красивым способом. Атомарным.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type A struct {
	i int64
}

func (a *A) Inc(delta int64) {
	atomic.AddInt64(&a.i, delta)
}

func (a *A) Get() int64 {
	return atomic.LoadInt64(&a.i)
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
		a.Inc(1)
	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
		a.Inc(-1)
	}
	wg.Done()
}

var a A
var b B
var c C

var wg sync.WaitGroup

func main() {
	var nCount = 1000000
	a.i = 0

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Во дела! Работает! Мы тоже получили на выходе ноль после параллельной работы увеличивающей и уменьшающей процедуры.

Достаточно было только использовать специфическую атомарную функцию atomic.AddInt64. Атомарное сложение — очень полезная функция, так как неблокирующим способом мы можем корректно складывать некие результаты из разных потоков практически без затрат времени на синхронизацию. Также я использовал atomic.LoadInt64, но в данном примере она необязательна.

Специфика атомарных операций в том, что они работают только с узким набором данных — это 32-х, 64-х битные данные и указатели.

▍ Сравнение скорости мьютексов и атомарных операций


Для того чтобы сравнить скорость действия атомарных операций и мьютексов я увеличил число операций до 100 миллионов.

Итак, атомарные операции: 2.12 секунды, мьютексы 5.85 секунды.
Как и следовало ожидать, атомарные операции существенно быстрее, примерно в 3 раза.

Частый случай использования атомарных операций, когда несколько горутин что-то делают с общим аккумулятором. Например, суммы убытков и прибылей из разных источников складываются и отображаются онлайн. А вот суммирование массива несколькими горутинами через атомарное сложение не ускоришь. Так как оно существенно дороже по процессорному времени, чем операция обычного сложения из-за сбросов кешей процессора.

С точки зрения процессора (и всех его многочисленных ядер) есть мир до атомарной операции и после. На основе этого принципа из атомарных операций можно делать самопальные конструкции для синхронизации многопоточной работы. И довольно быстрые.

Важный момент: скорость атомарных операций в виртуальной среде может быть значительно ниже из-за того, что они могут эмулироваться. Похоже, что так происходит в WSL.

Самодельный мьютекс на основе CompareAndSwap


Для того, чтобы создавать самодельные мьютексы, можно использовать специальную атомарную фунцию
CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

С ней всё ещё проще. Сделаем самодельный мьютекс.

type MyMutex struct {
    i int64
}

func (m *MyMutex) get() int64 {
    return atomic.LoadInt64(&m.i)
}

func (m *MyMutex) set(val int64) {
    atomic.StoreInt64(&m.i, val)
}

func (m *MyMutex) Lock() {
    for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
    }
    return
}

func (m *MyMutex) Unlock() {
    if m.get() == 0 {
      panic(fmt.Errorf("Double unlocking"))
    }

    m.set(0)
}

Этот подход известен в системном программировании как Compare and swap. Также для этой цели можно использовать подход Test and set, но пакет sync/atomic не предоставляет функций для этого подхода. «Compare and swap» в википедии считают более хорошим подходом. Дополнительная информация. При тестировании этот самодельный мьютекс показывает на 40% худшие результаты, чем оригинальный. Из-за нескольких причин:
  1. Есть цикл (ниже), на который тратится много процессорного времени.
    for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
    }
    
    Если бы могли в него вставить, что-то типа ассемблерных инструкций NO_OPERATION или какой-то микрозадержки, то стал бы лучше работать. Стандартная функция time.Sleep() плохо работает для этой цели. При выставлении 1 наносекунды там может быть в тысячи раз большая реальная задержка.
  2. Планировщик ОС, при использовании системного мьютекса, выделяет поменьше квантов времени тому потоку, который просто ждёт захвата мьютекса.
  3. Библиотечный мьютекс, наверняка, написан на более низком уровне.
  4. Обвязка для самодельного мьютекса, скорее всего, менее эффективно компилируется в ассемблерные инструкции.

Безопасная многопоточная работа с общими данными без каналов и мьютексов


Хорошо, а если нам нужны какие-то более сложные операции и структуры данных? Можно ли тут получить пользу от атомарных операций?

Да, можно! Перейдём к самому интересному. Создадим на основе атомарных операций самодельную потокобезопасную конструкцию, которая будет контролировать состояние конечного автомата. И сделаем это по другому, чем ранее в статье.

▍ Конечный автомат, обслуживающий несколько потоков на атомарных операциях


Некий автомат имеет 3 состояния: открыт, занят уменьшением, занят увеличением. Это условно. Состояний может быть больше. Они могут быть называться как угодно. Суть в том, что есть общие данные любого вида и мы организуем к ним бесконфликтный доступ разных методов в зависимости от состояния объекта. Без использования мьютексов и каналов.

Например, потому что мы знаем, что атомарные операции могут быть сильно быстрее, или просто любим ненормальное программирование.

Итак, представляю код. В нём (для демонстрации потенциальных вариантов реализации) я обошёлся без функции CompareAndSwap. Хотя с ней было бы чуть проще.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	Open int64 = iota
	Increasing
	Decreasing
)

type A struct {
	state int64
	i     int64
}

func (a *A) SetState(state int64) {
	atomic.StoreInt64(&a.state, state)
}

func (a *A) GetState() int64 {
	return atomic.LoadInt64(&a.state)
}

func (a *A) AddState(delta int64) int64 {
	return atomic.AddInt64(&a.state, delta)
}
// По хорошему тут должно быть 2 процедуры: для состояния Increasing и для Decreasing,
// но не хотелось плодить почти одинаковый код
func (a *A) Inc(delta int64) {
	if delta > 0 {
		a.SetState(Increasing)
	} else {
		a.SetState(Decreasing)
	}
	a.i += delta
	a.SetState(Open)
}

func (a *A) Init() {
	a.SetState(Open)
}

func (a *A) Get() int64 {
	return a.i
}

type B struct {
}

func (b *B) II(c int) {
	for i := 0; i < c; i++ {
	i_begin:
		for a.GetState() != Open {
		}
		if a.AddState(Increasing) == Increasing {
			a.Inc(1)
		} else {
			goto i_begin
		}

	}
	wg.Done()
}

type C struct {
}

func (c *C) DD(count int) {
	for i := 0; i < count; i++ {
	d_begin:
		for a.GetState() != Open {
		}
		if a.AddState(Decreasing) == Decreasing {
			a.Inc(-1)
		} else {
			goto d_begin
		}
	}
	wg.Done()
}

var a A
var b B
var c C
var wg sync.WaitGroup

func main() {
	a.Init()
	var nCount = 1000000

	wg.Add(1)
	go b.II(nCount)

	wg.Add(1)
	go c.DD(nCount)

	wg.Wait()
	fmt.Println("Sum", a.Get())
}

Запустить в Go Playground. Если проблем с многопоточностью нет, то код должен вывести ноль как результат. И выводит.

Обратите внимание, что в этом коде операция инкремента теперь обычная, а не атомарная, но тем не менее всё корректно считается.

Логика: когда объект A объявляет о том, что он открыт, то объекты B и C пытаются переключить его состояние в Increasing или Decreasing с помощью добавления к нулю (это состояние Open) значений Increasing или Decreasing. Тот, кто первый добавил, получает, например, Increasing. Второй уже получит значение Increasing + Decreasing, которое не даст ему начать делать свою работу и переведёт в состояние ожидания, пока состояние не станет Open. Мы можем использовать сколько угодно состояний, главное, чтобы сумма номеров любых остальных не равнялась никакому номеру другого состояния.

Вот кусочек с главным хаком, который делает нашу работу потокобезопасной:

for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
   a.Inc(-1)
}

Например, можно использовать степени двойки: 1, 2, 4, 8, 16, 32. Сумма любого количества состояний не будет равна никакому другому номеру состояния. Каждая степень двойки представлена бинарно отдельным битом, который не пересекается с другими битами. Каждое состояние будет представлено машинным словом со всего одной битовой единицей, только в разном месте. И поэтому при сложении состояний мы будем получать результат с несколькими единицами.

Наш переключатель реагирует только на чистые состояния, в которых только по одному включённому биту. И поэтому каждая горутина легко отличает ситуации, когда ей удалось занять очередь на исполнение, и начинает исполнять свою работу.

Для 100 миллионов циклов этот код выполнялся 14 секунд, аналогичный код с тремя состояними на мьютексах выполнялся 25.3 секунды. Код на мьютексах немного сложнее, чем мог бы быть, но только для того, чтобы имитировать объект с 3 состояниями и с минимально блокирующим доступом к нему. Наверное, более правильно его переписать на каналах, но тогда он будет выполняться ещё дольше, так как под капотом каналов всё те же мьютексы.

Сразу скажу, что в Go Playground не получится потестить мои скрипты со 100М выполнений, и функция времени там работает некорректно. Для того, чтобы воспроизвести мои эксперименты вам придётся компилировать самому.

Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!

Вывод


Атомарные функции в Go дают возможность организовать как блокирующие, так и в особенности неблокирующие алгоритмы с механизмами синхронизации, которые иногда работают в 2-3 раза быстрее, чем мьютексы и каналы. В некоторых ситуациях низкоуровневого программирования это может дать существенную пользу, хотя и требует экстремальной внимательности во время разработки.

Во всех остальных ситуациях разработчики Go рекомендуют использовать каналы и мьютексы, как более понятные и наглядные в применении.

Нативные мьютексы от ОС имеют более тесную связь с планировшиком задач, поэтому нет смысла переписывать именно мьютексы на атомарных операциях, но в других задачах, в особенности в неблокирующих алгоритмах, стоит рассмотреть атомарные операции для ускорения алгоритмов.

Более детально посмотреть на атомарные функции вы можете в документации Go.

Ранее на Хабре на эту тему: Атомарные и неатомарные операции, Atomic operation.

Telegram-канал со скидками, розыгрышами призов и новостями IT ?

Комментарии (25)


  1. olivera507224
    06.08.2024 21:36
    +1

    получим что угодно, но только не ноль

    Забавно


    1. inetstar Автор
      06.08.2024 21:36
      +4

      Поздравляем! Вы выиграли в лотерею!


      1. Mnwamnowich
        06.08.2024 21:36
        +1

        Для победы в лотерею, нужен всего лишь простой, советский GOMAXPROCS=1


  1. unreal_undead2
    06.08.2024 21:36

    Перед использованием атомиков стоит сначала подумать, нельзя ли модифицировать в каждом треде свои локальные данные (возможно, с последующей редукцией). Кстати, как в go работать c thread-local переменными?


    1. inetstar Автор
      06.08.2024 21:36

      В Go треды запускаются автоматически с командой go. Поэтому любая локальная переменная в горутине - это локальная переменная в треде. Поэтому работать с ними нужно как с обычными переменными.


      1. unreal_undead2
        06.08.2024 21:36

        Всё таки маппинг горутин на треды не 1:1. Что делать, если хочется модифицировать именно per-thread переменные (одну копию в разных горутинах, выполняющихся в конкретном треде), и в конце ещё пройтись по всем для получения конечного ответа?


        1. inetstar Автор
          06.08.2024 21:36

          В Го нет нормальных механизмов для того, чтобы отличать один тред от другого. Я видел какие-то хаки для этого.

          Я думаю, если вы сформулируете вашу задачу без привязки к тредам, то есть более правильное решение в парадигме Го. В чём задача?


          1. unreal_undead2
            06.08.2024 21:36

            Скажем, банально посчитать упомянутую сумму элементов массива, задействовав параллелизм (чтобы загрузить все ядра и контроллеры памяти на сокете), но без атомиков.


            1. inetstar Автор
              06.08.2024 21:36

              Я бы сделал так: сам массив разместил бы в общей памяти, а в каждую горутину передавал бы индексы начала интервала для суммирования и конца интервала для суммирования, полученные суммы участков массива сложил бы атомными сложениями, или отправил по каналу в суммирующую горутину.

              То есть у меня есть пул воркеров, которые читают канал, в который приходят структуры {nBegin: int, nEnd int}. Далее каждый воркер суммирует в нужном диапазоне и отправляет результат в суммирующую горутину (или атомно складывает)


              1. unreal_undead2
                06.08.2024 21:36

                полученные суммы участков массива сложил бы атомными сложениями, или отправил по каналу в суммирующую горутину.

                Как то дороговато по сравнению с обычным инкрементом thread-local переменной.


                1. inetstar Автор
                  06.08.2024 21:36

                  Для частичных сумм в диапазоне [nBegin, nEnd) это будет обычный инкремент суммы. Естественно, что таким образом нужно обрабатывать очень большие массивы, чтобы был выигрыш.


                  1. unreal_undead2
                    06.08.2024 21:36

                    Я имею в виду сложение результатов работы горутин. Для простой суммы элементов не так страшно, поскольку можно статически поделить массив по числу ядер и соответственно создать небольшое число горутин. Но если надо суммировать значение функции от элемента, время работы которой существенно зависит от данных, для балансировки работы стоит создать побольше горутин, и тогда редукция на атомиках может быть дороговата.


                    1. inetstar Автор
                      06.08.2024 21:36

                      Для такого случая я бы сделал пул воркеров, в каждый из которых отправлял бы те же интервалы, только меньшие. Частичные суммы считал бы в самих воркерах, а результирующие частичные суммы каждый воркер бы сложил атомно или через канал после того, как входной канал с интервалами будет закрыт.

                      В таком случае число атомных сложений (или через канал) будет равно число воркеров. И это будет дешево.

                      Балансировка при таком подходе тоже автоматически выполняется.


                      1. unreal_undead2
                        06.08.2024 21:36

                        я бы сделал пул воркеров, в каждый из которых отправлял бы те же интервалы

                        Выглядит так, что частично дублируется функциональность, уже заложенная в рантайм go.


  1. nickmanecannotbeempty
    06.08.2024 21:36

    У меня какие-то странные бенчмарки получаются. Почему-то атомики работают медленнее, чем мьютексы.

    Подскажите, может, я где-то проявил невнимательность?

    https://github.com/harnyk/go-atomic-vs-mutex


    1. inetstar Автор
      06.08.2024 21:36

      1. Пакет testing не подходит для этой цели. Мьютекс используют системные вызовы, думаю, testing их не считает. Лучше измерить время внутри Go через time.Now() и time.Since()

      2. Вы тестируете в WSL. В виртуальной среде атомные команды могут исполняться неэффективно


      1. nickmanecannotbeempty
        06.08.2024 21:36

        Это какая-то мистика прямо. Я запустил ваши тесты на голом линуксе без всякой виртуализации на той же машине, на которой запускал до этого их в WSL2.

        Результаты на скриншоте. На той машине (AMD Ryzen 9) атомики медленнее, чем мьютексы, пусть и не настолько сильно.

        Атомики медленнее в следующих средах:

        1. мой лэптоп AMD Ryzen 9:
        - голая винда
        - голый линукс
        - линукс через WSL

        2. AMD EPYC 7763 64-Core в облачном контейнере, где выполняется Github Actions

        Атомики быстрее:

        - линукс на Android-смартфоне через Termux (практически на голом железе). Там arm64.

        Это просто все машины, до которых я смог сегодня дотянуться.

        Исходники тестов - ваши, без изменений: https://github.com/harnyk/go-atomic-vs-mutex/tree/master/kamenev .

        Пакет testing, как и договаривались, не использую, гоняю скомпилированные бинарники.


        1. inetstar Автор
          06.08.2024 21:36

          В моих случаях на голом железе атомики быстрее на:

          1) AMD EPYC 7542 32-Core Processor
          2) AMD Opteron 2435

          У меня рабочая гипотеза, что на архитектуре Zen3 атомарные процедуры сложения работают как-то неэффективно.

          Или, наоборот, на Zen3 аппаратные инструкции связанные с мьютексами как-то подтянули.

          Понял. В го мьютексы пытаются сделать захват через атомарную операцию и если не удалось используют системный мьютекс. Поэтому в каких-то случаях (удачных) соревнуются 64х битные атомарные операции и 32х битные (которые в го используются для первой попытки захвата мьютекса).

          А в неудачных случаях, когда мьютексы захватываются непопеременно, то атомарные операции обгоняют. Из-за подключения системного мьютекса из ОС.


        1. inetstar Автор
          06.08.2024 21:36

          Я сделал 4 плюсующих потока и 4 минусующих.

          Atomic Time 4.431332195s
          Sum 0

          Mutex Time 47.299236939s
          Sum 0

          Достаточно было увеличить число потоков и атомарные операции показали себя во всей красе.

          Видимо, у вас выигрыш у мьютексов получался при 2 потоках из-за того, что планировщик Go удачно давал время каждой горутине так, что они не пересекались и не вызывались системные мьютексы.


          1. nickmanecannotbeempty
            06.08.2024 21:36
            +1

            У меня на лэптопе Ryzen 9 (Linux 6.8.0-39-generic) на вашем 4-поточном тесте результаты следующие:

            ❯ ./t_atom ; ./t_mut
            Time 4.179402911s
            Sum 0
            Time 7.681173311s
            Sum 0

            Ну, то есть, мьютексов уже проигрывают, но не прямо в 10 раз


  1. TovarischSuhov
    06.08.2024 21:36
    +2

    Вы не поверите, но Го не использует системные мьютексы, а использует реализацию на атомиках, которая не привязанна к OC. Вот здесь можно подробнее посмотреть https://github.com/golang/go/blob/master/src/sync/mutex.go


    1. inetstar Автор
      06.08.2024 21:36

      Вы почти правы. Ценное замечание.

      Вначале Go предпринимает попытку по-быстрому захватить свой собственный мьютекс через атомарную операцию, а если не получается, то использует системный вызов runtime_SemacquireMutex, а при разблокировке runtime_Semrelease.

      Кстати, работа с семафором в системном мьютексе (поле sema), делается на небезопасных указателях, для которых динамически выделяется память. А для получения нового куска памяти используются также системные вызовы.

      И ещё, интересный момент, когда я написал свой мьютекс на аналогичных операциях он оказался значительно медленнее более сложного Го-мьютекса. Это трудно поддаётся объяснению. Кроме как тем, что всё-таки используется системный семафор, который хорошо интегрирован с планировщиком задач ОС.

      Мой мьютекс можно попробовать улучшить ещё одним способом. Я использовал 64-х битную операцию CompareAndSwap, а в Го-мьютексе 32-х битная.


  1. GoDevSeoTaxi
    06.08.2024 21:36
    +2

    И все это - чтобы перекладывать json'ы

    Спасибо за статью!


  1. OlegMax
    06.08.2024 21:36

    Статья, полная пионерского задора:

    1. "Разработчики Go ничего не понимают в многопоточности, щас я покажу, как надо мьютексы писать. Зачем системные вызовы, если можно просто жарить процессор в циклах?"

    2. "Стоит рассмотреть атомарные операции для ускорения алгоритмов". В C/C++ уже давно сложился консенсус, что не стоит самому строить что-то сложное поверх атомарных операций. Но можно использовать lock-free библиотеки, очень тщательно убедившись в их надежности.


    1. inetstar Автор
      06.08.2024 21:36
      +1

      1-я цитата фальшивая. Я такого не писал.

      Вот реальная цитата:

      Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!

      Кстати, смешной момент в том, что мьютексы в Го написаны примерно также, через CompareAndSwap. С подключением системного вызова, чтобы в цикле не жарить проц.

      Большинство людей даже и не знает об атомарных операциях и что они поддерживаются на уровне процессора.

      Я их реально использую в коде. Они упрощают и убыстряют многие моменты параллельной работы.