Знакомство

Всем привет! Меня зовут Иван, я Android-разработчик и сегодня я хочу поговорить с вами о такой вещи, как Coroutines в языке Kotlin

Не знаю, как у вас, а я всегда интересовался как устроены те или иные вещи на низком уровне, поэтому, когда я начал активно заниматься Android разработкой и познакомился с Kotlin, одним из главных вопросов, который не уходил у меня из головы, как устроены Coroutines на низком уровне?

Обычно, когда ты начинаешь задаваться подобным вопросом, в голове возникает ассоциация с чем-то магическим. Есть какая-то неведомая сущность, которой ты говоришь что-то сделать и она что-то делает. При изучении Coroutines в самом начале пути ощущения у меня были точно такими же

Тот самый гном-волшебник, который помогает при работе с корутинами
Тот самый гном-волшебник, который помогает при работе с корутинами

Конечно же, никаких гномов-волшебников не существует, а сами Coroutines работают по определенным алгоритмам, причем не то чтобы очень сложным

В рамках этой статьи я не буду претендовать на полную достоверность и всезнайность. Я, как и многие тысячи разработчиков, ежедневно сталкиваюсь с Coroutines и просто хочу поделиться своим пониманием того, как они устроены и получить обратный фидбек

Поехали

Во-первых, давайте ответим на вопрос: что такое Coroutines?

Для этого предлагаю разделить этот вопрос на три категории и ответить с учетом каждой

  • Асинхронность - Coroutines являются асинхронным механизмом, это означает, что если у нас есть несколько отдельных операций, которые мы можем выполнять параллельно друг другу, то при помощи Coroutines мы можем с легкостью это сделать, наши операции действительно смогут выполняться одновременно, либо параллельно (если у нас есть больше двух потоков), либо асинхронно (механизм будет освобождать поток для выполнения других задач)

  • Гибкий механизм - Coroutines созданы под влиянием множества других механизмов, которые были раньше на языке Java и в целом мире многопоточности. Он позволяет покрыть все кейсы, связанные с многопоточностью/асинхронностью и представляет собой единую библиотеку. Благодаря этому мы с легкостью теперь можем перемещаться от проекта к проекту и не переживать, что нам придётся учить новые библиотеки/паттерны работы с многопоточностью

  • Реализация - этому я уделю все остальное внимание в этой статье

Реализация

Итак, что представляет собой Coroutine с точки зрения реализации?

Одна из картинок в моей презентации
Одна из картинок в моей презентации

Если говорить максимально абстрактно, то я бы сказал что Coroutine - это контейнер, который состоит из двух слоев

  1. CoroutineContext - тоже контейнер, внутри которого находится множество элементов, регулирующих выполнение этой самой Coroutine. Жизненный цикл, обработка ошибок и все остальное

  2. Continuation - тут мне нравится приводить подобный пример: представим, что у нас есть операция запроса к серверу, она долгая и мы должны дождаться результата и после этого идти дальше. Как мы обычно это делаем? Прокидываем Callback в ту функцию которая является блокирующей, а после того, как она выполнила свои операции, она вызывает этот переданный ей Callback и передает туда результат своих вычислений. Continuation - это точно такой же Callback, а основным методом является resumeWith

Вот как выглядит интерфейс Continuation:

/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

Подробнее про Continuation

Давайте попробуем осознать реальность на осязаемом примере

У нас есть следующий код:

suspend fun operationC() {  
    delay(500)  
    println("End operation C")  
}  
  
suspend fun operationB() {  
    operationC()  
    println("End operation B")  
}  
  
suspend fun operationA() {  
    operationB()  
    println("End operation A")  
}  
  
suspend fun main() = operationA()

Если попробовать визуализировать этот код, основываясь на знаниях, полученных выше, то мы можем создать схему, где функции вызывают друг друга последовательно, передавая в каждую свой Continuation, они в свою очередь принимают его и когда приходит время вызывают также последовательно метод resumeWith

Идем последовательно вниз и вверх
Идем последовательно вниз и вверх

Конечно, все немного сложнее

Перед тем как перейти к тому, как устроен механизм Coroutine я предлагаю мысленно задать следующие вопросы. Это те вопросы которые я задавал сам себе и которые мне помогли построить корректную картину мира работы Coroutine

  • Что происходит с coroutine после приостановки? Как она возвращается в нужное состояние?

  • В какой момент coroutine понимает, что ей нужно приостановить выполнение?

  • Как вообще для coroutine организуется смена потоков?

Постараюсь сегодня ответить на первые два вопроса, а третий оставим на будущее :)

Что происходит с Coroutine после приостановки? Как она возвращается в нужное состояние?

Для того чтобы ответить на этот вопрос попробуем понять во что компилируется наш код

