В этой статье мы рассмотрим, как и почему изменилась реализация примитивов синхронизации из стандартной библиотеки Java и пакета java.util.concurrent для Kotlin Coroutines и для языка Kotlin в целом. Сразу хочу предупредить: рассматриваемые в статье библиотеки и классы будут оцениваться не с точки зрения поддержки legacy-функциональности и возможности использовать их в Java, а с точки зрения эффективности и возможности использовать их в корутинах и Kotlin Multiplatform. Поэтому эта статья будет больше полезна тем, кто собирается писать новые проекты на языке Kotlin.

В рамках данной статьи будут рассмотрены:

  • критические секции;

  • атомарные переменные;

  • реактивные переменные;

  • барьерная синхронизация.

Критические секции

Здесь мы не будем очень подробно останавливаться на стандартных реализациях критической секции в языке Java, таких как synchronized и ReentrantLock. Написано множество статей на эту тему, и я уверен, они всем известны. Рассмотрим, почему их использование для синхронизации корутин нежелательно, и приведем альтернативу.

synchronized / @Synchronized

Разработаем простой тест, где будем на разных потоках делать инкремент переменной value:

fun main() = runBlocking {
    var obj = Any()
    var value = 0

    Array(100_000) {
        async(Dispatchers.Default) {
            synchronized(obj) {
                ++value
            }
        }
    }.forEach { it.await() }

    assertEquals(100_000, value)
}

Данный код запускает 100 000 параллельно выполняющихся корутин (настолько параллельно, насколько это позволяет количество потоков в Dispatchers.Default). В данном примере синхронизация блоком synchronized будет работать корректно, и тест будет пройден. Модифицируем этот код, добавив suspend функцию delay внутрь блока synchronized:

...
synchronized(obj) {
    delay(1)
    ++value
}
...

Данный код даже не скомпилируется с ошибкой:

The 'delay' suspension point is inside a critical section

Все дело в том, что мы не можем вызывать никакие suspend функции внутри блока синхронизации, потому что текущий поток корутины должен уметь освобождаться для использования в других корутинах в момент начала активного ожидания. Стандартные реализации критической секции не рассчитаны на то, что в середине блока синхронизации поток может освободиться для других задач, а после ожидания suspend функции выполнение может быть продолжено на совершенно другом потоке.

ReentrantLock

В примере выше нас спас компилятор. Давайте посмотрим, что произойдет при использовании библиотечной реализации критической секции, например, ReentrantLock:

...
val lock = ReentrantLock()
...
lock.lock()
try {    
    delay(1)
    ++value
} finally {
    lock.unlock()
}
...

Такой код скомпилируется, но синхронизация не будет работать, а может, даже произойдет вылет с IllegalMonitorStateException. Что интересно, если в данном примере использовать lock.withLock { ... }, мы получим уже знакомую нам ошибку компиляции.

То же самое касается и полезнейшего класса ReentrantReadWriteLock, который позволяет экономить на операциях read after read. К сожалению, для синхронизации корутин он не будет работать по той же причине.

Mutex

В примерах с synchronized/ReentrantLock и suspend функциями нас мог кое-где спасти компилятор, а в худшем случае мы бы получили несинхронизированный код с вероятным вылетом в рантайме. К счастью для нас, разработчики корутин подумали о таком сценарии и специально для этого разработали интерфейс Mutex и его стандартную имплементацию. Модифицируем код при помощи Mutex:

...
val mutex = Mutex()
...
mutex.withLock {
    delay(1)
    ++value
}
...

Такой код отработает корректно: операции над переменной value синхронизированы, suspend функция delay не мешает синхронизации.

К сожалению, класс ReadWriteMutex (аналогичный ReentrantReadWriteLock) по состоянию на июль 2023 года еще не добавили, однако обсуждение давно ведется, и есть уже минимум одна реализация в PullRequest к GitHub Kotlin/kotlinx.coroutines.

Вы можете сказать, что для данного примера синхронизация единственной переменной value при помощи критической секции — не самое концептуально правильное решение. И будете правы, поэтому синхронизацию одиночных переменных рассмотрим в следующем разделе.

Атомарные переменные

