Как компилятор преобразует suspend код, чтобы корутины можно было приостанавливать и возобновлять?
Корутины в Kotlin представлены ключевым словом suspend. Интересно, что там происходит внутри? Как компилятор преобразует suspend блоки в код, поддерживающий приостановку и возобновление работы корутины?
Знание этого поможет понимать, почему suspend функция не возвращает управление, пока не завершится вся запущенная работа и как код может приостановить выполнение без блокировки потоков.
TL;DR; Компилятор Kotlin создает специальную машину состояний для каждой suspend функции, эта машина берет управление корутиной на себя!
Новенький в Android? Взгляни на эти полезные ресурсы по корутинам:
Для тех, кто предпочитает видео:
Корутины, краткое введение
Говоря по-простому, корутины это асинхронные операции в Android. Как описано в документации, мы можем использовать корутины для управления асинхронными задачами, которые иначе могут блокировать основной поток и приводить к зависанию UI приложения.
Также корутины удобно использовать для замены callback-кода на императивный код. Например, посмотрите на этот код с использованием колбеков:
// Simplified code that only considers the happy path
fun loginUser(userId: String, password: String, userResult: Callback<User>) {
// Async callbacks
userRemoteDataSource.logUserIn { user ->
// Successful network request
userLocalDataSource.logUserIn(user) { userDb ->
// Result saved in DB
userResult.success(userDb)
}
}
}
Заменяем эти колбеки на последовательные вызовы функций с использованием корутин:
suspend fun loginUser(userId: String, password: String): UserDb {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}
Для функций, которые вызываются в корутинах, мы добавили ключевое слово suspend. Так компилятор знает, что эти функции для корутин. С точки зрения разработчика, рассматривайте suspend функцию как обычную, выполнение которой может быть приостановлено и возобновлено в определенный момент.
В отличие от колбеков, корутины предлагают простой способ переключения между потоками и обработки исключений.
Но что в действительности делает компилятор внутри, когда мы отмечаем функцию как suspend?
Suspend под капотом
Давайте вернемся к suspend функции loginUser
, посмотрите, другие функции которые она вызывает являются также suspend функциями:
suspend fun loginUser(userId: String, password: String): UserDb {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}
// UserRemoteDataSource.kt
suspend fun logUserIn(userId: String, password: String): User
// UserLocalDataSource.kt
suspend fun logUserIn(userId: String): UserDb
Кратко говоря, компилятор Kotlin берет suspend функции и преобразовывает их в оптимизированную версию колбеков с использованием конечной машины состояний (о которой мы поговорим позже).
Интерфейс Continuation
Suspend функции взаимодействуют друг с другом с помощью Continuation
объектов. Continuation
объект - это простой generic интерфейс с дополнительными данными. Позже мы увидим, что сгенерированная машина состояний для suspend функции будет реализовывать этот интерфейс.
Сам интерфейс выглядит так:
interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(value: Result<T>)
}
context
это экземплярCoroutineContext
, который будет использоваться при возобновлении.resumeWith
возобновляет выполнение корутины с Result, он может либо содержать результат вычисления, либо исключение.
С Kotlin 1.3 и далее, вы можете использовать extensions функции resume(value: T) и resumeWithException(exception: Throwable), это специализированные версии метода
resumeWith
.
Компилятор заменяет ключевое слово suspend на дополнительный аргумент completion
(тип Continuation
) в функции, аргуемнт используется для передачи результата suspend функции в вызывающую корутину:
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
completion.resume(userDb)
}
Для упрощения, наш пример возвращает Unit
вместо объекта User
.
Байткод suspend функций фактически возвращает Any?
так как это объединение (union) типов T | COROUTINE_SUSPENDED
. Что позволяет функции возвращать результат синхронно, когда это возможно.
Если suspend функция не вызывает другие suspend функции, компилятор добавляет аргумент Continuation, но не будет с ним ничего делать, байткод функции будет выглядеть как обычная функция.
Кроме того, интерфейс Continuation
можно увидеть в:
При конвертации колбек-API в корутины с использованием suspendCoroutine или suspendCancellableCoroutine (предпочтительнее использовать в большинстве случаев). Вы напрямую взаимодействуете с экземпляром
Continuation
, чтобы возобновить корутину, приостановленную после выполнения блока кода из аргументов suspend функции.Вы можете запустить корутину при помощи startCoroutine extension функции в suspend методе. Она принимает
Continuation
как аргумент, который будет вызван, когда новая корутина завершится либо с результатом, либо с исключением.
Используем Dispatchers
Вы можете переключаться между разными диспетчерами для запуска вычислений на разных потоках. Как Kotlin знает, где возобновить suspend вычисления?
Есть подтип Continuation
, он называется DispatchedContinuation, где его метод resume
делает вызов Dispatcher
доступного в контексте корутины CoroutineContext
. Все диспетчеры (Dispatchers
) будут вызывать метод dispatch
, кроме типа Dispatchers.Unconfined
, он переопределяет метод isDispatchNeeded
(он вызывается перед вызовом dispatch
), который возвращает false в этом случае.
Сгенрированная машина состояний
Уточнение: Приведенный код не полностью соответствует байткоду сгенерированному компилятором. Это будет код на Kotlin, достаточно точный, для понимания того, что в действительности происходит внутри. Это представление сгенерировано корутинами версии 1.3.3 и может поменяться в следующих версиях библиотеки.
Компилятор Kotlin определяет, когда функция может остановится внутри. Каждая точка останова представляется как отдельное состояние в конечной машине состояний. Такие состояния компилятор помечает метками:
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
// Label 0 -> first execution
val user = userRemoteDataSource.logUserIn(userId, password)
// Label 1 -> resumes from userRemoteDataSource
val userDb = userLocalDataSource.logUserIn(user)
// Label 2 -> resumes from userLocalDataSource
completion.resume(userDb)
}
Компилятор использует when
для состояний:
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
when(label) {
0 -> { // Label 0 -> first execution
userRemoteDataSource.logUserIn(userId, password)
}
1 -> { // Label 1 -> resumes from userRemoteDataSource
userLocalDataSource.logUserIn(user)
}
2 -> { // Label 2 -> resumes from userLocalDataSource
completion.resume(userDb)
}
else -> throw IllegalStateException(/* ... */)
}
}
Этот код неполный, так как различные состояния не могут обмениваться информацией. Компилятор использует для обмена тот же самый объект Continuation
. Вот почему родительски тип в Continuation
это Any?
вместо ожидаемого возвращаемого типа User
.
При этом компилятор создает приватный класс, который:
хранит нужные данные
вызывает функцию
loginUser
рекурсивно для возобновления вычисления
Ниже представлен примерный вид такого сгенерированного класса:
Комментарии в коде были добавлены вручную для объяснения действий
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
class LoginUserStateMachine(
// completion parameter is the callback to the function
// that called loginUser
completion: Continuation<Any?>
): CoroutineImpl(completion) {
// Local variables of the suspend function
var user: User? = null
var userDb: UserDb? = null
// Common objects for all CoroutineImpls
var result: Any? = null
var label: Int = 0
// this function calls the loginUser again to trigger the
// state machine (label will be already in the next state) and
// result will be the result of the previous state's computation
override fun invokeSuspend(result: Any?) {
this.result = result
loginUser(null, null, this)
}
}
/* ... */
}
Поскольку invokeSuspend
вызывает loginUser
только с аргументом Continuation
, остальные аргументы в функции loginUser
будут нулевыми. На этом этапе компилятору нужно только добавить информацию как переходить из одного состояния в другое.
Компилятору нужно знать:
Функция вызывается первый раз или
Функция была возобновлена из предыдущего состояния Для этого проверяется тип аргумента
Continuation
в функции:
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
/* ... */
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
/* ... */
}
Если функция вызывается первый раз, то создается новый экземпляр LoginUserStateMachine
и аргумент completion
передается в этот экземпляр, чтобы возобновить вычисление. Иначе продолжится выполнение машины состояний.
Давайте взглянем на код, который генерирует компилятор для смены состояний и обмена информацией между ними:
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
/* ... */
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
when(continuation.label) {
0 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Next time this continuation is called, it should go to state 1
continuation.label = 1
// The continuation object is passed to logUserIn to resume
// this state machine's execution when it finishes
userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
}
1 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.user = continuation.result as User
// Next time this continuation is called, it should go to state 2
continuation.label = 2
// The continuation object is passed to logUserIn to resume
// this state machine's execution when it finishes
userLocalDataSource.logUserIn(continuation.user, continuation)
}
/* ... leaving out the last state on purpose */
}
}
Обратите внимание на различия между этим и предыдущим примером кода:
Появилась переменная
label
изLoginUserStateMachine
, которая передается вwhen
.Каждый раз при обработке нового состояния проверяется есть ли ошибка.
Перед вызовом следующей suspend функции (
logUserIn
),LoginUserStateMachine
обновляет переменнуюlabel
.Когда внутри машины состояний вызывается другая suspend функция, экземпляр
Continuation
(с типомLoginUserStateMachine
) передается как аргумент. Вложенная suspend функция также была преобразована компилятором со своей машиной состояний. Когда эта внутренняя машина состояний завершит свою работу, она возобновит выполнение “родительской” машины состояний.
Последнее состояние должно возобновить выполнение completion
через вызов continuation.cont.resume
(очевидно что входной аргумент completion
, сохраняется в переменной continuation.cont
экземпляра LoginUserStateMachine
):
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
/* ... */
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
when(continuation.label) {
/* ... */
2 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.userDb = continuation.result as UserDb
// Resumes the execution of the function that called this one
continuation.cont.resume(continuation.userDb)
}
else -> throw IllegalStateException(/* ... */)
}
}
Компилятор Kotlin делает много работы “под капотом”. Из suspend функции:
suspend fun loginUser(userId: String, password: String): User {
val user = userRemoteDataSource.logUserIn(userId, password)
val userDb = userLocalDataSource.logUserIn(user)
return userDb
}
Генерируется большой кусок кода:
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
class LoginUserStateMachine(
// completion parameter is the callback to the function that called loginUser
completion: Continuation<Any?>
): CoroutineImpl(completion) {
// objects to store across the suspend function
var user: User? = null
var userDb: UserDb? = null
// Common objects for all CoroutineImpl
var result: Any? = null
var label: Int = 0
// this function calls the loginUser again to trigger the
// state machine (label will be already in the next state) and
// result will be the result of the previous state's computation
override fun invokeSuspend(result: Any?) {
this.result = result
loginUser(null, null, this)
}
}
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
when(continuation.label) {
0 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Next time this continuation is called, it should go to state 1
continuation.label = 1
// The continuation object is passed to logUserIn to resume
// this state machine's execution when it finishes
userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
}
1 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.user = continuation.result as User
// Next time this continuation is called, it should go to state 2
continuation.label = 2
// The continuation object is passed to logUserIn to resume
// this state machine's execution when it finishes
userLocalDataSource.logUserIn(continuation.user, continuation)
}
2 -> {
// Checks for failures
throwOnFailure(continuation.result)
// Gets the result of the previous state
continuation.userDb = continuation.result as UserDb
// Resumes the execution of the function that called this one
continuation.cont.resume(continuation.userDb)
}
else -> throw IllegalStateException(/* ... */)
}
}
Компилятор Kotlin преобразовывает каждую suspend функцию в машину состояний, с использованием обратных вызовов.
Зная как компилятор работает “под капотом”, вы лучше понимаете:
почему suspend функция не вернет результат пока не завершится вся работа, которая она начала;
каким образом код приостанавливается не блокируя потоки (вся информация, о том что нужно выполнить при возобновлении работы, хранится в объекте
Continuation
).
Комментарии (8)
Ivanzabu
07.04.2022 23:15+1Отличная статья!
Было бы здорово увидеть более подробное описание того, как это вяжется с многопоточностью и планированием задач.
Ведь если я ничего не путаю, даже бесконечный цикл в suspend функции не вызывает зависания интерфейса если внутри самого цикла есть точка прерывания, это значит что в какой то момент вместо продолжения дробления данных в цикле state машины - она планирует эту задачу вероятно куда-то в handler.post на android и даёт тем самым прогрузить интерфейс, вот интересно узнать как эта часть работает. (или возможно dispatcher main потока каждую следующую итерацию планирует в post o.O)
Ну и вообще наверное по работе dispatcher - ов должно быть много интересного - там же получается наверное возможна ситуация, когда одна корутина в процессе выполнения переключилась на другой поток, а другая наоборот переключилась с другого на текущий, и планировщик должен запустить её и вот это все))
nevdokimof
08.04.2022 00:24+2она планирует эту задачу вероятно куда-то в handler.post на android и даёт тем самым прогрузить интерфейс
Именно так.
Следите за руками: компилятор генерит имплементацию
Continuation
для каждойsuspend
функции. Эта имплементация наследуется от ContinuationImpl, где при вызовеresumeWith()
в текущем контексте ищется ContinuationInterceptor, у которого вызывается методinterceptContinuation()
.Идём далее, стандартные диспетчеры реализуют
СontinuationInterceptor
, и в методеinterceptContinuation()
они превращают переданныйContinuation
вDispatchedContinuation
.DispatchedContinuation
нужен для 2 вещей: во-первых, сам по себе в конечном итоге наследуется отRunnable
, во-вторых, вresumeWith()
он ищет в контекстеCoroutineDispatcher
и вызывает у него методdispatch(context: CoroutineContext, block: Runnable)
с самим собой в качестве аргумента.Дальше очень просто — в
dispatch()
переданные раннаблы раскидываются по экзекьюторам (ну, на самом деле не очень просто, всякие балансировки и оптимизации тоже присутствуют).Default
иIO
диспетчеры используют пулы потоков, аMain
в Android использует Handler.В рамках комментария, конечно, сложно нормально описать всю эту кухню, есть в планах сделать статью по этой теме, да всё никак руки не доходят.
Ivanzabu
08.04.2022 00:30Спасибо за ответ, примерно так себе это и представлял, но теперь можно быть уверенным))
Надеюсь когда нибудь у вас дойдут руки до статьи, было бы здорово)
FirsofMaxim Автор
08.04.2022 06:16Спасибо, в оригинальной статье этот момент не описывается и мне тоже "нехватило" этой инфы. Постараюсь найти по этой теме, что-нибудь в ближайшее время.
ertaquo
Глупый вопрос, но почему ключевое слово для короутин именно
suspend
? Оно ведь означает "приостановить". Почему бы не сделать что-нибудь более понятное, типаasync
,asynchronous
,awaitable
,coroutine
?alexdevyatov
suspend не является ключевым словом для корутин. Им помечают функции, которые как раз-таки могут прерываться.
FirsofMaxim Автор
Вот здесь есть ответ на ваш вопрос (по-моему в начале видео): https://www.youtube.com/watch?v=rB5Q3y73FTo