Гигастрашный байт-код, открывать на свой страх и риск
import kotlin.Metadata;  
import kotlin.ResultKt;  
import kotlin.Unit;  
import kotlin.coroutines.Continuation;  
import kotlin.coroutines.intrinsics.IntrinsicsKt;  
import kotlin.coroutines.jvm.internal.ContinuationImpl;  
import kotlin.coroutines.jvm.internal.RunSuspendKt;  
import kotlinx.coroutines.DelayKt;  
import org.jetbrains.annotations.NotNull;  
import org.jetbrains.annotations.Nullable;  
  
@Metadata(  
   mv = {2, 3, 0},  
   k = 2,  
   xi = 50,  
   d1 = {"\u0000\n\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0005\u001a\u000e\u0010\u0000\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0003\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0004\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0005\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002¨\u0006\u0006"},  
   d2 = {"operationC", "", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "operationB", "operationA", "main", "Sources of fmk-android.WhereMyChildren.main"}  
)  
public final class TestKt {  
   @Nullable  
   public static final Object operationC(@NotNull Continuation $completion) {  
      Continuation $continuation;  
      label20: {  
         if ($completion instanceof <undefinedtype>) {  
            $continuation = (<undefinedtype>)$completion;  
            if (($continuation.label & Integer.MIN_VALUE) != 0) {  
               $continuation.label -= Integer.MIN_VALUE;  
               break label20;  
            }  
         }  
  
         $continuation = new ContinuationImpl($completion) {  
            // $FF: synthetic field  
            Object result;  
            int label;  
  
            @Nullable  
            public final Object invokeSuspend(@NotNull Object $result) {  
               this.result = $result;  
               this.label |= Integer.MIN_VALUE;  
               return TestKt.operationC((Continuation)this);  
            }  
         };  
      }  
  
      Object $result = $continuation.result;  
      Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();  
      switch ($continuation.label) {  
         case 0:  
            ResultKt.throwOnFailure($result);  
            $continuation.label = 1;  
            if (DelayKt.delay(500L, $continuation) == var3) {  
               return var3;  
            }  
            break;  
         case 1:  
            ResultKt.throwOnFailure($result);  
            break;  
         default:  
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");  
      }  
  
      System.out.println("End operation C");  
      return Unit.INSTANCE;  
   }  
  
   @Nullable  
   public static final Object operationB(@NotNull Continuation $completion) {  
      Continuation $continuation;  
      label20: {  
         if ($completion instanceof <undefinedtype>) {  
            $continuation = (<undefinedtype>)$completion;  
            if (($continuation.label & Integer.MIN_VALUE) != 0) {  
               $continuation.label -= Integer.MIN_VALUE;  
               break label20;  
            }  
         }  
  
         $continuation = new ContinuationImpl($completion) {  
            // $FF: synthetic field  
            Object result;  
            int label;  
  
            @Nullable  
            public final Object invokeSuspend(@NotNull Object $result) {  
               this.result = $result;  
               this.label |= Integer.MIN_VALUE;  
               return TestKt.operationB((Continuation)this);  
            }  
         };  
      }  
  
      Object $result = $continuation.result;  
      Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();  
      switch ($continuation.label) {  
         case 0:  
            ResultKt.throwOnFailure($result);  
            $continuation.label = 1;  
            if (operationC($continuation) == var3) {  
               return var3;  
            }  
            break;  
         case 1:  
            ResultKt.throwOnFailure($result);  
            break;  
         default:  
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");  
      }  
  
      System.out.println("End operation B");  
      return Unit.INSTANCE;  
   }  
  
   @Nullable  
   public static final Object operationA(@NotNull Continuation $completion) {  
      Continuation $continuation;  
      label20: {  
         if ($completion instanceof <undefinedtype>) {  
            $continuation = (<undefinedtype>)$completion;  
            if (($continuation.label & Integer.MIN_VALUE) != 0) {  
               $continuation.label -= Integer.MIN_VALUE;  
               break label20;  
            }  
         }  
  
         $continuation = new ContinuationImpl($completion) {  
            // $FF: synthetic field  
            Object result;  
            int label;  
  
            @Nullable  
            public final Object invokeSuspend(@NotNull Object $result) {  
               this.result = $result;  
               this.label |= Integer.MIN_VALUE;  
               return TestKt.operationA((Continuation)this);  
            }  
         };  
      }  
  
      Object $result = $continuation.result;  
      Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();  
      switch ($continuation.label) {  
         case 0:  
            ResultKt.throwOnFailure($result);  
            $continuation.label = 1;  
            if (operationB($continuation) == var3) {  
               return var3;  
            }  
            break;  
         case 1:  
            ResultKt.throwOnFailure($result);  
            break;  
         default:  
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");  
      }  
  
      System.out.println("End operation A");  
      return Unit.INSTANCE;  
   }  
  
