Как компилятор преобразует suspend код, чтобы корутины можно было приостанавливать и возобновлять?

Корутины в Kotlin представлены ключевым словом suspend. Интересно, что там происходит внутри? Как компилятор преобразует suspend блоки в код, поддерживающий приостановку и возобновление работы корутины?

Знание этого поможет понимать, почему suspend функция не возвращает управление, пока не завершится вся запущенная работа и как код может приостановить выполнение без блокировки потоков.

TL;DR; Компилятор Kotlin создает специальную машину состояний для каждой suspend функции, эта машина берет управление корутиной на себя!

Новенький в Android? Взгляни на эти полезные ресурсы по корутинам:

Для тех, кто предпочитает видео:

Корутины, краткое введение

Говоря по-простому, корутины это асинхронные операции в Android. Как описано в документации, мы можем использовать корутины для управления асинхронными задачами, которые иначе могут блокировать основной поток и приводить к зависанию UI приложения.

Также корутины удобно использовать для замены callback-кода на императивный код. Например, посмотрите на этот код с использованием колбеков:

// Simplified code that only considers the happy path
fun loginUser(userId: String, password: String, userResult: Callback<User>) {
  // Async callbacks
  userRemoteDataSource.logUserIn { user ->
    // Successful network request
    userLocalDataSource.logUserIn(user) { userDb ->
      // Result saved in DB
      userResult.success(userDb)
    }
  }
}

Заменяем эти колбеки на последовательные вызовы функций с использованием корутин:

suspend fun loginUser(userId: String, password: String): UserDb {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

Для функций, которые вызываются в корутинах, мы добавили ключевое слово suspend. Так компилятор знает, что эти функции для корутин. С точки зрения разработчика, рассматривайте suspend функцию как обычную, выполнение которой может быть приостановлено и возобновлено в определенный момент.

В отличие от колбеков, корутины предлагают простой способ переключения между потоками и обработки исключений.

Но что в действительности делает компилятор внутри, когда мы отмечаем функцию как suspend?

Suspend под капотом

Давайте вернемся к suspend функции loginUser, посмотрите, другие функции которые она вызывает являются также suspend функциями:

suspend fun loginUser(userId: String, password: String): UserDb {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

// UserRemoteDataSource.kt
suspend fun logUserIn(userId: String, password: String): User

// UserLocalDataSource.kt
suspend fun logUserIn(userId: String): UserDb

Кратко говоря, компилятор Kotlin берет suspend функции и преобразовывает их в оптимизированную версию колбеков с использованием конечной машины состояний (о которой мы поговорим позже).

Интерфейс Continuation

Suspend функции взаимодействуют друг с другом с помощью Continuation объектов. Continuation объект - это простой generic интерфейс с дополнительными данными. Позже мы увидим, что сгенерированная машина состояний для suspend функции будет реализовывать этот интерфейс.

Сам интерфейс выглядит так:

interface Continuation<in T> {
  public val context: CoroutineContext
  public fun resumeWith(value: Result<T>)
}
  • context это экземпляр CoroutineContext, который будет использоваться при возобновлении.

  • resumeWith возобновляет выполнение корутины с Result, он может либо содержать результат вычисления, либо исключение.

С Kotlin 1.3 и далее, вы можете использовать extensions функции resume(value: T) и resumeWithException(exception: Throwable), это специализированные версии метода resumeWith.

Компилятор заменяет ключевое слово suspend на дополнительный аргумент completion (тип Continuation) в функции, аргуемнт используется для передачи результата suspend функции в вызывающую корутину:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  completion.resume(userDb)
}

Для упрощения, наш пример возвращает Unit вместо объекта User.

Байткод suspend функций фактически возвращает Any? так как это объединение (union) типов T | COROUTINE_SUSPENDED. Что позволяет функции возвращать результат синхронно, когда это возможно.

Если suspend функция не вызывает другие suspend функции, компилятор добавляет аргумент Continuation, но не будет с ним ничего делать, байткод функции будет выглядеть как обычная функция.

Кроме того, интерфейс Continuation можно увидеть в:

  • При конвертации колбек-API в корутины с использованием suspendCoroutine или suspendCancellableCoroutine (предпочтительнее использовать в большинстве случаев). Вы напрямую взаимодействуете с экземпляром Continuation, чтобы возобновить корутину, приостановленную после выполнения блока кода из аргументов suspend функции.

  • Вы можете запустить корутину при помощи startCoroutine extension функции в suspend методе. Она принимает Continuation как аргумент, который будет вызван, когда новая корутина завершится либо с результатом, либо с исключением.

Используем Dispatchers

Вы можете переключаться между разными диспетчерами для запуска вычислений на разных потоках. Как Kotlin знает, где возобновить suspend вычисления?

Есть подтип Continuation, он называется DispatchedContinuation, где его метод resumeделает вызов Dispatcher доступного в контексте корутины CoroutineContext. Все диспетчеры (Dispatchers) будут вызывать метод dispatch, кроме типа Dispatchers.Unconfined, он переопределяет метод isDispatchNeeded (он вызывается перед вызовом dispatch), который возвращает false в этом случае.

Сгенрированная машина состояний

Уточнение: Приведенный код не полностью соответствует байткоду сгенерированному компилятором. Это будет код на Kotlin, достаточно точный, для понимания того, что в действительности происходит внутри. Это представление сгенерировано корутинами версии 1.3.3 и может поменяться в следующих версиях библиотеки.

Компилятор Kotlin определяет, когда функция может остановится внутри. Каждая точка останова представляется как отдельное состояние в конечной машине состояний. Такие состояния компилятор помечает метками:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  
  // Label 0 -> first execution
  val user = userRemoteDataSource.logUserIn(userId, password)
  
  // Label 1 -> resumes from userRemoteDataSource
  val userDb = userLocalDataSource.logUserIn(user)
  
  // Label 2 -> resumes from userLocalDataSource
  completion.resume(userDb)
}

