Привет всем гоферам! В данной статье я хочу разобрать как можно воспользоваться модулем sync/atomic
для типа float64.
Постановка задачи
У нас есть канал из которого мы можем читать результаты выполнения задач. Результат задачи содержит флаг показывающий была ли ошибка при выполнении задачи и результат выполнения этой задачи (тип float64). Нужно найти сумму всех результатов и количество ошибок.
Реализация с использованием sync.Mutex
Задачу можно решить с использованием sync.Mutex
:
package main
import (
"fmt"
"math"
"sync"
"sync/atomic"
)
const countWorkers = 1000
const countTasks = 10000000
type Result struct {
value float64
hasError bool
}
func MakeTasks(count int) <-chan Result {
ch := make(chan Result)
go func() {
for i := 0; i < count; i++ {
ch <- Result{
value: float64(i) * 2.42,
hasError: (i % 10) == 0,
}
}
close(ch)
}()
return ch
}
func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var errMu sync.Mutex
var mu sync.Mutex
var countErrors int64
var result float64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
errMu.Lock()
countErrors++
errMu.Unlock()
} else {
mu.Lock()
result += item.value
mu.Unlock()
}
}
wg.Done()
}()
}
wg.Wait()
return result, countErrors
}
func main() {
ch := MakeTasks(countTasks)
fmt.Println(ProcessUsingMutex(ch, countWorkers))
}
Реализация с использованием sync/atomic
Но использование sync.Mutex
может замедлять нашу программу. Поэтому можно переписать решение с использованием sync/atomic
. Для количества ошибок это сделать довольно просто. Вместо использования sync.Mutex
используем atomic.AddInt64
, то есть:
errMu.Lock()
countErrors++
errMu.Unlock()
заменяем на
atomic.AddInt64(&countErrors, 1)
Но для float64 нет функции AddFloat64
. Но её можно реализовать. Для того, что бы лучше понять как это можно сделать напишем функцию AddInt64
с использованием CompareAndSwapInt64
func AddInt64(addr *int64, delta int64) (new int64) {
for {
if v := *addr; atomic.CompareAndSwapInt64(addr, v, v+delta) {
return v
}
}
}
Но для float64
нет и CompareAndSwap
, но мы можем сконвертировать значение float64
в uint64
используя math.Float64bits
и оперировать уже uint64
.
Реализация AddFloat64
может быть такой:
func AddFloat64(addr *uint64, delta float64) uint64 {
for {
cur := atomic.LoadUint64(addr)
curVal := math.Float64frombits(cur)
nxtVal := curVal + delta
nxt := math.Float64bits(nxtVal)
if atomic.CompareAndSwapUint64(addr, cur, nxt) {
return nxt
}
}
}
После того, как мы реализовали AddFloat64
можно полностью переписать нашу функцию, без использования мютексов:
func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) {
var wg sync.WaitGroup
var countErrors int64
var total uint64
for i := 0; i < countWorkers; i++ {
wg.Add(1)
go func() {
for item := range ch {
if item.hasError {
atomic.AddInt64(&countErrors, 1)
} else {
AddFloat64(&total, item.value)
}
}
wg.Done()
}()
}
wg.Wait()
return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors)
}
Для сравнения mutex и atomic я написал небольшой бенчмарк
package main
import (
"testing"
)
func BenchmarkProcessUsingAtomic(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingAtomic(ch, countWorkers)
}
}
func BenchmarkProcessUsingMutex(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
ch := MakeTasks(countTasks)
b.StartTimer()
ProcessUsingMutex(ch, countWorkers)
}
}
Запуск данного benchmark показывает что atomic на 5-7% быстрее.
BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS
Заключение
Модуль sync/atomic
может быть полезен, если его использовать вместо мютексов так как атомики работают немного быстрее мютексов. Если хочется использовать атомики для типов, которые не поддерживаются в sync/atomic
можно попробовать использовать стандартные функции для реализации нужного функционала, как я описал в данной статье или воспользоваться структурой Value
из модуля sync/atomic
.
Комментарии (9)
wilcot
17.07.2022 03:59Ради интереса попробовал поменять местами запуски бенчмарка и получил противоположный результат:
BenchmarkProcessUsingMutex-8 1 3627994802 ns/op BenchmarkProcessUsingAtomic-8 1 3707776703 ns/op
Результат стабильно воспроизводится (первый бенчмарк всегда работает быстрее), причем даже пытался увеличить в 10 раз количество задач. Похоже что компьютер у меня устает, но все же результат странный.
drblez
17.07.2022 06:32+8Тут синхронизация не нужна вовсе. Это же мап-редьюс. Много источников собираем в один канал и результаты этого канала суммируем. Если, вдруг, по какой-то причине надо суммировать параллельно, то надо делать суммы во многих рутинах и потом отправлять в одну для итога.
AndreyBerenda Автор
18.07.2022 09:02Используя каналы можно решить данную задачу и будет наверное проще это сделать.
Я сильно не погружался еще в каналы(я на golang не так давно пишу), но если посмотретьruntime/chan.go
, то можно увидеть, что в каналах используется mutex(там не sync.Mutex, но все равно используется mutex). Поэтому, если выбирать каналы, то получается выбирать mutex. Но mutex под капотом использует атомики, поэтому если задачу можно решить с использованием atomic, то скорей всего это будет более оптимальное решение по времени, чем с использованием mutex(но все нужно мерить в каждой конкретной задаче, может быть прирост будет минимальный или его вообще не будет).
В этой статье я хотел показать как пользоваться атомиками для float64 и ничего больше, я не хотел показать этой статьей, что решение моей задачи оптимально именно с использованием атомиков(в каждом конкретном случае нужно мерить и находить компромис между поддерживаемым и читаемым кодом и скоростью его выполнения).
Решение с атомиками, получилось не сильно сложное, как мне показалось, поэтому его можно рассмотреть при выборе решения задачи.wilcot
18.07.2022 10:10Вы в канал будете писать итоговую сумму для каждого worker, это выйдет намного быстрее, так как вместо countTasks вызовов atomic.*, у вас будет countWorkers вызовов Mutex.Lock/Unlock (то есть намного меньше).
AndreyBerenda Автор
18.07.2022 10:39Если собирать в каждом воркере свою сумму и при завершении воркера суммировать все результаты, то мы не знаем суммарное количество до завершения, а часто хочется иметь прогресс(сейчас 50 ошибок и тотал 50123,12), если не нужно знать суммарное количество до завершения, то суммировать в воркере и потом получить тотал с использованием каналов, mutex или атомиков можно (в данном случае ты прав, наверное самое оптимальное решение будет с использованием канала).
Наверное можно собирать периодически тотал со всех воркеров(допустим раз в 10 секунд), но это уже немного другая задача
illiafox
18.07.2022 18:10
if atomic.CompareAndSwapUint64(addr, cur, nxt) {
return nxt
}Реализация с циклом
for
похожа на принцип работы мьютекса, поэтому производительность будет одинаковая, а в некоторых моментах даже хуже (т.к. циклы в lockSlow() и unlockSlow() написаны на более низком уровне)Имхо, в вашем примере прирост скорости вышел только из-за работы шедулера горутин, поэтому на других устройствах результаты будут совершенно другими.
Вывод: ждем 1.19
JekaMas
Подождите в пределах месяца и заиспользуйте
atomic.Pointer[T]
https://pkg.go.dev/sync/atomic@master#Pointer , вроде должен упростить вашу задачу.AndreyBerenda Автор
Спасибо большое, обязательно гляну