   @Nullable  
   public static final Object main(@NotNull Continuation $completion) {  
      Object var10000 = operationA($completion);  
      return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;  
   }  
  
   // $FF: synthetic method  
   public static void main(String[] args) {  
      RunSuspendKt.runSuspend(null.INSTANCE);  
   }  
}

Как уже было сказано, для каждой функции генерируется свой Continuation который передается дальше, давайте попробуем взглянуть как выглядит псевдокод Continuation функции A

class OperationAContinuation(
    completion: Continuation<Unit>
) : ContinuationImpl(completion) {

    var label: Int = 0
    var result: Any? = null

    override fun invokeSuspend(result: Result<Any?>): Any? {
        this.result = result.getOrNull()

        when (label) {
            0 -> {
                label = 1

                val r = operationB(this)
                if (r === COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED
                }
            }

            1 -> {
                result.getOrThrow()
            }

            else -> error("Invalid state")
        }

        println("End operation A")
        return Unit
    }
}

Как можно увидеть, там совершенно не то что мы ожидали изначально. Вместо Continuation у нас ContinuationImpl, вместо resumeWith непонятный метод invokeSuspend. Давайте попробуем разобраться почему так

ContinuationImpl и стейт-машина

Во-первых, поймём, что такое ContinuationImpl

ContinuationImpl - это реализация интерфейса Continuation (на самом деле мы реализуем класс BaseContinuationImpl, а тот в свою очередь уже Continuation, но об этом внизу). То есть внутри себя он уже реализует метод resumeWith, а значит именно по этой причине мы его не видим

Как же тогда выглядит реализованный resumeWith?

То как выглядит реализация со всеми комментариями
// This implementation is final. This fact is used to unroll resumeWith recursion.  
public final override fun resumeWith(result: Result<Any?>) {  
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume  
    var current = this  
    var param = result  
    while (true) {  
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure  
        // can precisely track what part of suspended callstack was already resumed        probeCoroutineResumed(current)  
        with(current) {  
            val completion = completion!! // fail fast when trying to resume continuation without completion  
            val outcome: Result<Any?> =  
                try {  
                    val outcome = invokeSuspend(param)  
                    if (outcome === COROUTINE_SUSPENDED) return  
                    Result.success(outcome)  
                } catch (exception: Throwable) {  
                    Result.failure(exception)  
                }  
            releaseIntercepted() // this state machine instance is terminating  
            if (completion is BaseContinuationImpl) {  
                // unrolling recursion via loop  
                current = completion  
                param = outcome  
            } else {  
                // top-level completion reached -- invoke and return  
                completion.resumeWith(outcome)  
                return  
            }  
        }  
    }  
}
public final override fun resumeWith(result: Result<Any?>) {  
    var current = this  
    var param = result  
    while (true) {  
        with(current) {  
            val completion = completion!!
            val outcome: Result<Any?> =  
                try {  
                    val outcome = invokeSuspend(param)  
                    if (outcome === COROUTINE_SUSPENDED) return  
                    Result.success(outcome)  
                } catch (exception: Throwable) {  
                    Result.failure(exception)  
                }  

            if (completion is BaseContinuationImpl) {  
                current = completion  
                param = outcome  
            } else {  
                completion.resumeWith(outcome)  
                return  
            }  
        }  
    }  
}

Давайте попробуем разделить этот метод на несколько составных частей:

  1. Бесконечный цикл - наш алгоритм работает в бесконечном while(true) и завершается только когда мы сами ему говорим завершиться

  2. Выполнение операции - мы вызываем метод invokeSuspend и получаем результат

    val completion = completion!!
    val outcome: Result<Any?> =  
        try {  
            val outcome = invokeSuspend(param)  
            if (outcome === COROUTINE_SUSPENDED) return  
            Result.success(outcome)  
        } catch (exception: Throwable) {  
            Result.failure(exception)  
        }  
    
  3. Затем результат передаётся через метод resumeWith

    if (completion is BaseContinuationImpl) {  
        current = completion  
        param = outcome  
    } else {  
        completion.resumeWith(outcome)  
        return  
    }  

То есть резюмируя, resumeWith выполняет какую-то операцию в бесконечном цикле, потом когда операция успешно выполнилась то передает результат операции в родительский Continuation через resumeWith

Супер, а что все таки такое invokeSuspend?

Как мы уже увидели, invokeSuspend - это набор операций, которые вызываются в бесконечном цикле и возвращают конечный результат. Если говорить правильным языком, invokeSuspend является методом, реализующим паттерн state-machine, а в state-machine как раз хранится тело нашей функции (в данном случае функции A)

