В этой статье мы рассмотрим, как и почему изменилась реализация примитивов синхронизации из стандартной библиотеки Java и пакета java.util.concurrent для Kotlin Coroutines и для языка Kotlin в целом. Сразу хочу предупредить: рассматриваемые в статье библиотеки и классы будут оцениваться не с точки зрения поддержки legacy-функциональности и возможности использовать их в Java, а с точки зрения эффективности и возможности использовать их в корутинах и Kotlin Multiplatform. Поэтому эта статья будет больше полезна тем, кто собирается писать новые проекты на языке Kotlin.
В рамках данной статьи будут рассмотрены:
критические секции;
атомарные переменные;
реактивные переменные;
барьерная синхронизация.
Критические секции
Здесь мы не будем очень подробно останавливаться на стандартных реализациях критической секции в языке Java, таких как synchronized и ReentrantLock. Написано множество статей на эту тему, и я уверен, они всем известны. Рассмотрим, почему их использование для синхронизации корутин нежелательно, и приведем альтернативу.
synchronized / @Synchronized
Разработаем простой тест, где будем на разных потоках делать инкремент переменной value:
fun main() = runBlocking {
var obj = Any()
var value = 0
Array(100_000) {
async(Dispatchers.Default) {
synchronized(obj) {
++value
}
}
}.forEach { it.await() }
assertEquals(100_000, value)
}
Данный код запускает 100 000 параллельно выполняющихся корутин (настолько параллельно, насколько это позволяет количество потоков в Dispatchers.Default). В данном примере синхронизация блоком synchronized будет работать корректно, и тест будет пройден. Модифицируем этот код, добавив suspend функцию delay внутрь блока synchronized:
...
synchronized(obj) {
delay(1)
++value
}
...
Данный код даже не скомпилируется с ошибкой:
The 'delay' suspension point is inside a critical section
Все дело в том, что мы не можем вызывать никакие suspend функции внутри блока синхронизации, потому что текущий поток корутины должен уметь освобождаться для использования в других корутинах в момент начала активного ожидания. Стандартные реализации критической секции не рассчитаны на то, что в середине блока синхронизации поток может освободиться для других задач, а после ожидания suspend функции выполнение может быть продолжено на совершенно другом потоке.
ReentrantLock
В примере выше нас спас компилятор. Давайте посмотрим, что произойдет при использовании библиотечной реализации критической секции, например, ReentrantLock:
...
val lock = ReentrantLock()
...
lock.lock()
try {
delay(1)
++value
} finally {
lock.unlock()
}
...
Такой код скомпилируется, но синхронизация не будет работать, а может, даже произойдет вылет с IllegalMonitorStateException. Что интересно, если в данном примере использовать lock.withLock { ... }
, мы получим уже знакомую нам ошибку компиляции.
То же самое касается и полезнейшего класса ReentrantReadWriteLock, который позволяет экономить на операциях read after read. К сожалению, для синхронизации корутин он не будет работать по той же причине.
Mutex
В примерах с synchronized/ReentrantLock и suspend функциями нас мог кое-где спасти компилятор, а в худшем случае мы бы получили несинхронизированный код с вероятным вылетом в рантайме. К счастью для нас, разработчики корутин подумали о таком сценарии и специально для этого разработали интерфейс Mutex и его стандартную имплементацию. Модифицируем код при помощи Mutex:
...
val mutex = Mutex()
...
mutex.withLock {
delay(1)
++value
}
...
Такой код отработает корректно: операции над переменной value синхронизированы, suspend функция delay не мешает синхронизации.
К сожалению, класс ReadWriteMutex (аналогичный ReentrantReadWriteLock) по состоянию на июль 2023 года еще не добавили, однако обсуждение давно ведется, и есть уже минимум одна реализация в PullRequest к GitHub Kotlin/kotlinx.coroutines.
Вы можете сказать, что для данного примера синхронизация единственной переменной value при помощи критической секции — не самое концептуально правильное решение. И будете правы, поэтому синхронизацию одиночных переменных рассмотрим в следующем разделе.
Атомарные переменные
Начнем этот раздел со спойлера: volatile (в котлине @Volatile) и классы пакета java.util.concurrent.atomic (например AtomicReference, AtomicInteger...) при правильном применении будут работать с корутинами не хуже, чем со стандартными Java потоками или какими-нибудь многопоточными фреймворками.
volatile / @Volatile
Для начала вспомним, для чего нам вообще нужен @Volatile. Разные потоки для ускорения работы могут кэшировать у себя значения глобальных переменных. Из-за чего в других потоках при обращении к этим переменным мы не всегда будем видеть их актуальное значение. Рассмотрим код, иллюстрирующий суть проблемы:
@Volatile var value1 = true
@Volatile var value2 = true
fun main() = runBlocking {
repeat(1_000) {
value1 = true
value2 = true
val job1 = async(Dispatchers.Default) {
value2 = false
while(value1);
}
val job2 = async(Dispatchers.Default) {
value1 = false
while(value2);
}
job1.await()
job2.await()
}
println("Success!")
}
Это правильный код, который успешно выполнится, но, если убрать аннотацию @Volatile хотя бы с одной переменной, то тест бесконечно зависнет в одном из циклов while, потому что поток корутины использует кэшированное значение общей переменной.
Однако и @Volatile можно использовать неправильно. Рассмотрим другой пример:
@Volatile var value = true
fun main() = runBlocking {
Array(10_000) {
async(Dispatchers.Default) {
value = !value
}
}.forEach { it.await() }
assertTrue(value)
}
В данном тесте запускается 10 000 конкурирующих корутин, которые меняют значение value на противоположное. Так как количество корутин чётное, итоговое значение не должно измениться и должно остаться равным true. Однако в половине случаев тест падает, даже несмотря на @Volatile. Дело в том, что операция value=!value — неатомарная, то есть состоит из нескольких операций: чтение value, потом запись нового value. Если неатомарные операции часто вызываются в любой многопоточной среде, есть риск получить неправильное значение переменной.
java.util.concurrent.atomic
Большую гибкость по сравнению с @Volatile дают классы пакета java.util.concurrent.atomic, которые позволяют делать атомарными операции инкремента, декремента, compareAndSet (атомарно сравнить с предыдущим значением и записать), getAndSet (атомарное получение старого значения и запись нового) и ряд других. Рассмотрим как можно более эффективно и просто синхронизировать уже знакомый нам код из раздела про критические секции:
fun main() = runBlocking {
var value = 0
Array(100_000) {
async(Dispatchers.Default) {
++value
}
}.forEach { it.await() }
assertEquals(100_000, value)
}
В данном примере кода переменная value не синхронизирована, поэтому значение к концу теста value=100_000 не гарантировано. Так как инкремент по умолчанию — неатомарная операция (++value — это по сути value=value+1 чтение и запись), то и @Volatile тут не поможет. Однако, если модифицировать этот код с помощью класса AtomicInteger:
fun main() = runBlocking {
var value = AtomicInteger(0)
Array(100_000) {
async(Dispatchers.Default) {
value.incrementAndGet()
}
}.forEach { it.await() }
assertEquals(100_000, value.get())}
Благодаря атомарности операции incrementAndGet в классе AtomicInteger синхронизация начинает работать правильно. Если задача не ограничивается настолько простыми операциями, и нужно синхронизировать целые участки кода, на помощь приходят уже рассмотренные нами критические секции.
kotlinx.atomicfu
Как я писал в начале раздела, если использовать volatile и классы пакета java.util.concurrent.atomic правильно, то они будет корректно работать и в корутинах. Тогда зачем же нужно что-то еще? — спросите вы.
Дело в том, что этот тезис касается JVM и Android, однако Kotlin и корутины уже давно вышли за пределы этих двух платформ. Если вы уже разрабатываете на Kotlin Multiplaform или потенциально планируете переход, то без kotlinx.atomicfu вам не обойтись:
fun main() = runBlocking {
var value = atomic(0)
Array(100_000) {
async(Dispatchers.Default) {
value.incrementAndGet()
}
}.forEach { it.await() }
assertEquals(100_000, value.value)
}
В любом случае, переходя на kotlinx.atomicfu для JVM и Android, вы ничего не теряете — все функции библиотеки помечены как actual и внутри используют тот же пакет java.util.concurrent.atomic.
Реактивные переменные
Атомарные переменные — это хорошо, но часто необходимо также подписываться на изменение переменной и выполнять в обработчике какой-то код. В этом разделе мы рассмотрим реализации реактивных переменных, то есть классов, которые позволяют:
через generic задать тип значения;
записывать значение синхронно;
получать значение синхронно;
хранить это значение как внутреннее состояние;
подписываться на изменения значения.
Реализации, не обладающие всеми этими свойствами, в данном разделе рассмотрены не будут.
LiveData
Первое решение, которое мы рассмотрим, — это LiveData. Уверен, многие Android-разработчики не раз сталкивались с этим классом. LiveData позволяет хранить значение, получать и менять его напрямую через геттер\сеттер, подписываться на изменение значения, привязывать подписку к LifecycleOwner.
Очень полезный класс, долгое время спасавший Android-разработчиков, работает с Android Data Binding и с Jetpack Compose, но не без недостатков. Их мы сейчас и рассмотрим.
Первый недостаток очевиден из того, что я писал выше: класс LiveData — штука специфичная и доступна только для платформы Android, нигде больше использовать его не получится.
Из первого вытекает и второй недостаток: основные методы setValue \ observe должны вызываться только на Main потоке приложения (Dispatchers.Main), иначе будет вылет в рантайме.
val liveData = MutableLiveData(true)
withContext(Dispatchers.Default) {
// IllegalStateException: Cannot invoke setValue on a background thread
liveData.value = false
}
Конечно, если использовать LiveData по основному назначению, например, для связи ViewModel + Fragment\Activity, скорее всего, никаких проблем не возникнет. Однако мой опыт говорит о том, что порой разработчики пытаются использовать этот класс еще и на уровне интеракторов\репозиториев и вообще повсюду, где может понадобиться реактивность, что порождает вылеты и замедление Main потока лишними вычислениями. Есть, конечно, метод postValue, который можно вызывать на любом потоке, но не без нюансов:
val liveData = MutableLiveData(true)
withContext(Dispatchers.Default) {
println(liveData.value) // Prints true
liveData.postValue(false)
println(liveData.value) // Also prints true
}
Значение false запланировано для записи в liveData на Main потоке, но сразу значение false мы не увидим.
Еще один недостаток: LiveData не дает такой гибкости как, например, RxJava с ее методами преобразования вроде map, filter, distinct, debounce и др. Ну это и понятно: LiveData — это не полноценный фреймворк, а класс, созданный для решения конкретных проблем.
Ну и последнее. Значение LiveData всегда может быть null. Даже если в Kotlin явно указать тип значения без вопроса, например, LiveData<Int>, это не мешает нам записать и считать null.
BehaviorSubject
Похожая на LiveData штука — BehaviorSubject из RxJava. Этот класс хранит значение, позволяет его менять, получать и подписываться на него. В то же время он дает возможность пользоваться всей функциональной мощью RxJava и может применяться для JVM, а не только для платформы Android. Еще один неоспоримый плюс: методы getValue и onNext (по сути setValue) синхронизированы и могут свободно использоваться в любой многопоточной среде, а во время подписки можно выбирать поток обработки с помощью метода observeOn.
Из явных минусов можно выделить то, что nullable значения при записи в BehaviorSubject нужно упаковывать в какой-то буферный класс (а потом распаковывать при получении), потому что null не является валидным значением для этого класса.
Так как в этой статье мы рассматриваем в качестве основного фреймворка для многопоточной разработки именно корутины и Coroutines Flow, было бы странно, особенно для новых проектов, добавлять в проект еще и библиотеку RxJava. Поэтому разработчики корутин разработали собственную реализацию реактивных переменных, которую мы рассмотрим в следующем пункте.
StateFlow
Интерфейс StateFlow и его реализация являются частью Coroutines Flow и решают все проблемы LiveData и BehaviorSubject. Класс хранит значение, позволяет его получать, менять, а также подписываться на его изменение, как и в уже рассмотренных выше классах. Однако есть и ряд преимуществ:
Значение value (запись и получение) синхронизировано и может использоваться в любой многопоточной среде.
Возможность записи\чтения null контролируется на уровне языка через generics. Если объявить StateFlow<Int?>, то value будет nullable, а если StateFlow<Int>, то нет.
StateFlow включает в себя все преимущества Coroutines Flow и возможность вызова suspend функций внутри преобразующих методов или прямо внутри collect.
При подписке на изменения есть возможность привязки к LifecycleOwner (через lifecycleScope) для Android.
Работает вместе с Android Data Binding, Jetpack Compose и даже Compose Multiplatform.
StateFlow работает не только для Android и JVM, но еще и для других платформ фреймворка Kotlin Multiplatform.
StateFlow — достаточно мощный и универсальный инструмент, который может использоваться для широкого спектра задач, таких как: связь ViewModel и View в архитектуре MVVM, хранение изменяющихся данных на уровне репозиториев с возможностью подписки на них, преобразование данных через функции Coroutines Flow и даже конструирование примитивов синхронизации, на чем мы подробно остановимся в следующем разделе.
Барьерная синхронизация
В данном разделе будет больше креатива — в корутинах пока еще нет стандартных реализаций барьерной синхронизации, поэтому нам придется самим написать этот код. Обычно под барьерной синхронизацией понимается следующее: несколько разных потоков ждут какого-то события, а когда оно происходит, потоки одновременно выходят из ожидания и продолжают свою работу. В случае с корутинами будет примерно то же самое, но ждать события будут не потоки, а, собственно, корутины. Значит, и метод ожидания будет suspend функцией. Выразим это через интерфейс:
interface Barrier {
suspend fun await()
@Throws(TimeoutCancellationException::class)
suspend fun await(timeout: Duration)
}
Очень простой интерфейс всего с двумя функциями: одна ждет с таймаутом, вторая без. Далее в этом разделе мы будем реализовывать недостающие нам классы барьерной синхронизации из пакета java.util.concurrent при помощи этого интерфейса.
CountDownLatch
Класс CountDownLatch из пакета java.util.concurrent действует следующим образом: в одних потоках мы уменьшаем счетчик на 1, а в других потоках ожидаем, пока счетчик не станет равным 0. Напомним функциональность этого класса:
задаем счетчик один раз в конструкторе, значение счетчика больше или равно нулю;
уменьшение счетчика на единицу (метод countDown);
получение значения счетчика синхронно (метод getCount);
ожидание обнуления счетчика с таймаутом (метод await).
Достаточно простой и полезный класс, но есть проблема, которая не позволяет эффективно использовать этот класс в корутинах. Проблема — в методе ожидания обнуления счетчика (метод await). Он не является suspend функцией и блокирует поток, а таймаут вызывает InterruptedException. Из-за этого эффективность корутин падает: текущий поток корутины не освобождается для задач в других корутинах. Также страдает и отменяемость: при вызове метода cancel (на текущей корутине или на всем scope целиком) корутина, зависшая в блокирующем методе await, не сможет отмениться, по крайней мере не сразу.
StateFlow — универсальный и мощный класс, который позволяет нам написать собственную реализацию CountDownLatch меньше чем за 50 строчек кода.
class CountDownBarrier(count: UInt) : Barrier {
private val stateFlow = MutableStateFlow(count)
val counterValue: UInt
@Synchronized get() = stateFlow.value
@Synchronized
fun countDown() {
if (stateFlow.value > 0u) {
--stateFlow.value
}
}
override suspend fun await() {
internalAwait()
}
@Throws(TimeoutCancellationException::class)
override suspend fun await(timeout: Duration) {
if (counterValue > 0u) {
withTimeout(timeout) { internalAwait() }
}
}
private suspend fun internalAwait() {
if (counterValue > 0u) {
// Await first value lower than 0 (suspend function).
stateFlow.first { it <= 0u }
}
}
}
В данной реализации вся изначальная функциональность CountDownLatch сохранилась, зато метод await теперь является suspend функцией. Использование @Synchronized тут оправдано, потому что должна быть возможность вызывать метод countDown и геттер на counterValue и вне корутин. Кроме того, внутри блоков @Synchronized никаких suspend функций не вызывается. Разработаем тест для класса CountDownBarrier:
fun main() = runBlocking {
val сountDownBarrier = CountDownBarrier(5u)
val awaitTasks = Array(10) { index ->
async(Dispatchers.Default) {
сountDownBarrier.await()
println("Await finished: index=$index, counter=${сountDownBarrier.counterValue}")
}
}
val countDownTasks = Array(1000) { index ->
async(Dispatchers.Default) {
println("Count down started: index=$index, counter=${сountDownBarrier.counterValue}")
сountDownBarrier.countDown()
println("Count down finished: index=$index, counter=${сountDownBarrier.counterValue}")
}
}
awaitTasks.forEach { it.await() }
countDownTasks.forEach { it.await() }
println("Success: counter=${сountDownBarrier.counterValue}")
}
Счетчик CountDownBarrier задан на 5, значит, не раньше чем через 5 вызовов метода countDown мы должны увидеть первые логи Await finished. Посмотрим, что выведет этот код:
Count down started: index=0, counter=5
Count down started: index=0, counter=5
Count down finished: index=0, counter=4
Count down started: index=1, counter=4
Count down finished: index=1, counter=3
Count down started: index=2, counter=3
Count down finished: index=2, counter=2
Count down started: index=3, counter=2
Count down finished: index=3, counter=1
Count down started: index=4, counter=1
Count down finished: index=4, counter=0
Await finished: index=9, counter=0
Await finished: index=8, counter=0
...
Счетчик обнулился, и только после этого ожидающие корутины среагировали на это и вышли из await, значит, класс CountDownBarrier работает корректно.
CyclicBarrier
Класс CyclicBarrier похож на CountDownLatch, но значение внутреннего счетчика меняется не публичным методом countDown, а в зависимости от количества ожидающих потоков. Когда количество ожидающих потоков становится равно заданному значению, все они одновременно выходят из ожидания. После освобождения всех потоков внутренний счетчик возвращается в начальное значение. Реализуем примитив барьера для корутин при помощи StateFlow:
class CoroutinesBarrier(
val initialCoroutinesCount: UShort
) : Barrier {
// @Volatile is not necessary here: this field is only used in @Synchronized blocks.
private var stateFlow = MutableStateFlow(initialCoroutinesCount)
val countLeftToReleaseBarrier: UShort
@Synchronized get() = stateFlow.value
override suspend fun await() {
internalAwait()
}
@Throws(TimeoutCancellationException::class)
override suspend fun await(timeout: Duration) {
withTimeout(timeout) { internalAwait() }
}
private suspend fun internalAwait() {
val (flowToAwait, countLeftToRelease) = countDownOrResetBarrier()
if (countLeftToRelease > 0u) {
// Await first value lower than 0 (suspend function).
flowToAwait.first { it <= 0u }
}
}
@Synchronized
private fun countDownOrResetBarrier(): Pair<Flow<UShort>, UShort> {
if (stateFlow.value > 0u) {
--stateFlow.value
}
val result = stateFlow to stateFlow.value
// Reset flow right before releasing awaiting coroutines.
if (stateFlow.value <= 0u) {
stateFlow = MutableStateFlow(initialCoroutinesCount)
}
return result
}
}
В отличие от CyclicBarrier, в конструкторе CoroutinesBarrier мы задаем не количество Java потоков, а количество корутин, при котором барьер освобождается. Чтобы можно было сбрасывать значение счетчика в начальное значение, stateFlow объявлена как var. При сбросе счетчика мы записываем в нее новый экземпляр MutableStateFlow. Дополнительная синхронизация (например, @Volatile) на переменную stateFlow не нужна, так как все обращения к этой переменной происходят только внутри блоков @Synchronized. Разработаем тест для класса CoroutinesBarrier:
fun main() = runBlocking {
val startTimeMillis = System.currentTimeMillis()
val coroutinesBarrier = CoroutinesBarrier(3u)
val awaitTasks = Array(6) { index ->
async(Dispatchers.Default) {
println("Coroutine started.")
delay(index * 100L)
coroutinesBarrier.await(5_000.milliseconds)
val durationMillis = System.currentTimeMillis() - startTimeMillis
println("Coroutine released: duration=$durationMillis.")
}
}.forEach { it.await() }
val durationMillis = System.currentTimeMillis() - startTimeMillis
println("Test finished: duration=$durationMillis.")
}
В этом тесте мы запускаем шесть корутин с увеличивающейся задержкой (100, 200 ... 600 миллисекунд). Сразу после задержки выставлен барьер на три корутины. Посмотрим, что выведет тест:
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=583.
Coroutine released: duration=583.
Coroutine released: duration=583.
Test finished: duration=583.
Из результатов видно, что барьер работает. Корутины выходили из барьера по три штуки: первые три — спустя 285 миллисекунд, вторые три — спустя 583 миллисекунды. Однако, если в этом тесте увеличить количество корутин с шести до семи, то вместо вывода Test finished программа вылетит с ошибкой спустя пять секунд:
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms
Это логично, так как семь на три не делится нацело, значит две пачки корутин по три штуки пройдут барьер, а одна последняя корутина будет ждать до таймаута.
Заключение
В данной статье мы рассмотрели, как правильно применять прототипы синхронизации потоков в контексте корутин, выделили плюсы и минусы существующих решений, а также написали собственную реализацию примитивов барьерной синхронизации для корутин. В следующей статье мы разберем реактивные потоки, раздельный доступ, семафоры и акторы.
Комментарии (6)
ZakharSS
14.07.2023 17:47"В отличие от CyclicBarrier в конструкторе мы задаем"... Тут опечатка, вместо CyclicBarrier нужно CountDownLatch. Спасибо за статью!
Aprsta Автор
14.07.2023 17:47Спасибо за отзыв, но там написано правильно. Имелось ввиду в классе CyclicBarrier мы задаем число Java потоков, а в нашей реализации CoroutinesBarrier мы задаем именно число корутин, при которых барьер освобождается. Сейчас поправлю эту фразу в статье, чтобы было понятнее.
Evgenij_Popovich
14.07.2023 17:47У
StateFlow
есть потокобезопасный методupdate
. Это я к тому, что нет необходимости синхронизации в вашей реализацииCountdownBarrier
Rusrst
Да ёлки палки, есть semaphore, а на его бинарной вариации можно собрать что угодно синхронизированное из мира coroutines. Один раз попробовать и альтернативы и не нужны будут, мне так старший товарищ дал пинка под одно место (спасибо ему огромное) и я написал реализацию reentrantlock (стандартную никак не завезут), но для coroutines.
P.s. кстати непонятно почему именно их вы не упомянули.
Aprsta Автор
Да, это правда, семафоры как и мьютексы даже есть в библиотеке kotlinx.coroutines и на их основе можно конструировать что угодно. Про них тоже собирался написать, но решил что статья получается слишком большая и разбил ее на две части. Первая часть получилась попроще, про более известные методы синхронизации и StateFlow.
А вот как раз во второй части будут семафоры с большим количеством примеров, SharedFlow, расскажу про практику выполнения кода на выделенном потоке и думаю даже удастся рассказать про каналы и акторы)
Rusrst
Мьютекс и семафор в принципе единственное что там есть. Все остальные примитивы синхронизации используют пакеты jvm и в мультиплатформе не применимо из-за этого. Варианты flow и каналов не рассматриваем, они хоть и потокобезопасны, но вот как на них сделать потокобезопасную коллекцию я не представляю. Да и вообще синхронизировать доступ к произвольной переменной.