Как правило, в Go для безопасного доступа к общим данным используются мьютексы. Да, каналы тоже можно приспособить для изменения общих данных, так как они потокобезопасны, но это усложняет и замедляет логику.
Но в этой статье мы поговорим о другом. Современные процессоры имеют поддержку атомарных операций, что позволяет на основе них организовывать работу с общими данными до нескольких раз быстрее, чем с помощью общепринятых вариантов. Так как мьютексы реализованы на основе ОС, каналы сделаны на основе внутреннего кода Go с использованием тех же мьютексов из ОС под капотом, а атомарные операции делает сам процессор аппаратно за существенно меньшее количество тактов.
Пример кода, который при многопоточности неожиданно начинает глючить
Начнём с основ. Вначале будет код, который параллельно к переменной прибавляет и убавляет по миллиону единиц. В типе A лежит счётчик, тип B добавляет 1 (метод B.II), тип С убавляет 1 (метод C.DD). И всё это параллельно. Казалось бы, тут не нужны никакие примитивы синхронизации, так как общее число прибавлений и убавлений равно миллиону, а поэтому результат будет равен нулю.
package main
import (
"fmt"
"sync"
)
type A struct {
i int
}
func (a *A) Inc(delta int) {
a.i += delta
}
func (a *A) Get() int {
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Однако если мы запустим этот код, то получим что угодно, но только не ноль. Пробуем!
Объяснить это можно только тем, что процессы параллельно лезут к одним и тем же ячейкам памяти, их действия накладываются друг на друга (хотя чему тут накладываться? инкремент/декремент — одна из самых примитивных операций), но компилятор не догадывается их использовать, а использует несколько регистров, вступают в действие кеши L1, L2 и L3, и на выходе вместо нуля белиберда.
Классика: мьютексы решают вопрос
Хорошо, теперь перепишем этот код классически, на базе мьютексов:
package main
import (
"fmt"
"sync"
)
type A struct {
i int
m sync.Mutex
}
func (a *A) Inc(delta int) {
a.m.Lock()
defer a.m.Unlock()
a.i += delta
}
func (a *A) Get() int {
a.m.Lock()
defer a.m.Unlock()
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Мьютекс ожидаемо делает своё дело, и на выходе мы получаем ноль. Проверить самому. Кстати, мьютекс можно разместить в любом объекте, не только в защищаемом, и он сделает своё благородное дело. Только из соображений понятности и поддержки кода мьютексы обычно размещают в защищаемых объектах.
На сцену выходят примы: атомарные операции
Теперь же решим эту задачу неблокирующим, быстрым и красивым способом. Атомарным.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type A struct {
i int64
}
func (a *A) Inc(delta int64) {
atomic.AddInt64(&a.i, delta)
}
func (a *A) Get() int64 {
return atomic.LoadInt64(&a.i)
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Во дела! Работает! Мы тоже получили на выходе ноль после параллельной работы увеличивающей и уменьшающей процедуры.
Достаточно было только использовать специфическую атомарную функцию atomic.AddInt64. Атомарное сложение — очень полезная функция, так как неблокирующим способом мы можем корректно складывать некие результаты из разных потоков практически без затрат времени на синхронизацию. Также я использовал atomic.LoadInt64, но в данном примере она необязательна.
Специфика атомарных операций в том, что они работают только с узким набором данных — это 32-х, 64-х битные данные и указатели.
▍ Сравнение скорости мьютексов и атомарных операций
Для того чтобы сравнить скорость действия атомарных операций и мьютексов я увеличил число операций до 100 миллионов.
Итак, атомарные операции: 2.12 секунды, мьютексы 5.85 секунды.
Как и следовало ожидать, атомарные операции существенно быстрее, примерно в 3 раза.
Частый случай использования атомарных операций, когда несколько горутин что-то делают с общим аккумулятором. Например, суммы убытков и прибылей из разных источников складываются и отображаются онлайн. А вот суммирование массива несколькими горутинами через атомарное сложение не ускоришь. Так как оно существенно дороже по процессорному времени, чем операция обычного сложения из-за сбросов кешей процессора.
С точки зрения процессора (и всех его многочисленных ядер) есть мир до атомарной операции и после. На основе этого принципа из атомарных операций можно делать самопальные конструкции для синхронизации многопоточной работы. И довольно быстрые.
Важный момент: скорость атомарных операций в виртуальной среде может быть значительно ниже из-за того, что они могут эмулироваться. Похоже, что так происходит в WSL.
Самодельный мьютекс на основе CompareAndSwap
Для того, чтобы создавать самодельные мьютексы, можно использовать специальную атомарную фунцию
CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
С ней всё ещё проще. Сделаем самодельный мьютекс.
type MyMutex struct {
i int64
}
func (m *MyMutex) get() int64 {
return atomic.LoadInt64(&m.i)
}
func (m *MyMutex) set(val int64) {
atomic.StoreInt64(&m.i, val)
}
func (m *MyMutex) Lock() {
for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
}
return
}
func (m *MyMutex) Unlock() {
if m.get() == 0 {
panic(fmt.Errorf("Double unlocking"))
}
m.set(0)
}
Этот подход известен в системном программировании как Compare and swap. Также для этой цели можно использовать подход Test and set, но пакет sync/atomic не предоставляет функций для этого подхода. «Compare and swap» в википедии считают более хорошим подходом. Дополнительная информация. При тестировании этот самодельный мьютекс показывает на 40% худшие результаты, чем оригинальный. Из-за нескольких причин:
- Есть цикл (ниже), на который тратится много процессорного времени.
Если бы могли в него вставить, что-то типа ассемблерных инструкций NO_OPERATION или какой-то микрозадержки, то стал бы лучше работать. Стандартная функция time.Sleep() плохо работает для этой цели. При выставлении 1 наносекунды там может быть в тысячи раз большая реальная задержка.for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true { }
- Планировщик ОС, при использовании системного мьютекса, выделяет поменьше квантов времени тому потоку, который просто ждёт захвата мьютекса.
- Библиотечный мьютекс, наверняка, написан на более низком уровне.
- Обвязка для самодельного мьютекса, скорее всего, менее эффективно компилируется в ассемблерные инструкции.
Безопасная многопоточная работа с общими данными без каналов и мьютексов
Хорошо, а если нам нужны какие-то более сложные операции и структуры данных? Можно ли тут получить пользу от атомарных операций?
Да, можно! Перейдём к самому интересному. Создадим на основе атомарных операций самодельную потокобезопасную конструкцию, которая будет контролировать состояние конечного автомата. И сделаем это по другому, чем ранее в статье.
▍ Конечный автомат, обслуживающий несколько потоков на атомарных операциях
Некий автомат имеет 3 состояния: открыт, занят уменьшением, занят увеличением. Это условно. Состояний может быть больше. Они могут быть называться как угодно. Суть в том, что есть общие данные любого вида и мы организуем к ним бесконфликтный доступ разных методов в зависимости от состояния объекта. Без использования мьютексов и каналов.
Например, потому что мы знаем, что атомарные операции могут быть сильно быстрее, или просто любим ненормальное программирование.
Итак, представляю код. В нём (для демонстрации потенциальных вариантов реализации) я обошёлся без функции CompareAndSwap. Хотя с ней было бы чуть проще.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
const (
Open int64 = iota
Increasing
Decreasing
)
type A struct {
state int64
i int64
}
func (a *A) SetState(state int64) {
atomic.StoreInt64(&a.state, state)
}
func (a *A) GetState() int64 {
return atomic.LoadInt64(&a.state)
}
func (a *A) AddState(delta int64) int64 {
return atomic.AddInt64(&a.state, delta)
}
// По хорошему тут должно быть 2 процедуры: для состояния Increasing и для Decreasing,
// но не хотелось плодить почти одинаковый код
func (a *A) Inc(delta int64) {
if delta > 0 {
a.SetState(Increasing)
} else {
a.SetState(Decreasing)
}
a.i += delta
a.SetState(Open)
}
func (a *A) Init() {
a.SetState(Open)
}
func (a *A) Get() int64 {
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
i_begin:
for a.GetState() != Open {
}
if a.AddState(Increasing) == Increasing {
a.Inc(1)
} else {
goto i_begin
}
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
d_begin:
for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
a.Inc(-1)
} else {
goto d_begin
}
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
a.Init()
var nCount = 1000000
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Запустить в Go Playground. Если проблем с многопоточностью нет, то код должен вывести ноль как результат. И выводит.
Обратите внимание, что в этом коде операция инкремента теперь обычная, а не атомарная, но тем не менее всё корректно считается.
Логика: когда объект A объявляет о том, что он открыт, то объекты B и C пытаются переключить его состояние в Increasing или Decreasing с помощью добавления к нулю (это состояние Open) значений Increasing или Decreasing. Тот, кто первый добавил, получает, например, Increasing. Второй уже получит значение Increasing + Decreasing, которое не даст ему начать делать свою работу и переведёт в состояние ожидания, пока состояние не станет Open. Мы можем использовать сколько угодно состояний, главное, чтобы сумма номеров любых остальных не равнялась никакому номеру другого состояния.
Вот кусочек с главным хаком, который делает нашу работу потокобезопасной:
for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
a.Inc(-1)
}
Например, можно использовать степени двойки: 1, 2, 4, 8, 16, 32. Сумма любого количества состояний не будет равна никакому другому номеру состояния. Каждая степень двойки представлена бинарно отдельным битом, который не пересекается с другими битами. Каждое состояние будет представлено машинным словом со всего одной битовой единицей, только в разном месте. И поэтому при сложении состояний мы будем получать результат с несколькими единицами.
Наш переключатель реагирует только на чистые состояния, в которых только по одному включённому биту. И поэтому каждая горутина легко отличает ситуации, когда ей удалось занять очередь на исполнение, и начинает исполнять свою работу.
Для 100 миллионов циклов этот код выполнялся 14 секунд, аналогичный код с тремя состояними на мьютексах выполнялся 25.3 секунды. Код на мьютексах немного сложнее, чем мог бы быть, но только для того, чтобы имитировать объект с 3 состояниями и с минимально блокирующим доступом к нему. Наверное, более правильно его переписать на каналах, но тогда он будет выполняться ещё дольше, так как под капотом каналов всё те же мьютексы.
Сразу скажу, что в Go Playground не получится потестить мои скрипты со 100М выполнений, и функция времени там работает некорректно. Для того, чтобы воспроизвести мои эксперименты вам придётся компилировать самому.
Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!
Вывод
Атомарные функции в Go дают возможность организовать как блокирующие, так и в особенности неблокирующие алгоритмы с механизмами синхронизации, которые иногда работают в 2-3 раза быстрее, чем мьютексы и каналы. В некоторых ситуациях низкоуровневого программирования это может дать существенную пользу, хотя и требует экстремальной внимательности во время разработки.
Во всех остальных ситуациях разработчики Go рекомендуют использовать каналы и мьютексы, как более понятные и наглядные в применении.
Нативные мьютексы от ОС имеют более тесную связь с планировшиком задач, поэтому нет смысла переписывать именно мьютексы на атомарных операциях, но в других задачах, в особенности в неблокирующих алгоритмах, стоит рассмотреть атомарные операции для ускорения алгоритмов.
Более детально посмотреть на атомарные функции вы можете в документации Go.
Ранее на Хабре на эту тему: Атомарные и неатомарные операции, Atomic operation.
Telegram-канал со скидками, розыгрышами призов и новостями IT ?
Комментарии (25)
unreal_undead2
06.08.2024 21:36Перед использованием атомиков стоит сначала подумать, нельзя ли модифицировать в каждом треде свои локальные данные (возможно, с последующей редукцией). Кстати, как в go работать c thread-local переменными?
inetstar Автор
06.08.2024 21:36В Go треды запускаются автоматически с командой go. Поэтому любая локальная переменная в горутине - это локальная переменная в треде. Поэтому работать с ними нужно как с обычными переменными.
unreal_undead2
06.08.2024 21:36Всё таки маппинг горутин на треды не 1:1. Что делать, если хочется модифицировать именно per-thread переменные (одну копию в разных горутинах, выполняющихся в конкретном треде), и в конце ещё пройтись по всем для получения конечного ответа?
inetstar Автор
06.08.2024 21:36В Го нет нормальных механизмов для того, чтобы отличать один тред от другого. Я видел какие-то хаки для этого.
Я думаю, если вы сформулируете вашу задачу без привязки к тредам, то есть более правильное решение в парадигме Го. В чём задача?unreal_undead2
06.08.2024 21:36Скажем, банально посчитать упомянутую сумму элементов массива, задействовав параллелизм (чтобы загрузить все ядра и контроллеры памяти на сокете), но без атомиков.
inetstar Автор
06.08.2024 21:36Я бы сделал так: сам массив разместил бы в общей памяти, а в каждую горутину передавал бы индексы начала интервала для суммирования и конца интервала для суммирования, полученные суммы участков массива сложил бы атомными сложениями, или отправил по каналу в суммирующую горутину.
То есть у меня есть пул воркеров, которые читают канал, в который приходят структуры {nBegin: int, nEnd int}. Далее каждый воркер суммирует в нужном диапазоне и отправляет результат в суммирующую горутину (или атомно складывает)unreal_undead2
06.08.2024 21:36полученные суммы участков массива сложил бы атомными сложениями, или отправил по каналу в суммирующую горутину.
Как то дороговато по сравнению с обычным инкрементом thread-local переменной.
inetstar Автор
06.08.2024 21:36Для частичных сумм в диапазоне [nBegin, nEnd) это будет обычный инкремент суммы. Естественно, что таким образом нужно обрабатывать очень большие массивы, чтобы был выигрыш.
unreal_undead2
06.08.2024 21:36Я имею в виду сложение результатов работы горутин. Для простой суммы элементов не так страшно, поскольку можно статически поделить массив по числу ядер и соответственно создать небольшое число горутин. Но если надо суммировать значение функции от элемента, время работы которой существенно зависит от данных, для балансировки работы стоит создать побольше горутин, и тогда редукция на атомиках может быть дороговата.
inetstar Автор
06.08.2024 21:36Для такого случая я бы сделал пул воркеров, в каждый из которых отправлял бы те же интервалы, только меньшие. Частичные суммы считал бы в самих воркерах, а результирующие частичные суммы каждый воркер бы сложил атомно или через канал после того, как входной канал с интервалами будет закрыт.
В таком случае число атомных сложений (или через канал) будет равно число воркеров. И это будет дешево.
Балансировка при таком подходе тоже автоматически выполняется.unreal_undead2
06.08.2024 21:36я бы сделал пул воркеров, в каждый из которых отправлял бы те же интервалы
Выглядит так, что частично дублируется функциональность, уже заложенная в рантайм go.
nickmanecannotbeempty
06.08.2024 21:36У меня какие-то странные бенчмарки получаются. Почему-то атомики работают медленнее, чем мьютексы.
Подскажите, может, я где-то проявил невнимательность?
https://github.com/harnyk/go-atomic-vs-mutex
inetstar Автор
06.08.2024 21:36Пакет testing не подходит для этой цели. Мьютекс используют системные вызовы, думаю, testing их не считает. Лучше измерить время внутри Go через time.Now() и time.Since()
Вы тестируете в WSL. В виртуальной среде атомные команды могут исполняться неэффективно
nickmanecannotbeempty
06.08.2024 21:36Это какая-то мистика прямо. Я запустил ваши тесты на голом линуксе без всякой виртуализации на той же машине, на которой запускал до этого их в WSL2.
Результаты на скриншоте. На той машине (AMD Ryzen 9) атомики медленнее, чем мьютексы, пусть и не настолько сильно.
Атомики медленнее в следующих средах:
1. мой лэптоп AMD Ryzen 9:
- голая винда
- голый линукс
- линукс через WSL
2. AMD EPYC 7763 64-Core в облачном контейнере, где выполняется Github Actions
Атомики быстрее:
- линукс на Android-смартфоне через Termux (практически на голом железе). Там arm64.
Это просто все машины, до которых я смог сегодня дотянуться.
Исходники тестов - ваши, без изменений: https://github.com/harnyk/go-atomic-vs-mutex/tree/master/kamenev .Пакет testing, как и договаривались, не использую, гоняю скомпилированные бинарники.
inetstar Автор
06.08.2024 21:36В моих случаях на голом железе атомики быстрее на:
1) AMD EPYC 7542 32-Core Processor
2) AMD Opteron 2435
У меня рабочая гипотеза, что на архитектуре Zen3 атомарные процедуры сложения работают как-то неэффективно.Или, наоборот, на Zen3 аппаратные инструкции связанные с мьютексами как-то подтянули.
Понял. В го мьютексы пытаются сделать захват через атомарную операцию и если не удалось используют системный мьютекс. Поэтому в каких-то случаях (удачных) соревнуются 64х битные атомарные операции и 32х битные (которые в го используются для первой попытки захвата мьютекса).
А в неудачных случаях, когда мьютексы захватываются непопеременно, то атомарные операции обгоняют. Из-за подключения системного мьютекса из ОС.
inetstar Автор
06.08.2024 21:36Я сделал 4 плюсующих потока и 4 минусующих.
Atomic Time 4.431332195s
Sum 0Mutex Time 47.299236939s
Sum 0Достаточно было увеличить число потоков и атомарные операции показали себя во всей красе.
Видимо, у вас выигрыш у мьютексов получался при 2 потоках из-за того, что планировщик Go удачно давал время каждой горутине так, что они не пересекались и не вызывались системные мьютексы.nickmanecannotbeempty
06.08.2024 21:36+1У меня на лэптопе Ryzen 9 (Linux 6.8.0-39-generic) на вашем 4-поточном тесте результаты следующие:
❯ ./t_atom ; ./t_mut Time 4.179402911s Sum 0 Time 7.681173311s Sum 0
Ну, то есть, мьютексов уже проигрывают, но не прямо в 10 раз
TovarischSuhov
06.08.2024 21:36+2Вы не поверите, но Го не использует системные мьютексы, а использует реализацию на атомиках, которая не привязанна к OC. Вот здесь можно подробнее посмотреть https://github.com/golang/go/blob/master/src/sync/mutex.go
inetstar Автор
06.08.2024 21:36Вы почти правы. Ценное замечание.
Вначале Go предпринимает попытку по-быстрому захватить свой собственный мьютекс через атомарную операцию, а если не получается, то использует системный вызов runtime_SemacquireMutex, а при разблокировке runtime_Semrelease.
Кстати, работа с семафором в системном мьютексе (поле sema), делается на небезопасных указателях, для которых динамически выделяется память. А для получения нового куска памяти используются также системные вызовы.
И ещё, интересный момент, когда я написал свой мьютекс на аналогичных операциях он оказался значительно медленнее более сложного Го-мьютекса. Это трудно поддаётся объяснению. Кроме как тем, что всё-таки используется системный семафор, который хорошо интегрирован с планировщиком задач ОС.
Мой мьютекс можно попробовать улучшить ещё одним способом. Я использовал 64-х битную операцию CompareAndSwap, а в Го-мьютексе 32-х битная.
OlegMax
06.08.2024 21:36Статья, полная пионерского задора:
"Разработчики Go ничего не понимают в многопоточности, щас я покажу, как надо мьютексы писать. Зачем системные вызовы, если можно просто жарить процессор в циклах?"
"Стоит рассмотреть атомарные операции для ускорения алгоритмов". В C/C++ уже давно сложился консенсус, что не стоит самому строить что-то сложное поверх атомарных операций. Но можно использовать lock-free библиотеки, очень тщательно убедившись в их надежности.
inetstar Автор
06.08.2024 21:36+11-я цитата фальшивая. Я такого не писал.
Вот реальная цитата:
Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!
Кстати, смешной момент в том, что мьютексы в Го написаны примерно также, через CompareAndSwap. С подключением системного вызова, чтобы в цикле не жарить проц.
Большинство людей даже и не знает об атомарных операциях и что они поддерживаются на уровне процессора.
Я их реально использую в коде. Они упрощают и убыстряют многие моменты параллельной работы.
olivera507224
Забавно
inetstar Автор
Поздравляем! Вы выиграли в лотерею!
Mnwamnowich
Для победы в лотерею, нужен всего лишь простой, советский
GOMAXPROCS=1