То есть у нас есть список шагов, и каждый раз, когда мы находимся на том или ином шаге мы выполняем определенный список операций, а после выполнения мы меняем наш шаг на следующий и при последующем вызове нашей state-machine мы выполняем уже другие операции. И так до конца

Еще раз отвечаем на вопрос: что происходит с coroutine после приостановки? Как она возвращается в нужное состояние?

Основываясь на полученных знаниях мы можем легко ответить на наш вопрос. После того как Coroutine приостанавливается и мы вновь хотим вернуться к ее выполнению, мы вызываем метод resumeWith. Тот в свою очередь входит в бесконечный цикл постоянно вызывая нашу state-machine через метод invokeSuspend и когда мы получаем конечный результат мы вызываем родительский Continuation через метод resumeWith

Попробуем ответить на следующий вопрос: в какой момент Coroutine понимает, что ей нужно приостановить выполнение?

Здесь нам снова стоит обратиться к методу resumeWith, а вернее к следующему участку

val outcome: Result<Any?> =  
    try {  
        val outcome = invokeSuspend(param)  
        if (outcome === COROUTINE_SUSPENDED) return  
        Result.success(outcome)  
    } catch (exception: Throwable) {  
        Result.failure(exception)  
    }  

Здесь мы видим COROUTINE_SUSPENDED. Что это?

Это специальная метка, которая говорит механизму Coroutine, что сейчас нам нужно приостановить работу и передать ресурсы потока другим операциям. А откуда она приходит? А как раз таки из нашей state-machine, с метода invokeSuspend

override fun invokeSuspend(result: Result<Any?>): Any? {
    this.result = result.getOrNull()

    when (label) {
        0 -> {
            label = 1

            val r = operationB(this)
            if (r === COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }

        1 -> {
            result.getOrThrow()
        }

        else -> error("Invalid state")
    }

    println("End operation A")
    return Unit
}

Резюмируя

Когда мы вызываем invokeSuspend, необходимо учитывать, что на одном из шагов (в нашем случае когда мы вызываем suspend-функцию operationB) мы можем получить метку COROUTINE_SUSPENDED которая говорит нам о том что мы должны приостановить работу resumeWith. После приостановки мы вернемся обратно уже только тогда, когда функция operationB вызовет родительский Continuation (функции operationA)

Это... все?

Вот мы вроде ответили на два вопроса которые нас интересовали, но все равно чувствуется какая-то недосказанность. Эта недосказанность связана с методом resumeWith, а точнее с его последней частью

if (completion is BaseContinuationImpl) {  
    current = completion  
    param = outcome  
} else {  
    completion.resumeWith(outcome)  
    return  
}  

По completion.resumeWith(outcome) понятно, это то что мы ожидаем увидеть, а что это такое наверху?

Сложно и запутанно, вы согласны?
Сложно и запутанно, вы согласны?

Как мне кажется, это одна из тех оптимизаций которая сделала команда JetBrains и я думаю что у них в голове было следующее:

У нас есть метод resumeWith, он общий и все что там отличается это invokeSuspend который может возвращать разные результаты. Давайте вместо того, чтобы постоянно вызывать родительский resumeWith просто менять курсор и гонять следующий Continuation с его invokeSuspend в этом же бесконечном цикле. А когда уже мы достигнем корневого Continuation будем вызывать resumeWith

То есть если раньше мы исходили из мысли что мы последовательно спускаемся и поднимаемся через наши Continuation, то теперь наша схема выглядит следующим образом

В одном и том же resumeWith гоняем разные Continuation, и свой и родительские
В одном и том же resumeWith гоняем разные Continuation, и свой и родительские

Резюмируя

У нас есть наши ContinuationImpl классы которые реализуют BaseContinuationImpl, они являются цепочками для которых существует подобная оптимизация

Когда мы переходим в корневой Continuation мы вызываем метод resumeWith. Кто реализует этот корневой Continuation? Тут уже может быть множество вариаций. Одной из таких реализаций может быть StandaloneCoroutine, она реализует AbstractCoroutine а тот в свою очередь реализует интерфейс Continuation

Ну и напоследок покажу как выглядит реализация метода resumeWith для AbstractCoroutine

public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

Итог

Что же, в ходе нашего разбора мы увидели то как в действительности работает механизм Coroutine, из чего состоит Continuation и какой у него основной алгоритм работы.

На самом деле многое ещё осталось за рамками статьи: ни о дополнительных оптимизациях компилятора, ни о смене потоков, ни о регулировании жизненного цикла Coroutine мы сегодня не поговорили. Звучит как повод покопаться дальше и вновь вернуться к вам с новой статьей :)

Спасибо всем за внимание, жду ваши вопросы!!!

Комментарии (0)