Начнем этот раздел со спойлера: volatile (в котлине @Volatile) и классы пакета java.util.concurrent.atomic (например AtomicReference, AtomicInteger...) при правильном применении будут работать с корутинами не хуже, чем со стандартными Java потоками или какими-нибудь многопоточными фреймворками.

volatile / @Volatile

Для начала вспомним, для чего нам вообще нужен @Volatile. Разные потоки для ускорения работы могут кэшировать у себя значения глобальных переменных. Из-за чего в других потоках при обращении к этим переменным мы не всегда будем видеть их актуальное значение. Рассмотрим код, иллюстрирующий суть проблемы:

@Volatile var value1 = true
@Volatile var value2 = true

fun main() = runBlocking {
    repeat(1_000) {
        value1 = true
        value2 = true

        val job1 = async(Dispatchers.Default) {
            value2 = false
            while(value1);
        }

        val job2 = async(Dispatchers.Default) {
            value1 = false
            while(value2);
        }

        job1.await()
        job2.await()
    }

    println("Success!")
}

Это правильный код, который успешно выполнится, но, если убрать аннотацию @Volatile хотя бы с одной переменной, то тест бесконечно зависнет в одном из циклов while, потому что поток корутины использует кэшированное значение общей переменной.

Однако и @Volatile можно использовать неправильно. Рассмотрим другой пример:

@Volatile var value = true

fun main() = runBlocking {
    Array(10_000) {
        async(Dispatchers.Default) {
            value = !value
        }
    }.forEach { it.await() }

    assertTrue(value)
}

В данном тесте запускается 10 000 конкурирующих корутин, которые меняют значение value на противоположное. Так как количество корутин чётное, итоговое значение не должно измениться и должно остаться равным true. Однако в половине случаев тест падает, даже несмотря на @Volatile. Дело в том, что операция value=!value — неатомарная, то есть состоит из нескольких операций: чтение value, потом запись нового value. Если неатомарные операции часто вызываются в любой многопоточной среде, есть риск получить неправильное значение переменной.

java.util.concurrent.atomic

Большую гибкость по сравнению с @Volatile дают классы пакета java.util.concurrent.atomic, которые позволяют делать атомарными операции инкремента, декремента, compareAndSet (атомарно сравнить с предыдущим значением и записать), getAndSet (атомарное получение старого значения и запись нового) и ряд других. Рассмотрим как можно более эффективно и просто синхронизировать уже знакомый нам код из раздела про критические секции:

fun main() = runBlocking {
    var value = 0

    Array(100_000) {
        async(Dispatchers.Default) {
            ++value
        }
    }.forEach { it.await() }

    assertEquals(100_000, value)
}

В данном примере кода переменная value не синхронизирована, поэтому значение к концу теста value=100_000 не гарантировано. Так как инкремент по умолчанию — неатомарная операция (++value — это по сути value=value+1 чтение и запись), то и @Volatile тут не поможет. Однако, если модифицировать этот код с помощью класса AtomicInteger:

fun main() = runBlocking {
    var value = AtomicInteger(0)

    Array(100_000) {
        async(Dispatchers.Default) {
            value.incrementAndGet()
        }
    }.forEach { it.await() }

    assertEquals(100_000, value.get())}

Благодаря атомарности операции incrementAndGet в классе AtomicInteger синхронизация начинает работать правильно. Если задача не ограничивается настолько простыми операциями, и нужно синхронизировать целые участки кода, на помощь приходят уже рассмотренные нами критические секции.

kotlinx.atomicfu

Как я писал в начале раздела, если использовать volatile и классы пакета java.util.concurrent.atomic правильно, то они будет корректно работать и в корутинах. Тогда зачем же нужно что-то еще? — спросите вы. 

Дело в том, что этот тезис касается JVM и Android, однако Kotlin и корутины уже давно вышли за пределы этих двух платформ. Если вы уже разрабатываете на Kotlin Multiplaform или потенциально планируете переход, то без kotlinx.atomicfu вам не обойтись:

fun main() = runBlocking {
    var value = atomic(0)

    Array(100_000) {
        async(Dispatchers.Default) {
            value.incrementAndGet()
        }
    }.forEach { it.await() }

    assertEquals(100_000, value.value)
}

