Знакомство
Всем привет! Меня зовут Иван, я Android-разработчик и сегодня я хочу поговорить с вами о такой вещи, как Coroutines в языке Kotlin
Не знаю, как у вас, а я всегда интересовался как устроены те или иные вещи на низком уровне, поэтому, когда я начал активно заниматься Android разработкой и познакомился с Kotlin, одним из главных вопросов, который не уходил у меня из головы, как устроены Coroutines на низком уровне?
Обычно, когда ты начинаешь задаваться подобным вопросом, в голове возникает ассоциация с чем-то магическим. Есть какая-то неведомая сущность, которой ты говоришь что-то сделать и она что-то делает. При изучении Coroutines в самом начале пути ощущения у меня были точно такими же

Конечно же, никаких гномов-волшебников не существует, а сами Coroutines работают по определенным алгоритмам, причем не то чтобы очень сложным
В рамках этой статьи я не буду претендовать на полную достоверность и всезнайность. Я, как и многие тысячи разработчиков, ежедневно сталкиваюсь с Coroutines и просто хочу поделиться своим пониманием того, как они устроены и получить обратный фидбек
Поехали
Во-первых, давайте ответим на вопрос: что такое Coroutines?
Для этого предлагаю разделить этот вопрос на три категории и ответить с учетом каждой
Асинхронность - Coroutines являются асинхронным механизмом, это означает, что если у нас есть несколько отдельных операций, которые мы можем выполнять параллельно друг другу, то при помощи Coroutines мы можем с легкостью это сделать, наши операции действительно смогут выполняться одновременно, либо параллельно (если у нас есть больше двух потоков), либо асинхронно (механизм будет освобождать поток для выполнения других задач)
Гибкий механизм - Coroutines созданы под влиянием множества других механизмов, которые были раньше на языке Java и в целом мире многопоточности. Он позволяет покрыть все кейсы, связанные с многопоточностью/асинхронностью и представляет собой единую библиотеку. Благодаря этому мы с легкостью теперь можем перемещаться от проекта к проекту и не переживать, что нам придётся учить новые библиотеки/паттерны работы с многопоточностью
Реализация - этому я уделю все остальное внимание в этой статье
Реализация
Итак, что представляет собой Coroutine с точки зрения реализации?

Если говорить максимально абстрактно, то я бы сказал что Coroutine - это контейнер, который состоит из двух слоев
CoroutineContext - тоже контейнер, внутри которого находится множество элементов, регулирующих выполнение этой самой Coroutine. Жизненный цикл, обработка ошибок и все остальное
Continuation - тут мне нравится приводить подобный пример: представим, что у нас есть операция запроса к серверу, она долгая и мы должны дождаться результата и после этого идти дальше. Как мы обычно это делаем? Прокидываем Callback в ту функцию которая является блокирующей, а после того, как она выполнила свои операции, она вызывает этот переданный ей Callback и передает туда результат своих вычислений. Continuation - это точно такой же Callback, а основным методом является resumeWith
Вот как выглядит интерфейс Continuation:
/** * Interface representing a continuation after a suspension point that returns a value of type `T`. */ @SinceKotlin("1.3") public interface Continuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */ public fun resumeWith(result: Result<T>) }
Подробнее про Continuation
Давайте попробуем осознать реальность на осязаемом примере
У нас есть следующий код:
suspend fun operationC() { delay(500) println("End operation C") } suspend fun operationB() { operationC() println("End operation B") } suspend fun operationA() { operationB() println("End operation A") } suspend fun main() = operationA()
Если попробовать визуализировать этот код, основываясь на знаниях, полученных выше, то мы можем создать схему, где функции вызывают друг друга последовательно, передавая в каждую свой Continuation, они в свою очередь принимают его и когда приходит время вызывают также последовательно метод resumeWith

