Предположим, у вас в коде есть критическая секция, которая не должна выполняться более, чем одним потоком одновременно.
В мире Java одним из стандартных решений является добавление ключевого слова synchronized
к сигнатуре метода. В Kotlin для получения того же эффекта используется аннотация @Synchronized
repeat(2) {
thread { criticalSection() }
}
@Synchronized
fun criticalSection() {
println("Starting!")
Thread.sleep(10)
println("Ending!")
}
Данный код выведет следующее:
Starting!
Ending!
Starting!
Ending!
Видно, что оба вызова отработали последовательно, один после другого.
Теперь предположим, что мы хотим использовать корутины. Что у нас получится?
val scope = CoroutineScope(Job())
repeat(2) {
scope.launch { criticalSectionSuspending() }
}
@Synchronized
suspend fun criticalSectionSuspending() {
println("Starting!")
delay(10)
println("Ending!")
}
А получится, что вызовы критической секции пересекутся, что очень не здорово.
Starting!
Starting!
Ending!
Ending!
Понять, что же происходит, можно, разобравшись, как устроены корутины под капотом. Они реализованы с использованием подхода передачи продолжения. (Краткое объяснение можно посмотреть в моём докладе про корутины Grokking Coroutines, Dan Lew, а для более глубокого понимания рекомендую посмотреть доклад Романа Елизарова Deep Dive into Coroutines.)
В рамках этой статьи всё, что вам надо знать — это то, что останавливаемые функции на самом деле не исполняются последовательно, строка за строкой. Когда останавливаемая функция доходит до точки остановки, она ставится на паузу и передаёт управление какой-то другой функции (позже, когда управление будет возвращено этой функции, — она продолжит выполнение).
Таким образом, во втором примере кода на самом деле происходит следующее:
1. criticalSectionSuspending()
стартует, забирает блокировку и печатает Starting!
2. Доходит до delay()
(который является точкой остановки), выходит из функции и отдаёт блокировку.
3. Так как блокировка свободна, начинается второй запуск criticalSectionSuspending()
, которая забирает блокировку, печатает Starting!
, останавливается и тоже отдаёт блокировку.
4. Когда delay()
заканчивается, criticalSectionSuspending()
запускается снова, но уже с предыдущего места остановки.
Для большей наглядности привожу временную диаграмму выполнения одной функции.
Как видите, время остановки проходит вне критической секции и её блокировка не удерживается. Именно поэтому разные потоки имеют доступ к синхронизированной останавливаемой функции — они выполняют её не одновременно.
Это известная проблема в Kotlin. Фактически, компилятор не позволит использовать synchronized() {}
с точкой останова внутри. Такой код не скомпилируется с ошибкой "The 'delay' suspension point is inside a critical section"
Я убеждён, что в случае использования аннотации @Synchronized
поведение компилятора должно быть аналогичным. Проблема заведена в YouTrack, но особых подвижек пока нет.
Как же быть?
Во-первых, следует признать, что проблема не в том, что «мы не можем использовать synchronized
». synchronized
— это просто средство обеспечения работоспособности критических секций. И единственная причина того, что у нас есть эти критические секции в том, что у нас есть общее изменяемое состояние. Соответственно, проблема звучит следующим образом: «нам нужен способ управления общим изменяемым состоянием в многопоточной среде».
К счастью, у нас есть официальное руководство по Kotlin, в котором есть раздел Shared mutable state and concurrency, описывающий несколько неплохих вариантов действий. Для нас более всего подходит секция про мьютексы, так как они наиболее похожи на синхронизацию.
val mutex = Mutex()
val scope = CoroutineScope(Job())
repeat(2) {
scope.launch { criticalSectionSuspendingLocked() }
}
suspend fun criticalSectionSuspendingLocked() {
mutex.withLock {
println("Starting!")
delay(10)
println("Ending!")
}
}
Код выше работает ровно так, как нам нужно, — выводит не перемешанные сообщения.
Я не хочу, чтобы люди считали, что использование мьютексов является универсальным решением при работе с общим изменяемым состоянием. В документации описано несколько подходов, и вы должны самостоятельно оценить, какой из них подходит к вашей конкретной ситуации. Но в случае, когда вы использовали синхронизированную функцию и теперь хотите её приостановить, мьютекс — это самый естественный выбор.
От переводчика:
Эту статью я нашёл во время поисков ответа на вопрос: Почему мьютекс в моём конкретном случае работает сильно медленнее (в полтора - два раза), чем синхронизация? Просто имейте в виду, если у вас нет точек останова в горячей критической секции, то лучше использовать синхронизацию, так как мьютекс может сильно просадить производительность.
gkozlenko
Не разрабатываю на Kotlin, но насколько я понимаю корутины могут использовать один и тот же поток. А синхронизация с помощью synchronize блока или метода обладает reentrance свойством, то есть один и тот же поток может заходить в synchronize блок, не снимая блокировку. А вот мютексы таким свойством не обладают.
rjhdby Автор
Могут один, а могут и разные. Мало того, корутина может "проснуться" не в том потоке, где засыпала.