В любом случае, переходя на kotlinx.atomicfu для JVM и Android, вы ничего не теряете — все функции библиотеки помечены как actual и внутри используют тот же пакет java.util.concurrent.atomic.

Реактивные переменные

Атомарные переменные — это хорошо, но часто необходимо также подписываться на изменение переменной и выполнять в обработчике какой-то код. В этом разделе мы рассмотрим реализации реактивных переменных, то есть классов, которые позволяют:

  • через generic задать тип значения;

  • записывать значение синхронно;

  • получать значение синхронно;

  • хранить это значение как внутреннее состояние;

  • подписываться на изменения значения.

Реализации, не обладающие всеми этими свойствами, в данном разделе рассмотрены не будут.

LiveData

Первое решение, которое мы рассмотрим, — это LiveData. Уверен, многие Android-разработчики не раз сталкивались с этим классом. LiveData позволяет хранить значение, получать и менять его напрямую через геттер\сеттер, подписываться на изменение значения, привязывать подписку к LifecycleOwner. 

Очень полезный класс, долгое время спасавший Android-разработчиков, работает с Android Data Binding и с Jetpack Compose, но не без недостатков. Их мы сейчас и рассмотрим.

Первый недостаток очевиден из того, что я писал выше: класс LiveData — штука специфичная и доступна только для платформы Android, нигде больше использовать его не получится.

Из первого вытекает и второй недостаток: основные методы setValue \ observe должны вызываться только на Main потоке приложения (Dispatchers.Main), иначе будет вылет в рантайме.

val liveData = MutableLiveData(true)

withContext(Dispatchers.Default) {
    // IllegalStateException: Cannot invoke setValue on a background thread
    liveData.value = false
}

Конечно, если использовать LiveData по основному назначению, например, для связи ViewModel + Fragment\Activity, скорее всего, никаких проблем не возникнет. Однако мой опыт говорит о том, что порой разработчики пытаются использовать этот класс еще и на уровне интеракторов\репозиториев и вообще повсюду, где может понадобиться реактивность, что порождает вылеты и замедление Main потока лишними вычислениями. Есть, конечно, метод postValue, который можно вызывать на любом потоке, но не без нюансов:

val liveData = MutableLiveData(true)

withContext(Dispatchers.Default) {
    println(liveData.value) // Prints true
    liveData.postValue(false)
    println(liveData.value) // Also prints true
}

Значение false запланировано для записи в liveData на Main потоке, но сразу значение false мы не увидим.

Еще один недостаток: LiveData не дает такой гибкости как, например, RxJava с ее методами преобразования вроде map, filter, distinct, debounce и др. Ну это и понятно: LiveData — это не полноценный фреймворк, а класс, созданный для решения конкретных проблем.

Ну и последнее. Значение LiveData всегда может быть null. Даже если в Kotlin явно указать тип значения без вопроса, например, LiveData<Int>, это не мешает нам записать и считать null.

BehaviorSubject

Похожая на LiveData штука — BehaviorSubject из RxJava. Этот класс хранит значение, позволяет его менять, получать и подписываться на него. В то же время он дает возможность пользоваться всей функциональной мощью RxJava и может применяться для JVM, а не только для платформы Android. Еще один неоспоримый плюс: методы getValue и onNext (по сути setValue) синхронизированы и могут свободно использоваться в любой многопоточной среде, а во время подписки можно выбирать поток обработки с помощью метода observeOn.

Из явных минусов можно выделить то, что nullable значения при записи в BehaviorSubject нужно упаковывать в какой-то буферный класс (а потом распаковывать при получении), потому что null не является валидным значением для этого класса.

Так как в этой статье мы рассматриваем в качестве основного фреймворка для многопоточной разработки именно корутины и Coroutines Flow, было бы странно, особенно для новых проектов, добавлять в проект еще и библиотеку RxJava. Поэтому разработчики корутин разработали собственную реализацию реактивных переменных, которую мы рассмотрим в следующем пункте.

StateFlow

