Привет!


Вэтой статье я хотел бы расcказать, как можно было бы сделать свой RWMutex, но с возможностью по таймауту или по срабатыванию контекста пропустить блокировку. То есть реализовать TryLock(context.Context) и RTryLock(context.Context), но уже для своего Mutex.


image


На картинке изображено, как нужно наливать воду в очень узкое горлышко.


Для начала следует уточнить, что для 99% задач такие методы вообще не нужны. Нужны они становятся тогда, когда блокируемый ресурс может очень долго не отпускаться. Хочу отметить, что если блокируемый ресурс долгое время остаётся занятым, стоит в начале попробовать оптимизировать логику таким образом, что бы минимизировать время блокировки.


Подробнее об этом написано в посте Танцы с мьютексами в Go в примере 2.


Но если всё же нам приходится иметь долгое удержание одним потоком ресурсов, то как мне кажется без TryLock будет сложно обойтись.


Когда мы работаем так, что у нас нет параллельных потоков, мы будем просто менять переменную методами из пакета atomic, записывая факт блокировки или разблокировки мьютекса. Но вот если наш запрос пришел в заблокированнй мьютекс, то мы пойдём по длинному сценарию. В этом случае нам нужно будет разблокировать ожидающие потоки по завершении блокировки, и для этого, можно воспользоваться каналом. А точнее свойством канала, что если мы его закрываем, то все читающие из него потоки разблокируются.


Создадим Mutex:


// RWTMutex - Read Write and Try Mutex
type RWTMutex struct {
    state int32
    mx    sync.Mutex
    ch    chan struct{}
}

state — состояние mutex, мы будем работать с ним через atomic.AddInt32, atomic.LoadInt32 и atomic.CompareAndSwapInt32


ch — канал, который будет разблокировать потоки.


mx — мьютекс, который используется для того, что бы разблокировка потоков и создания каналов не происходили параллельно.


А теперь можно перейти к реализации:


// TryLock - try locks mutex with context
func (m *RWTMutex) TryLock(ctx context.Context) bool {
    if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
        return true
    }

    // Slow way
    return m.lockST(ctx)
}
// RTryLock - try read locks mutex with context
func (m *RWTMutex) RTryLock(ctx context.Context) bool {
    k := atomic.LoadInt32(&m.state)
    if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
        return true
    }

    // Slow way
    return m.rlockST(ctx)
}

Как можно увидеть, если Мьютекс не залочен, то его можно просто заблокировать, но вот если нет, то мы перейдём к более сложной схеме.


В начале получим канал, и переходим в бесконечный цикл, если получилось залочить, выходим с успехом, а если нет, то мы начинаем ждать одного из 2х событий, или что канал разблокируется, или что разблокирует поток ctx.Done():


func (m *RWTMutex) chGet() chan struct{} {
    m.mx.Lock()
    if m.ch == nil {
        m.ch = make(chan struct{}, 1)
    }
    r := m.ch
    m.mx.Unlock()
    return r
}

func (m *RWTMutex) lockST(ctx context.Context) bool {
    ch := m.chGet()
    for {
        if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
            return true
        }
        if ctx == nil {
            return false
        }
        select {
        case <-ch:
            ch = m.chGet()
        case <-ctx.Done():
            return false
        }
    }
}

func (m *RWTMutex) rlockST(ctx context.Context) bool {
    ch := m.chGet()
    var k int32
    for {
        k = atomic.LoadInt32(&m.state)
        if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
            return true
        }
        if ctx == nil {
            return false
        }
        select {
        case <-ch:
            ch = m.chGet()
        case <-ctx.Done():
            return false
        }
    }
}

Давайте раблокируем мьютекс.


Нам потребуется поменять состояние и при необходимости разблокировать канал.


Как я уже писал выше, если канал закрыть, то case <-ch пропустит поток выполнения дальше.


func (m *RWTMutex) chClose() {
    if m.ch == nil {
        return
    }

    var o chan struct{}
    m.mx.Lock()
    if m.ch != nil {
        o = m.ch
        m.ch = nil
    }
    m.mx.Unlock()
    if o != nil {
        close(o)
    }
}
// Unlock - unlocks mutex
func (m *RWTMutex) Unlock() {
    if atomic.CompareAndSwapInt32(&m.state, -1, 0) {
        m.chClose()
        return
    }

    panic("RWTMutex: Unlock fail")
}
// RUnlock - unlocks mutex
func (m *RWTMutex) RUnlock() {
    i := atomic.AddInt32(&m.state, -1)
    if i > 0 {
        return
    } else if i == 0 {
        m.chClose()
        return
    }

    panic("RWTMutex: RUnlock fail")
}

Собственно мьютекс готов, нужно к нему написать пару тестов и стандартных методов типа Lock() и RLock()


Бенчмарки на моей машине показали вот такие скорости


Описанные выше методы
BenchmarkRWTMutexTryLockUnlock-8        92154297                12.8 ns/op             0 B/op          0 allocs/op
BenchmarkRWTMutexTryRLockRUnlock-8      64337136                18.4 ns/op             0 B/op          0 allocs/op

Стандартный RWMutex
BenchmarkRWMutexLockUnlock-8            44187962                25.8 ns/op             0 B/op          0 allocs/op
BenchmarkRWMutexRLockRUnlock-8          94655520                12.6 ns/op             0 B/op          0 allocs/op

Стандартный Mutex
BenchmarkMutexLockUnlock-8              94345815                12.7 ns/op             0 B/op          0 allocs/op

То есть скорость работы сопоставима с обычным RWMutex и Mutex.


Код на github