Компилятор использует when для состояний:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  when(label) {
    0 -> { // Label 0 -> first execution
        userRemoteDataSource.logUserIn(userId, password)
    }
    1 -> { // Label 1 -> resumes from userRemoteDataSource
        userLocalDataSource.logUserIn(user)
    }
    2 -> { // Label 2 -> resumes from userLocalDataSource
        completion.resume(userDb)
    }
    else -> throw IllegalStateException(/* ... */)
  }
}

Этот код неполный, так как различные состояния не могут обмениваться информацией. Компилятор использует для обмена тот же самый объект Continuation. Вот почему родительски тип в Continuation это Any? вместо ожидаемого возвращаемого типа User.

При этом компилятор создает приватный класс, который:

  1. хранит нужные данные

  2. вызывает функцию loginUser рекурсивно для возобновления вычисления

Ниже представлен примерный вид такого сгенерированного класса:

Комментарии в коде были добавлены вручную для объяснения действий

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
  
  class LoginUserStateMachine(
    // completion parameter is the callback to the function 
    // that called loginUser
    completion: Continuation<Any?>
  ): CoroutineImpl(completion) {
  
    // Local variables of the suspend function
    var user: User? = null
    var userDb: UserDb? = null
  
    // Common objects for all CoroutineImpls
    var result: Any? = null
    var label: Int = 0
  
    // this function calls the loginUser again to trigger the
    // state machine (label will be already in the next state) and
    // result will be the result of the previous state's computation
    override fun invokeSuspend(result: Any?) {
      this.result = result
      loginUser(null, null, this)
    }
  }
  /* ... */
}

Поскольку invokeSuspend вызывает loginUser только с аргументом Continuation, остальные аргументы в функции loginUser будут нулевыми. На этом этапе компилятору нужно только добавить информацию как переходить из одного состояния в другое.

Компилятору нужно знать:

  1. Функция вызывается первый раз или

  2. Функция была возобновлена из предыдущего состояния Для этого проверяется тип аргумента Continuation в функции:

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
  /* ... */
  val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
  /* ... */
}

Если функция вызывается первый раз, то создается новый экземпляр LoginUserStateMachineи аргумент completion передается в этот экземпляр, чтобы возобновить вычисление. Иначе продолжится выполнение машины состояний.

Давайте взглянем на код, который генерирует компилятор для смены состояний и обмена информацией между ними:

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    /* ... */

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Next time this continuation is called, it should go to state 1
            continuation.label = 1
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.user = continuation.result as User
            // Next time this continuation is called, it should go to state 2
            continuation.label = 2
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
          /* ... leaving out the last state on purpose */
    }
}

Обратите внимание на различия между этим и предыдущим примером кода:

  • Появилась переменная label из LoginUserStateMachine, которая передается в when.

  • Каждый раз при обработке нового состояния проверяется есть ли ошибка.

  • Перед вызовом следующей suspend функции (logUserIn), LoginUserStateMachineобновляет переменную label.

  • Когда внутри машины состояний вызывается другая suspend функция, экземпляр Continuation (с типом LoginUserStateMachine) передается как аргумент. Вложенная suspend функция также была преобразована компилятором со своей машиной состояний. Когда эта внутренняя машина состояний завершит свою работу, она возобновит выполнение “родительской” машины состояний.

Последнее состояние должно возобновить выполнение completion через вызов continuation.cont.resume (очевидно что входной аргумент completion, сохраняется в переменной continuation.cont экземпляра LoginUserStateMachine):

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    /* ... */

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        /* ... */
        2 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.userDb = continuation.result as UserDb
            // Resumes the execution of the function that called this one
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(/* ... */)
    }
}

Компилятор Kotlin делает много работы “под капотом”. Из suspend функции:

suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

