Привет всем гоферам! В данной статье я хочу разобрать как можно воспользоваться модулем sync/atomic для типа float64.

Постановка задачи

У нас есть канал из которого мы можем читать результаты выполнения задач. Результат задачи содержит флаг показывающий была ли ошибка при выполнении задачи и результат выполнения этой задачи (тип float64). Нужно найти сумму всех результатов и количество ошибок.

Реализация с использованием sync.Mutex

Задачу можно решить с использованием sync.Mutex:

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

const countWorkers = 1000
const countTasks = 10000000

type Result struct {
	value    float64
	hasError bool
}

func MakeTasks(count int) <-chan Result {
	ch := make(chan Result)
	go func() {
		for i := 0; i < count; i++ {
			ch <- Result{
				value:    float64(i) * 2.42,
				hasError: (i % 10) == 0,
			}

		}
		close(ch)
	}()
	return ch
}

func ProcessUsingMutex(ch <-chan Result, countWorkers int) (float64, int64) {
	var wg sync.WaitGroup
	var errMu sync.Mutex
	var mu sync.Mutex
	var countErrors int64
	var result float64

	for i := 0; i < countWorkers; i++ {
		wg.Add(1)
		go func() {
			for item := range ch {
				if item.hasError {
					errMu.Lock()
					countErrors++
					errMu.Unlock()
				} else {
					mu.Lock()
					result += item.value
					mu.Unlock()
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	return result, countErrors
}


func main() {
	ch := MakeTasks(countTasks)
	fmt.Println(ProcessUsingMutex(ch, countWorkers))
}

Реализация с использованием sync/atomic

Но использование sync.Mutex может замедлять нашу программу. Поэтому можно переписать решение с использованием sync/atomic. Для количества ошибок это сделать довольно просто. Вместо использования sync.Mutex используем atomic.AddInt64, то есть:

errMu.Lock()
countErrors++
errMu.Unlock()

заменяем на

atomic.AddInt64(&countErrors, 1)

Но для float64 нет функции AddFloat64. Но её можно реализовать. Для того, что бы лучше понять как это можно сделать напишем функцию AddInt64 с использованием CompareAndSwapInt64

func AddInt64(addr *int64, delta int64) (new int64) {
	for {
		if v := *addr; atomic.CompareAndSwapInt64(addr, v, v+delta) {
			return v
		}
	}
}

Но для float64 нет и CompareAndSwap, но мы можем сконвертировать значение float64 в uint64 используя math.Float64bits и оперировать уже uint64.

Реализация AddFloat64 может быть такой:

func AddFloat64(addr *uint64, delta float64) uint64 {
	for {
		cur := atomic.LoadUint64(addr)
		curVal := math.Float64frombits(cur)
		nxtVal := curVal + delta
		nxt := math.Float64bits(nxtVal)
		if atomic.CompareAndSwapUint64(addr, cur, nxt) {
			return nxt
		}
	}
}

После того, как мы реализовали AddFloat64 можно полностью переписать нашу функцию, без использования мютексов:

func ProcessUsingAtomic(ch <-chan Result, countWorkers int) (float64, int64) {
	var wg sync.WaitGroup
	var countErrors int64
	var total uint64
	for i := 0; i < countWorkers; i++ {
		wg.Add(1)
		go func() {
			for item := range ch {
				if item.hasError {
					atomic.AddInt64(&countErrors, 1)
				} else {
					AddFloat64(&total, item.value)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	return math.Float64frombits(atomic.LoadUint64(&total)), atomic.LoadInt64(&countErrors)
}

Для сравнения mutex и atomic я написал небольшой бенчмарк

package main

import (
	"testing"
)

func BenchmarkProcessUsingAtomic(b *testing.B) {

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		ch := MakeTasks(countTasks)
		b.StartTimer()
		ProcessUsingAtomic(ch, countWorkers)
	}
}

func BenchmarkProcessUsingMutex(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		ch := MakeTasks(countTasks)
		b.StartTimer()
		ProcessUsingMutex(ch, countWorkers)
	}
}

Запуск данного benchmark показывает что atomic на 5-7% быстрее.

BenchmarkProcessUsingAtomic-8 1 2468118459 ns/op
BenchmarkProcessUsingMutex-8 1 2640532917 ns/op
PASS

Заключение

Модуль sync/atomic может быть полезен, если его использовать вместо мютексов так как атомики работают немного быстрее мютексов. Если хочется использовать атомики для типов, которые не поддерживаются в sync/atomic можно попробовать использовать стандартные функции для реализации нужного функционала, как я описал в данной статье или воспользоваться структурой Value из модуля sync/atomic.

Комментарии (9)


  1. JekaMas
    17.07.2022 03:12
    +1

    Подождите в пределах месяца и заиспользуйте atomic.Pointer[T]https://pkg.go.dev/sync/atomic@master#Pointer , вроде должен упростить вашу задачу.


    1. AndreyBerenda Автор
      18.07.2022 09:02

      Спасибо большое, обязательно гляну


  1. wilcot
    17.07.2022 03:59

    Ради интереса попробовал поменять местами запуски бенчмарка и получил противоположный результат:

    BenchmarkProcessUsingMutex-8     1    3627994802 ns/op
    BenchmarkProcessUsingAtomic-8    1    3707776703 ns/op
    

    Результат стабильно воспроизводится (первый бенчмарк всегда работает быстрее), причем даже пытался увеличить в 10 раз количество задач. Похоже что компьютер у меня устает, но все же результат странный.


    1. Ava256
      17.07.2022 15:21

      А если по отдельности запускать, в разных процессах, какой будет результат?


  1. drblez
    17.07.2022 06:32
    +8

    Тут синхронизация не нужна вовсе. Это же мап-редьюс. Много источников собираем в один канал и результаты этого канала суммируем. Если, вдруг, по какой-то причине надо суммировать параллельно, то надо делать суммы во многих рутинах и потом отправлять в одну для итога.


    1. AndreyBerenda Автор
      18.07.2022 09:02

      Используя каналы можно решить данную задачу и будет наверное проще это сделать.
      Я сильно не погружался еще в каналы(я на golang не так давно пишу), но если посмотреть runtime/chan.go, то можно увидеть, что в каналах используется mutex(там не sync.Mutex, но все равно используется mutex). Поэтому, если выбирать каналы, то получается выбирать mutex. Но mutex под капотом использует атомики, поэтому если задачу можно решить с использованием atomic, то скорей всего это будет более оптимальное решение по времени, чем с использованием mutex(но все нужно мерить в каждой конкретной задаче, может быть прирост будет минимальный или его вообще не будет).
      В этой статье я хотел показать как пользоваться атомиками для float64 и ничего больше, я не хотел показать этой статьей, что решение моей задачи оптимально именно с использованием атомиков(в каждом конкретном случае нужно мерить и находить компромис между поддерживаемым и читаемым кодом и скоростью его выполнения).
      Решение с атомиками, получилось не сильно сложное, как мне показалось, поэтому его можно рассмотреть при выборе решения задачи.


      1. wilcot
        18.07.2022 10:10

        Вы в канал будете писать итоговую сумму для каждого worker, это выйдет намного быстрее, так как вместо countTasks вызовов atomic.*, у вас будет countWorkers вызовов Mutex.Lock/Unlock (то есть намного меньше).


        1. AndreyBerenda Автор
          18.07.2022 10:39

          Если собирать в каждом воркере свою сумму и при завершении воркера суммировать все результаты, то мы не знаем суммарное количество до завершения, а часто хочется иметь прогресс(сейчас 50 ошибок и тотал 50123,12), если не нужно знать суммарное количество до завершения, то суммировать в воркере и потом получить тотал с использованием каналов, mutex или атомиков можно (в данном случае ты прав, наверное самое оптимальное решение будет с использованием канала).
          Наверное можно собирать периодически тотал со всех воркеров(допустим раз в 10 секунд), но это уже немного другая задача


  1. illiafox
    18.07.2022 18:10


    if atomic.CompareAndSwapUint64(addr, cur, nxt) {
    return nxt
    }

    Реализация с циклом for похожа на принцип работы мьютекса, поэтому производительность будет одинаковая, а в некоторых моментах даже хуже (т.к. циклы в lockSlow() и unlockSlow() написаны на более низком уровне)

    Имхо, в вашем примере прирост скорости вышел только из-за работы шедулера горутин, поэтому на других устройствах результаты будут совершенно другими.

    Вывод: ждем 1.19