Интерфейс StateFlow и его реализация являются частью Coroutines Flow и решают все проблемы LiveData и BehaviorSubject. Класс хранит значение, позволяет его получать, менять, а также подписываться на его изменение, как и в уже рассмотренных выше классах. Однако есть и ряд преимуществ:

  • Значение value (запись и получение) синхронизировано и может использоваться в любой многопоточной среде.

  • Возможность записи\чтения null контролируется на уровне языка через generics. Если объявить StateFlow<Int?>, то value будет nullable, а если StateFlow<Int>, то нет.

  • StateFlow включает в себя все преимущества Coroutines Flow и возможность вызова suspend функций внутри преобразующих методов или прямо внутри collect.

  • При подписке на изменения есть возможность привязки к LifecycleOwner (через lifecycleScope) для Android.

  • Работает вместе с Android Data Binding, Jetpack Compose и даже Compose Multiplatform.

  • StateFlow работает не только для Android и JVM, но еще и для других платформ фреймворка Kotlin Multiplatform.

StateFlow — достаточно мощный и универсальный инструмент, который может использоваться для широкого спектра задач, таких как: связь ViewModel и View в архитектуре MVVM, хранение изменяющихся данных на уровне репозиториев с возможностью подписки на них, преобразование данных через функции Coroutines Flow и даже конструирование примитивов синхронизации, на чем мы подробно остановимся в следующем разделе.

Барьерная синхронизация

В данном разделе будет больше креатива — в корутинах пока еще нет стандартных реализаций барьерной синхронизации, поэтому нам придется самим написать этот код. Обычно под барьерной синхронизацией понимается следующее: несколько разных потоков ждут какого-то события, а когда оно происходит, потоки одновременно выходят из ожидания и продолжают свою работу. В случае с корутинами будет примерно то же самое, но ждать события будут не потоки, а, собственно, корутины. Значит, и метод ожидания будет suspend функцией. Выразим это через интерфейс:

interface Barrier {
    suspend fun await()

    @Throws(TimeoutCancellationException::class)
    suspend fun await(timeout: Duration)
}

Очень простой интерфейс всего с двумя функциями: одна ждет с таймаутом, вторая без. Далее в этом разделе мы будем реализовывать недостающие нам классы барьерной синхронизации из пакета java.util.concurrent при помощи этого интерфейса.

CountDownLatch

Класс CountDownLatch из пакета java.util.concurrent действует следующим образом: в одних потоках мы уменьшаем счетчик на 1, а в других потоках ожидаем, пока счетчик не станет равным 0. Напомним функциональность этого класса:

  • задаем счетчик один раз в конструкторе, значение счетчика больше или равно нулю;

  • уменьшение счетчика на единицу (метод countDown);

  • получение значения счетчика синхронно (метод getCount);

  • ожидание обнуления счетчика с таймаутом (метод await).

Достаточно простой и полезный класс, но есть проблема, которая не позволяет эффективно использовать этот класс в корутинах. Проблема — в методе ожидания обнуления счетчика (метод await). Он не является suspend функцией и блокирует поток, а таймаут вызывает InterruptedException. Из-за этого эффективность корутин падает: текущий поток корутины не освобождается для задач в других корутинах. Также страдает и отменяемость: при вызове метода cancel (на текущей корутине или на всем scope целиком) корутина, зависшая в блокирующем методе await, не сможет отмениться, по крайней мере не сразу.

StateFlow — универсальный и мощный класс, который позволяет нам написать собственную реализацию CountDownLatch меньше чем за 50 строчек кода.

class CountDownBarrier(count: UInt) : Barrier {
    private val stateFlow = MutableStateFlow(count)

    val counterValue: UInt
        @Synchronized get() = stateFlow.value

    @Synchronized
    fun countDown() {
        if (stateFlow.value > 0u) {
            --stateFlow.value
        }
    }

    override suspend fun await() {
        internalAwait()
    }

    @Throws(TimeoutCancellationException::class)
    override suspend fun await(timeout: Duration) {
        if (counterValue > 0u) {
            withTimeout(timeout) { internalAwait() }
        }
    }

    private suspend fun internalAwait() {
        if (counterValue > 0u) {
            // Await first value lower than 0 (suspend function).
            stateFlow.first { it <= 0u }
        }
    }
}