Генерируется большой кусок кода:

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {

    class LoginUserStateMachine(
        // completion parameter is the callback to the function that called loginUser
        completion: Continuation<Any?>
    ): CoroutineImpl(completion) {
        // objects to store across the suspend function
        var user: User? = null
        var userDb: UserDb? = null

        // Common objects for all CoroutineImpl
        var result: Any? = null
        var label: Int = 0

        // this function calls the loginUser again to trigger the 
        // state machine (label will be already in the next state) and 
        // result will be the result of the previous state's computation
        override fun invokeSuspend(result: Any?) {
            this.result = result
            loginUser(null, null, this)
        }
    }

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Next time this continuation is called, it should go to state 1
            continuation.label = 1
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.user = continuation.result as User
            // Next time this continuation is called, it should go to state 2
            continuation.label = 2
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
        2 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.userDb = continuation.result as UserDb
            // Resumes the execution of the function that called this one
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(/* ... */)
    }
}

Компилятор Kotlin преобразовывает каждую suspend функцию в машину состояний, с использованием обратных вызовов.

Зная как компилятор работает “под капотом”, вы лучше понимаете:

  • почему suspend функция не вернет результат пока не завершится вся работа, которая она начала;

  • каким образом код приостанавливается не блокируя потоки (вся информация, о том что нужно выполнить при возобновлении работы, хранится в объекте Continuation).

Комментарии (8)


  1. ertaquo
    07.04.2022 20:40
    +1

    Глупый вопрос, но почему ключевое слово для короутин именно suspend? Оно ведь означает "приостановить". Почему бы не сделать что-нибудь более понятное, типа async, asynchronous, awaitable, coroutine ?


    1. alexdevyatov
      08.04.2022 01:36
      +1

      suspend не является ключевым словом для корутин. Им помечают функции, которые как раз-таки могут прерываться.


    1. FirsofMaxim Автор
      08.04.2022 06:14
      +1

      Вот здесь есть ответ на ваш вопрос (по-моему в начале видео): https://www.youtube.com/watch?v=rB5Q3y73FTo


  1. Ivanzabu
    07.04.2022 23:15
    +1

    Отличная статья!

    Было бы здорово увидеть более подробное описание того, как это вяжется с многопоточностью и планированием задач.

    Ведь если я ничего не путаю, даже бесконечный цикл в suspend функции не вызывает зависания интерфейса если внутри самого цикла есть точка прерывания, это значит что в какой то момент вместо продолжения дробления данных в цикле state машины - она планирует эту задачу вероятно куда-то в handler.post на android и даёт тем самым прогрузить интерфейс, вот интересно узнать как эта часть работает. (или возможно dispatcher main потока каждую следующую итерацию планирует в post o.O)

    Ну и вообще наверное по работе dispatcher - ов должно быть много интересного - там же получается наверное возможна ситуация, когда одна корутина в процессе выполнения переключилась на другой поток, а другая наоборот переключилась с другого на текущий, и планировщик должен запустить её и вот это все))


    1. nevdokimof
      08.04.2022 00:24
      +2

      она планирует эту задачу вероятно куда-то в handler.post на android и даёт тем самым прогрузить интерфейс

      Именно так.

      Следите за руками: компилятор генерит имплементацию Continuation для каждой suspend функции. Эта имплементация наследуется от ContinuationImpl, где при вызове resumeWith() в текущем контексте ищется ContinuationInterceptor, у которого вызывается методinterceptContinuation().

      Идём далее, стандартные диспетчеры реализуют СontinuationInterceptor, и в методе interceptContinuation()они превращают переданный Continuation в DispatchedContinuation. DispatchedContinuation нужен для 2 вещей: во-первых, сам по себе в конечном итоге наследуется от Runnable, во-вторых, в resumeWith() он ищет в контексте CoroutineDispatcher и вызывает у него метод dispatch(context: CoroutineContext, block: Runnable) с самим собой в качестве аргумента.

      Дальше очень просто — в dispatch() переданные раннаблы раскидываются по экзекьюторам (ну, на самом деле не очень просто, всякие балансировки и оптимизации тоже присутствуют). Default и IO диспетчеры используют пулы потоков, а Main в Android использует Handler.

      В рамках комментария, конечно, сложно нормально описать всю эту кухню, есть в планах сделать статью по этой теме, да всё никак руки не доходят.


      1. Ivanzabu
        08.04.2022 00:30

        Спасибо за ответ, примерно так себе это и представлял, но теперь можно быть уверенным))

        Надеюсь когда нибудь у вас дойдут руки до статьи, было бы здорово)


      1. FirsofMaxim Автор
        08.04.2022 06:17

        Спасибо за подробный flow.


    1. FirsofMaxim Автор
      08.04.2022 06:16

      Спасибо, в оригинальной статье этот момент не описывается и мне тоже "нехватило" этой инфы. Постараюсь найти по этой теме, что-нибудь в ближайшее время.