Конечно, все немного сложнее
Перед тем как перейти к тому, как устроен механизм Coroutine я предлагаю мысленно задать следующие вопросы. Это те вопросы которые я задавал сам себе и которые мне помогли построить корректную картину мира работы Coroutine
Что происходит с coroutine после приостановки? Как она возвращается в нужное состояние?
В какой момент coroutine понимает, что ей нужно приостановить выполнение?
Как вообще для coroutine организуется смена потоков?
Постараюсь сегодня ответить на первые два вопроса, а третий оставим на будущее :)
Что происходит с Coroutine после приостановки? Как она возвращается в нужное состояние?
Для того чтобы ответить на этот вопрос попробуем понять во что компилируется наш код
Гигастрашный байт-код, открывать на свой страх и риск
import kotlin.Metadata; import kotlin.ResultKt; import kotlin.Unit; import kotlin.coroutines.Continuation; import kotlin.coroutines.intrinsics.IntrinsicsKt; import kotlin.coroutines.jvm.internal.ContinuationImpl; import kotlin.coroutines.jvm.internal.RunSuspendKt; import kotlinx.coroutines.DelayKt; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @Metadata( mv = {2, 3, 0}, k = 2, xi = 50, d1 = {"\u0000\n\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0005\u001a\u000e\u0010\u0000\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0003\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0004\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002\u001a\u000e\u0010\u0005\u001a\u00020\u0001H\u0086@¢\u0006\u0002\u0010\u0002¨\u0006\u0006"}, d2 = {"operationC", "", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "operationB", "operationA", "main", "Sources of fmk-android.WhereMyChildren.main"} ) public final class TestKt { @Nullable public static final Object operationC(@NotNull Continuation $completion) { Continuation $continuation; label20: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if (($continuation.label & Integer.MIN_VALUE) != 0) { $continuation.label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl($completion) { // $FF: synthetic field Object result; int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return TestKt.operationC((Continuation)this); } }; } Object $result = $continuation.result; Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch ($continuation.label) { case 0: ResultKt.throwOnFailure($result); $continuation.label = 1; if (DelayKt.delay(500L, $continuation) == var3) { return var3; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } System.out.println("End operation C"); return Unit.INSTANCE; } @Nullable public static final Object operationB(@NotNull Continuation $completion) { Continuation $continuation; label20: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if (($continuation.label & Integer.MIN_VALUE) != 0) { $continuation.label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl($completion) { // $FF: synthetic field Object result; int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return TestKt.operationB((Continuation)this); } }; } Object $result = $continuation.result; Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch ($continuation.label) { case 0: ResultKt.throwOnFailure($result); $continuation.label = 1; if (operationC($continuation) == var3) { return var3; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } System.out.println("End operation B"); return Unit.INSTANCE; } @Nullable public static final Object operationA(@NotNull Continuation $completion) { Continuation $continuation; label20: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if (($continuation.label & Integer.MIN_VALUE) != 0) { $continuation.label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl($completion) { // $FF: synthetic field Object result; int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return TestKt.operationA((Continuation)this); } }; } Object $result = $continuation.result; Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch ($continuation.label) { case 0: ResultKt.throwOnFailure($result); $continuation.label = 1; if (operationB($continuation) == var3) { return var3; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } System.out.println("End operation A"); return Unit.INSTANCE; } @Nullable public static final Object main(@NotNull Continuation $completion) { Object var10000 = operationA($completion); return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE; } // $FF: synthetic method public static void main(String[] args) { RunSuspendKt.runSuspend(null.INSTANCE); } }
Как уже было сказано, для каждой функции генерируется свой Continuation который передается дальше, давайте попробуем взглянуть как выглядит псевдокод Continuation функции A
class OperationAContinuation( completion: Continuation<Unit> ) : ContinuationImpl(completion) { var label: Int = 0 var result: Any? = null override fun invokeSuspend(result: Result<Any?>): Any? { this.result = result.getOrNull() when (label) { 0 -> { label = 1 val r = operationB(this) if (r === COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } 1 -> { result.getOrThrow() } else -> error("Invalid state") } println("End operation A") return Unit } }
Как можно увидеть, там совершенно не то что мы ожидали изначально. Вместо Continuation у нас ContinuationImpl, вместо resumeWith непонятный метод invokeSuspend. Давайте попробуем разобраться почему так
ContinuationImpl и стейт-машина
Во-первых, поймём, что такое ContinuationImpl
ContinuationImpl - это реализация интерфейса Continuation (на самом деле мы реализуем класс BaseContinuationImpl, а тот в свою очередь уже Continuation, но об этом внизу). То есть внутри себя он уже реализует метод resumeWith, а значит именно по этой причине мы его не видим
Как же тогда выглядит реализованный resumeWith?
То как выглядит реализация со всеми комментариями
// This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result while (true) { with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return } } } }
Давайте попробуем разделить этот метод на несколько составных частей:
Бесконечный цикл - наш алгоритм работает в бесконечном while(true) и завершается только когда мы сами ему говорим завершиться
-
Выполнение операции - мы вызываем метод invokeSuspend и получаем результат
val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } -
Затем результат передаётся через метод resumeWith
if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return }
То есть резюмируя, resumeWith выполняет какую-то операцию в бесконечном цикле, потом когда операция успешно выполнилась то передает результат операции в родительский Continuation через resumeWith
Супер, а что все таки такое invokeSuspend?
Как мы уже увидели, invokeSuspend - это набор операций, которые вызываются в бесконечном цикле и возвращают конечный результат. Если говорить правильным языком, invokeSuspend является методом, реализующим паттерн state-machine, а в state-machine как раз хранится тело нашей функции (в данном случае функции A)
То есть у нас есть список шагов, и каждый раз, когда мы находимся на том или ином шаге мы выполняем определенный список операций, а после выполнения мы меняем наш шаг на следующий и при последующем вызове нашей state-machine мы выполняем уже другие операции. И так до конца
Еще раз отвечаем на вопрос: что происходит с coroutine после приостановки? Как она возвращается в нужное состояние?
Основываясь на полученных знаниях мы можем легко ответить на наш вопрос. После того как Coroutine приостанавливается и мы вновь хотим вернуться к ее выполнению, мы вызываем метод resumeWith. Тот в свою очередь входит в бесконечный цикл постоянно вызывая нашу state-machine через метод invokeSuspend и когда мы получаем конечный результат мы вызываем родительский Continuation через метод resumeWith
Попробуем ответить на следующий вопрос: в какой момент Coroutine понимает, что ей нужно приостановить выполнение?
Здесь нам снова стоит обратиться к методу resumeWith, а вернее к следующему участку
val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) }
Здесь мы видим COROUTINE_SUSPENDED. Что это?
Это специальная метка, которая говорит механизму Coroutine, что сейчас нам нужно приостановить работу и передать ресурсы потока другим операциям. А откуда она приходит? А как раз таки из нашей state-machine, с метода invokeSuspend
override fun invokeSuspend(result: Result<Any?>): Any? { this.result = result.getOrNull() when (label) { 0 -> { label = 1 val r = operationB(this) if (r === COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } 1 -> { result.getOrThrow() } else -> error("Invalid state") } println("End operation A") return Unit }
Резюмируя
Когда мы вызываем invokeSuspend, необходимо учитывать, что на одном из шагов (в нашем случае когда мы вызываем suspend-функцию operationB) мы можем получить метку COROUTINE_SUSPENDED которая говорит нам о том что мы должны приостановить работу resumeWith. После приостановки мы вернемся обратно уже только тогда, когда функция operationB вызовет родительский Continuation (функции operationA)
Это... все?
Вот мы вроде ответили на два вопроса которые нас интересовали, но все равно чувствуется какая-то недосказанность. Эта недосказанность связана с методом resumeWith, а точнее с его последней частью
if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return }
По completion.resumeWith(outcome) понятно, это то что мы ожидаем увидеть, а что это такое наверху?

Как мне кажется, это одна из тех оптимизаций которая сделала команда JetBrains и я думаю что у них в голове было следующее:
У нас есть метод resumeWith, он общий и все что там отличается это invokeSuspend который может возвращать разные результаты. Давайте вместо того, чтобы постоянно вызывать родительский resumeWith просто менять курсор и гонять следующий Continuation с его invokeSuspend в этом же бесконечном цикле. А когда уже мы достигнем корневого Continuation будем вызывать resumeWith
То есть если раньше мы исходили из мысли что мы последовательно спускаемся и поднимаемся через наши Continuation, то теперь наша схема выглядит следующим образом

Резюмируя
У нас есть наши ContinuationImpl классы которые реализуют BaseContinuationImpl, они являются цепочками для которых существует подобная оптимизация
Когда мы переходим в корневой Continuation мы вызываем метод resumeWith. Кто реализует этот корневой Continuation? Тут уже может быть множество вариаций. Одной из таких реализаций может быть StandaloneCoroutine, она реализует AbstractCoroutine а тот в свою очередь реализует интерфейс Continuation
Ну и напоследок покажу как выглядит реализация метода resumeWith для AbstractCoroutine
public final override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) }
Итог
Что же, в ходе нашего разбора мы увидели то как в действительности работает механизм Coroutine, из чего состоит Continuation и какой у него основной алгоритм работы.
На самом деле многое ещё осталось за рамками статьи: ни о дополнительных оптимизациях компилятора, ни о смене потоков, ни о регулировании жизненного цикла Coroutine мы сегодня не поговорили. Звучит как повод покопаться дальше и вновь вернуться к вам с новой статьей :)
Спасибо всем за внимание, жду ваши вопросы!!!