В данной реализации вся изначальная функциональность CountDownLatch сохранилась, зато метод await теперь является suspend функцией. Использование @Synchronized тут оправдано, потому что должна быть возможность вызывать метод countDown и геттер на counterValue и вне корутин. Кроме того, внутри блоков @Synchronized никаких suspend функций не вызывается. Разработаем тест для класса CountDownBarrier:

fun main() = runBlocking {
    val сountDownBarrier = CountDownBarrier(5u)

    val awaitTasks = Array(10) { index ->
        async(Dispatchers.Default) {
            сountDownBarrier.await()
            println("Await finished: index=$index, counter=${сountDownBarrier.counterValue}")
        }
    }

    val countDownTasks = Array(1000) { index ->
        async(Dispatchers.Default) {
            println("Count down started: index=$index, counter=${сountDownBarrier.counterValue}")
            сountDownBarrier.countDown()
            println("Count down finished: index=$index, counter=${сountDownBarrier.counterValue}")
        }
    }

    awaitTasks.forEach { it.await() }
    countDownTasks.forEach { it.await() }

    println("Success: counter=${сountDownBarrier.counterValue}")
}

Счетчик CountDownBarrier задан на 5, значит, не раньше чем через 5 вызовов метода countDown мы должны увидеть первые логи Await finished. Посмотрим, что выведет этот код:

Count down started: index=0, counter=5
Count down started: index=0, counter=5
Count down finished: index=0, counter=4
Count down started: index=1, counter=4
Count down finished: index=1, counter=3
Count down started: index=2, counter=3
Count down finished: index=2, counter=2
Count down started: index=3, counter=2
Count down finished: index=3, counter=1
Count down started: index=4, counter=1
Count down finished: index=4, counter=0
Await finished: index=9, counter=0
Await finished: index=8, counter=0
...

Счетчик обнулился, и только после этого ожидающие корутины среагировали на это и вышли из await, значит, класс CountDownBarrier работает корректно.

CyclicBarrier

Класс CyclicBarrier похож на CountDownLatch, но значение внутреннего счетчика меняется не публичным методом countDown, а в зависимости от количества ожидающих потоков. Когда количество ожидающих потоков становится равно заданному значению, все они одновременно выходят из ожидания. После освобождения всех потоков внутренний счетчик возвращается в начальное значение. Реализуем примитив барьера для корутин при помощи StateFlow:

class CoroutinesBarrier(
	val initialCoroutinesCount: UShort
) : Barrier {

    // @Volatile is not necessary here: this field is only used in @Synchronized blocks.
    private var stateFlow = MutableStateFlow(initialCoroutinesCount)

    val countLeftToReleaseBarrier: UShort
        @Synchronized get() = stateFlow.value

    override suspend fun await() {
        internalAwait()
    }

    @Throws(TimeoutCancellationException::class)
    override suspend fun await(timeout: Duration) {
        withTimeout(timeout) { internalAwait() }
    }

    private suspend fun internalAwait() {
        val (flowToAwait, countLeftToRelease) = countDownOrResetBarrier()

        if (countLeftToRelease > 0u) {
            // Await first value lower than 0 (suspend function).
            flowToAwait.first { it <= 0u }
        }
    }

    @Synchronized
    private fun countDownOrResetBarrier(): Pair<Flow<UShort>, UShort> {
        if (stateFlow.value > 0u) {
            --stateFlow.value
        }

        val result = stateFlow to stateFlow.value

        // Reset flow right before releasing awaiting coroutines.
        if (stateFlow.value <= 0u) {
            stateFlow = MutableStateFlow(initialCoroutinesCount)
        }

        return result
    }
}

В отличие от CyclicBarrier, в конструкторе CoroutinesBarrier мы задаем не количество Java потоков, а количество корутин, при котором барьер освобождается. Чтобы можно было сбрасывать значение счетчика в начальное значение, stateFlow объявлена как var. При сбросе счетчика мы записываем в нее новый экземпляр MutableStateFlow. Дополнительная синхронизация (например, @Volatile) на переменную stateFlow не нужна, так как все обращения к этой переменной происходят только внутри блоков @Synchronized. Разработаем тест для класса CoroutinesBarrier:

fun main() = runBlocking {
    val startTimeMillis = System.currentTimeMillis()
    val coroutinesBarrier = CoroutinesBarrier(3u)

    val awaitTasks = Array(6) { index ->
        async(Dispatchers.Default) {
            println("Coroutine started.")
            delay(index * 100L)
            coroutinesBarrier.await(5_000.milliseconds)
            val durationMillis = System.currentTimeMillis() - startTimeMillis
            println("Coroutine released: duration=$durationMillis.")
        }
    }.forEach { it.await() }

    val durationMillis = System.currentTimeMillis() - startTimeMillis
    println("Test finished: duration=$durationMillis.")
}

В этом тесте мы запускаем шесть корутин с увеличивающейся задержкой (100, 200 ... 600 миллисекунд). Сразу после задержки выставлен барьер на три корутины. Посмотрим, что выведет тест:

Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=583.
Coroutine released: duration=583.
Coroutine released: duration=583.
Test finished: duration=583.

Из результатов видно, что барьер работает. Корутины выходили из барьера по три штуки: первые три — спустя 285 миллисекунд, вторые три — спустя 583 миллисекунды. Однако, если в этом тесте увеличить количество корутин с шести до семи, то вместо вывода Test finished программа вылетит с ошибкой спустя пять секунд:

kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms

Это логично, так как семь на три не делится нацело, значит две пачки корутин по три штуки пройдут барьер, а одна последняя корутина будет ждать до таймаута.

Заключение

В данной статье мы рассмотрели, как правильно применять прототипы синхронизации потоков в контексте корутин, выделили плюсы и минусы существующих решений, а также написали собственную реализацию примитивов барьерной синхронизации для корутин. В следующей статье мы разберем реактивные потоки, раздельный доступ, семафоры и акторы.

Комментарии (6)


  1. Rusrst
    14.07.2023 17:47
    +1

    Да ёлки палки, есть semaphore, а на его бинарной вариации можно собрать что угодно синхронизированное из мира coroutines. Один раз попробовать и альтернативы и не нужны будут, мне так старший товарищ дал пинка под одно место (спасибо ему огромное) и я написал реализацию reentrantlock (стандартную никак не завезут), но для coroutines.

    P.s. кстати непонятно почему именно их вы не упомянули.


    1. Aprsta Автор
      14.07.2023 17:47

      Да, это правда, семафоры как и мьютексы даже есть в библиотеке kotlinx.coroutines и на их основе можно конструировать что угодно. Про них тоже собирался написать, но решил что статья получается слишком большая и разбил ее на две части. Первая часть получилась попроще, про более известные методы синхронизации и StateFlow.

      А вот как раз во второй части будут семафоры с большим количеством примеров, SharedFlow, расскажу про практику выполнения кода на выделенном потоке и думаю даже удастся рассказать про каналы и акторы)


      1. Rusrst
        14.07.2023 17:47

        Мьютекс и семафор в принципе единственное что там есть. Все остальные примитивы синхронизации используют пакеты jvm и в мультиплатформе не применимо из-за этого. Варианты flow и каналов не рассматриваем, они хоть и потокобезопасны, но вот как на них сделать потокобезопасную коллекцию я не представляю. Да и вообще синхронизировать доступ к произвольной переменной.


  1. ZakharSS
    14.07.2023 17:47

    "В отличие от CyclicBarrier в конструкторе мы задаем"... Тут опечатка, вместо CyclicBarrier нужно CountDownLatch. Спасибо за статью!


    1. Aprsta Автор
      14.07.2023 17:47

      Спасибо за отзыв, но там написано правильно. Имелось ввиду в классе CyclicBarrier мы задаем число Java потоков, а в нашей реализации CoroutinesBarrier мы задаем именно число корутин, при которых барьер освобождается. Сейчас поправлю эту фразу в статье, чтобы было понятнее.


  1. Evgenij_Popovich
    14.07.2023 17:47

    У StateFlow есть потокобезопасный метод update. Это я к тому, что нет необходимости синхронизации в вашей реализации CountdownBarrier