Привет!
Вэтой статье я хотел бы расcказать, как можно было бы сделать свой RWMutex, но с возможностью по таймауту или по срабатыванию контекста пропустить блокировку. То есть реализовать TryLock(context.Context) и RTryLock(context.Context), но уже для своего Mutex.
На картинке изображено, как нужно наливать воду в очень узкое горлышко.
Для начала следует уточнить, что для 99% задач такие методы вообще не нужны. Нужны они становятся тогда, когда блокируемый ресурс может очень долго не отпускаться. Хочу отметить, что если блокируемый ресурс долгое время остаётся занятым, стоит в начале попробовать оптимизировать логику таким образом, что бы минимизировать время блокировки.
Подробнее об этом написано в посте Танцы с мьютексами в Go в примере 2.
Но если всё же нам приходится иметь долгое удержание одним потоком ресурсов, то как мне кажется без TryLock будет сложно обойтись.
Когда мы работаем так, что у нас нет параллельных потоков, мы будем просто менять переменную методами из пакета atomic, записывая факт блокировки или разблокировки мьютекса. Но вот если наш запрос пришел в заблокированнй мьютекс, то мы пойдём по длинному сценарию. В этом случае нам нужно будет разблокировать ожидающие потоки по завершении блокировки, и для этого, можно воспользоваться каналом. А точнее свойством канала, что если мы его закрываем, то все читающие из него потоки разблокируются.
Создадим Mutex:
// RWTMutex - Read Write and Try Mutex
type RWTMutex struct {
state int32
mx sync.Mutex
ch chan struct{}
}
state — состояние mutex, мы будем работать с ним через atomic.AddInt32, atomic.LoadInt32 и atomic.CompareAndSwapInt32
ch — канал, который будет разблокировать потоки.
mx — мьютекс, который используется для того, что бы разблокировка потоков и создания каналов не происходили параллельно.
А теперь можно перейти к реализации:
// TryLock - try locks mutex with context
func (m *RWTMutex) TryLock(ctx context.Context) bool {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
// Slow way
return m.lockST(ctx)
}
// RTryLock - try read locks mutex with context
func (m *RWTMutex) RTryLock(ctx context.Context) bool {
k := atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
// Slow way
return m.rlockST(ctx)
}
Как можно увидеть, если Мьютекс не залочен, то его можно просто заблокировать, но вот если нет, то мы перейдём к более сложной схеме.
В начале получим канал, и переходим в бесконечный цикл, если получилось залочить, выходим с успехом, а если нет, то мы начинаем ждать одного из 2х событий, или что канал разблокируется, или что разблокирует поток ctx.Done():
func (m *RWTMutex) chGet() chan struct{} {
m.mx.Lock()
if m.ch == nil {
m.ch = make(chan struct{}, 1)
}
r := m.ch
m.mx.Unlock()
return r
}
func (m *RWTMutex) lockST(ctx context.Context) bool {
ch := m.chGet()
for {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
func (m *RWTMutex) rlockST(ctx context.Context) bool {
ch := m.chGet()
var k int32
for {
k = atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
Давайте раблокируем мьютекс.
Нам потребуется поменять состояние и при необходимости разблокировать канал.
Как я уже писал выше, если канал закрыть, то case <-ch пропустит поток выполнения дальше.
func (m *RWTMutex) chClose() {
if m.ch == nil {
return
}
var o chan struct{}
m.mx.Lock()
if m.ch != nil {
o = m.ch
m.ch = nil
}
m.mx.Unlock()
if o != nil {
close(o)
}
}
// Unlock - unlocks mutex
func (m *RWTMutex) Unlock() {
if atomic.CompareAndSwapInt32(&m.state, -1, 0) {
m.chClose()
return
}
panic("RWTMutex: Unlock fail")
}
// RUnlock - unlocks mutex
func (m *RWTMutex) RUnlock() {
i := atomic.AddInt32(&m.state, -1)
if i > 0 {
return
} else if i == 0 {
m.chClose()
return
}
panic("RWTMutex: RUnlock fail")
}
Собственно мьютекс готов, нужно к нему написать пару тестов и стандартных методов типа Lock() и RLock()
Бенчмарки на моей машине показали вот такие скорости
Описанные выше методы
BenchmarkRWTMutexTryLockUnlock-8 92154297 12.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWTMutexTryRLockRUnlock-8 64337136 18.4 ns/op 0 B/op 0 allocs/op
Стандартный RWMutex
BenchmarkRWMutexLockUnlock-8 44187962 25.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexRLockRUnlock-8 94655520 12.6 ns/op 0 B/op 0 allocs/op
Стандартный Mutex
BenchmarkMutexLockUnlock-8 94345815 12.7 ns/op 0 B/op 0 allocs/op
То есть скорость работы сопоставима с обычным RWMutex и Mutex.
Alexei_987
Возможно я что-то не так понял (У меня мало опыта программирования на Go), но у вас очень странные бенчмарки т.к. вы тестируете только быстрый сценарий (Фактически у вас только одна горутина пытается захватить мьютекс и поэтому всю магию каналов вы фактически не проверяете). Ну и кроме того внутри канала лежит самый обычный мьютекс просто он спрятан от вас… Думаю более правильным и эффективным будет использование Cond (https://golang.org/pkg/sync/#Cond) вместо канала. А вообще странно конечно что такие простые вещи не реализованы в стандартной библиотеке и приходится писать свои велосипеды
super_botan Автор
Спасибо, что прочитали и вникли в тему.
По поводу cond, я как то упустил из виду. Бегло прочитав я согласен, что вероятно он подходит больше. Я думаю я перепишу с его использованием или использованием того, что под ним лежит. Но пока по изучаю.
Да бенчи проверяют только быстрый путь. Дополню вместе с переходом на cond.
И видимо нужно добавить еще пару веселых фишек типа поднятие уровня блокировки RLock -> Lock и понижение, и некую приоритезацию для блокировки. Они не дороги в разработке и по идее по ресурсам тоже.
super_botan Автор
Добавил бенчмарки:
BenchmarkDT_RWTMutexTryLockUnlock-8 1911730 625 ns/op 192 B/op 1 allocs/op
BenchmarkDT_RWMutexLockUnlock-8 2423566 494 ns/op 0 B/op 0 allocs/op
BenchmarkDT_MutexLockUnlock-8 2577855 466 ns/op 0 B/op 0 allocs/op
Потребляется память за счёт того, что пересоздаётся канал.
Оценил Cond. Реализацию с ним плохо представляю без создания доп потока (а это 4 кб) так, что бы корректно реагировать на ctx.Done().
Если я не прав прошу меня попправить.
Alexei_987
Бенчмарки кривые…
Я предполагал что будет что-то вида:
Плюс к этому:
Основная проблема каналов это то что для обычного мьютекса (не RWMutext) если он освобождается то достаточно разбудить ровно один другой поток и дать ему залочить мьютекс. В случае же канала если у вас один мьютекс ожидает например 100 потоков то вы разбудите их все, а потом одному из них удасться захватить мьютекс. Это ужасно неэффективно и вот собственно этот момент надо сравнить в вашей реализации и в стандартной либе…
Реализацию через Cond не вижу проблем если честно… ровно в том же месте где у вас закрывается канал необходимо выставлять событие "мьютекс освободился" и все кому это интересно могут проснуться и попытаться его захватить
super_botan Автор
Бенчмарки 2х видов
Для незаблокированного мьютекса, тут просто заблокировать и разблокировать когда нет параллельного потока, то есть случай, когда по коду пойдёт быстрый случай:
Для заблокированного мьютекса, тут требуется организовать 2 потока и сделать примерно следующее: блокируем мьютекс, затем создаём рутину (~ 250 нс на моей машине) в которой мьютекс разблокируем, и снова блокируем мьютекс (тут мы начинаем ожидать той самой разблокировки в параллельном потоке) и потом разблокируем мьютекс для следующей итерации.
Получается:
По поводу sync.Cond. Я потоки лочу через select { case } это позволяет мне их разблокировать несколькими независимыми событиями. Если я буду блочить через Cond.Wait() то мне придётся разлочивать Cond на основании <-ctx.Done(), а этого я без дополнительного потока не смогу сделать.
Как-то так.
Alexei_987
1-й бенчмарк не имеет смысла впринципе т.к. проверяется по сути скорость работы атомик операций на вашем процессоре. единственное его предназначение это может быть поиск каких то простейших ошибок. В случае если горутин множество 2 горутины слишком мало для того чтобы собственно косяки Вашей реализации стали очевидны. Сравните именно случай где за один мьютекс конкурируют 100 потоков и сравните разницу для вашей реализации и для стандартного мьютекса
super_botan Автор
Про 100 потоков — организовать сложно, но вот с 2мя тест есть, как раз его написал по исходному замечанию. И он показывает, что обычные мьютексы быстрее на четверть, чем мой. + мой жрёт доп память на каждую итерацию примерно 200 байт.
Alexei_987
Не вижу сложностей если честно… что мешает создать 100-1000 горутин? вопрос только в том что каждая отдельная горутина должна работать достаточно долго для того чтобы можно было пренебречь расходами на ее создание. т.е. если у вас ~ 250 нс уходит на создание горутины значит она должна работать хотя бы несколько секунд и сделать много итераций. Думаю лучший вариант бенчмарка будет сделать простой счетчик и инкрементить его из множества горутин. это позволит симулировать плюс минус реальный сценарий и убедится что все работает корректно. Моя основная идея в том что реализация на каналах будет подвержена проблеме "Проснувшегося стада" (https://en.wikipedia.org/wiki/Thundering_herd_problem) и надо проверить работу этого сценария и подумать можно ли улучшить что-то в реализации для того чтобы его избежать
Alexei_987
А вот это уже печально, я надеялся что Го позволяет делать select на канал либо Cond… если это не возможно то это очень печально (и еще раз подтверждает мои мысли что Go это недоделанный язык)
super_botan Автор
Да увы, я как почитал конд тоже думал получится приспособить, но нет путей. Жалко.