Корутины — это паттерн проектирования, предназначенный для написания асинхронных программ, способных выполнять нескольких задач одновременно.

В асинхронных программах несколько задач выполняются параллельно в отдельных потоках, не дожидаясь завершения друг-друга. Потоки — ресурс дорогостоящий, и слишком большое их количество выливается в ощутимые накладные расходы в виде высокого потребления памяти и нагрузки на процессор.

Корутины — это альтернативный способ написания асинхронных программ, но в сравнении с потоками гораздо более легковесный. Они представляют собой вычисления, которые работают поверх потоков.

Мы можем приостановить выполнение корутины, чтобы позволить другим корутинам отработать в том же потоке. В дальнейшем выполнение этой корутины может быть возобновлено в том же или даже в другом потоке.

В момент приостановки работы корутины связанные с ней вычисления останавливаются, сохраняются в памяти и удаляются из потока, позволяя ему свободно заниматься другими задачами. Таким образом, мы можем одновременно запускать множество корутин, имея в нашем распоряжении небольшой пул потоков, что позволяет в значительной степени ограничить использование системных ресурсов.

В этой статье мы с вами разберемся, как использовать корутины в Kotlin.

Примеры

К данной статье прилагаются рабочие примеры кода, доступные на GitHub.

Делаем выполнение программы параллельным  с помощью потоков

Начнем мы с того, что запустим программу, которая выполнит несколько операторов и вызовет какую-нибудь долго выполняющуюся функцию:

//Todo: оператор1
//Todo: вызов долго выполняющейся функции
//Todo: оператор2
...
...

Если выполнять все операторы последовательно в одном потоке, то функция longRunningFunction заблокирует выполнение оставшихся операторов, и программа в целом будет выполняться очень долго.

Для повышения эффективности мы запустим функцию longRunningFunction в отдельном потоке, а остальная часть программы продолжит выполняться в главном потоке:

import kotlin.concurrent.thread

fun main() {
    println("My program runs...: 
        ${Thread.currentThread().name}")

    thread {
        longRunningTask()
    }

    println("My program run ends...: 
        ${Thread.currentThread().name}")
}

fun longRunningTask(){
    println("executing longRunningTask on...: 
        ${Thread.currentThread().name}")
    Thread.sleep(1000)
    println("longRunningTask ends on thread ...: 
        ${Thread.currentThread().name}")
}

Вызывая Thread.sleep() внутри функции longRunningTask() мы имитируем задачу, на выполнение которой требуется время. Мы обернем вызов этой функции в функцию thread. Это позволит главному (main) потоку продолжить выполнение, не дожидаясь завершения функции longRunningTask().

Функция longRunningTask() будет выполняться в другом потоке, что мы можем наблюдать из вывода операторов println, если мы запустим эту программу:

My program runs...: main
My program run ends...: main
executing longRunningTask on...: Thread-0
longRunningTask ends on thread ...: Thread-0

Process finished with exit code 0

Как видно из вывода, программа начинает выполняться в потоке main. Она выполняет longRunningTask() в потоке Thread-0, но не дожидается его завершения и сразу переходит к выполнению следующего оператора println() снова в потоке main. Однако программа завершается с кодом выхода 0 только после окончания выполнения longRunningTask в потоке Thread-0.

В следующих разделах мы с вами перепишем эту программу с использованием корутин.

Добавляем необходимые зависимости

Язык Kotlin уже предоставляет нам базовые конструкции для написания корутин из коробки, но в библиотеке kotlinx-coroutines-core нам доступны более полезные конструкции, созданные на основе тех самых базовых корутин. Поэтому перед началом работы с корутинами нам необходимо добавить зависимость от библиотеки kotlinx-coroutines-core.

В качестве инструмента сборки мы здесь используем Gradle, поэтому зависимость от библиотеки kotlinx-coroutines-core будет выглядеть следующим образом:

dependencies {
    implementation 'org.jetbrains.kotlin:kotlin-stdlib'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.2'
}

Как видите, мы добавили зависимость от стандартной библиотеки Kotlin и библиотеки kotlinx-coroutines-core.

Простейшие корутины в Kotlin

Корутины очень часто характеризуют как “легковесные потоки” (lightweight threads), что по сути означает, что мы можем выполнять код с корутинами аналогично тому, как мы выполняем код с потоками. Давайте перепишем предыдущую программу, чтобы она запускала долго выполняющуюся функцию в корутине, а не в отдельном потоке, как показано ниже:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    launch { // запуск корутина
        longRunningTask()  // вызов долго выполняющейся функции
    }

    println("My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningTask(){
    println("executing longRunningTask on...: ${Thread.currentThread().name}")
    delay(1000)  // имитация долгого выполнения путем добавления задержки
    println(
     "longRunningTask ends on thread ...: ${Thread.currentThread().name}")
}

Давайте разберемся, что делает этот код: функция launch{} запускает новую корутину, которая выполняется параллельно с остальным кодом.

runBlocking{} также запускает новую корутину, но блокирует текущий поток (main) на время вызова до тех пор, пока весь код внутри тела функции runBlocking{} полностью не завершит свое выполнение.

Функция longRunningTask называется suspend-функцией. Она приостанавливает выполнение корутины, не блокируя поток, в котором она выполняется, что позволяет другим корутинам использовать этот поток для выполнения своего кода.

Более подробно о запуске новых корутин с помощью таких функций, как launch{} и runBlocking{}, мы поговорим в следующих разделах, посвященных корутин-билдерам и области видимости корутин.

При запуске этой программы мы получим следующий вывод:

My program runs...: main
My program run ends...: main
executing longRunningTask on...: main
longRunningTask ends on a thread ...: main

Process finished with exit code 0

Этот вывод наглядно демонстрирует, что вся программа целиком выполняется в main-потоке. Она не дожидается завершения работы longRunningTask, а сразу переходит к выполнению следующего оператора и выводит My program run ends...: main. Как видно из вывода двух операторов println в функции longRunningTask, корутина выполняется параллельно в том же потоке.

В следующих разделах мы с вами разберем различные компоненты корутин.

Suspend-функции

Suspend-функция — это самый главный элемент построения корутин. Она, как и любая другая обычная функция, может принимать один или несколько аргументов и что-нибудь возвращать. Поток, выполняющий обычную функцию, блокирует работу других функций до завершения ее выполнения. Если функция будет долго выполняться, например, из-за необходимости получить данные по сети через какой-нибудь внешний API, то это негативно скажется на производительности всей программы.

Чтобы избежать возникновения такого рода проблем, можно заменить обычную функцию на suspend-функцию и вызвать ее из области видимости корутины. Вызов suspend-функции приостанавливает выполнение функции и позволяет потоку выполнять другие действия. Через некоторое время приостановленная функция может быть возобновлена в том же или другом потоке.

Синтаксис suspend-функции аналогичен синтаксису обычной функции и объявляется с добавлением ключевого слова suspend, как показано ниже:

suspend fun longRunningTask(){
    ...
    ...
}

Функции, объявленные с ключевым словом suspend, преобразуются во время компиляции и становятся асинхронными. Рассмотрим пример вызова suspend-функции вместе с парой обычных функций:

fun main() = runBlocking{
    println("${Instant.now()}: 
        My program runs...: ${Thread.currentThread().name}")

    val productId = findProduct()

    launch (Dispatchers.Unconfined) { // запускаем корутину
        val price = fetchPrice(productId) // вызов suspend-функции
    }
    updateProduct()

    println("${Instant.now()}: My program run ends...: " +
            "${Thread.currentThread().name}")
}

suspend fun fetchPrice(productId: String) : Double{
    println("${Instant.now()}: fetchPrice starts on...: 
        ${Thread.currentThread().name} ")
    delay(2000) // имитация долгого выполнения путем добавления задержки
    println("${Instant.now()}: fetchPrice ends on...: 
        ${Thread.currentThread().name} ")
    return 234.5
}

fun findProduct() : String{
    println("${Instant.now()}: 
        findProduct on...: ${Thread.currentThread().name}")
    return "P12333"
}

fun updateProduct() : String{
    println("${Instant.now()}: 
        updateProduct on...: ${Thread.currentThread().name}")
    return "Product updated"
}

Как видно из примера, функции findProduct() и updateProduct() являются обычными функциями. Функция fetchPrice() — это долго выполняющаяся функция, что мы смоделировали, добавив оператор delay().

В main() мы сначала вызываем функцию findProduct(), а затем с помощью функции launch{} вызываем suspend-функцию fetchPrice(). После приостановки она возобновляет выполнение корутины в потоке. После этого мы вызываем функцию updateProduct().

Функция launch{} запускает корутину, как было описано ранее. Мы передаем в нее диспетчер (Dispatchers.Unconfined), управляющий потоками, в которых будет запускаться или возобновляться корутина. Более подробно о диспетчерах корутин мы поговорим в следующих разделах.

Запустим эту программу, чтобы посмотреть, как корутина приостанавливается и позволяет потоку выполнять другие обычные функции:

2022-06-24T04:09:40..: My program runs...: main
2022-06-24T04:09:40..: findProduct on...: main
2022-06-24T04:09:40..: fetchPrice starts on...: main
2022-06-24T04:09:40..: updateProduct on...: main
2022-06-24T04:09:40..: My program run ends...: main
2022-06-24T04:09:42..: fetchPrice ends on.: kotlinx.coroutines.DefaultExecutor

Process finished with exit code 0

Как видно из вывода, функции findProduct() и updateProduct() вызываются в потоке main. Функция fetchPrice() также запускается в потоке main и приостанавливается, чтобы позволить отработать функциям findProduct() и updateProduct() в этом же потоке (main). Функция fetchPrice() возобновляется для выполнения оператора println() уже в другом потоке.

Важно также понимать, что suspend-функции могут быть вызваны только другой suspend-функцией или из корутины. Функция delay(), вызываемая внутри функции fetchPrice(), также является suspend-функцией, предоставляемой библиотекой kotlinx-coroutines-core.

Области видимости и билдеры корутин

Как уже говорилось в предыдущих разделах, suspend-функции можно запускать только в областях видимости корутин, создаваемых корутин-билдерами, такими как launch{}.

Для запуска новой корутины мы используем корутин-билдер (coroutine builder), и для того, чтобы очертить ее жизненный цикл, мы устанавливаем соответствующую область видимости (coroutine scope). Область видимости корутины предоставляет методы для управления жизненным циклом корутины, позволяющие запускать и останавливать ее.

Далее мы разберем три корутин-билдера в Kotlin: runBlocking{}, launch{} и async{}:

Запуск корутин с блокированием текущего потока с помощью runBlocking

Корутины более эффективны, чем потоки, поскольку они приостанавливают и возобновляют выполнение вместо того, чтобы блокировать его. Однако в некоторых специфических случаях нам необходимо именно заблокировать поток. Например, в функции main() необходимо блокировать поток, иначе программа завершится, не дождавшись завершения работы корутины.

Корутин-билдер runBlocking запускает корутину, блокируя текущий выполняющийся поток, пока не будет завершен весь код в корутине.

Сигнатура функций runBlocking выглядит следующим образом:

expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, 
                           block: suspend CoroutineScope.() -> T): T

Функция принимает два параметра:

  1. context: Предоставляет контекст корутины, представленный интерфейсом CoroutineContext, который является индексированным набором инстансов Element.

  2. block: Вызываемый код корутины. Он принимает функцию типа suspend CoroutineScope.() -> Unit

Корутин-билдер runBlocking{} служит в качестве связующего элемента между обычным блокирующим кодом и библиотеками, написанными в suspend-стиле. Поэтому наиболее подходящие места для использования runBlocking{} это — функции main и тесты JUnit.

Функция runBlocking{}, вызываемая из функции main(), выглядит следующим образом:

fun main() = runBlocking{
    ...
    ...
}

Мы использовали runBlocking{} для блокировки выполнения во всех функциях main() наших предыдущих примеров.

Поскольку runBlocking{} блокирует выполняющийся поток, он редко используется внутри кода в телах функций, так как потоки — это дорогие ресурсы, и блокировать их неэффективно и нежелательно.

Запуск корутин в режиме “запустил и забыл” с помощью launch

Функция launch{} запускает новую корутину, которая не возвращает вызывающей стороне никакого результата. Она не блокирует текущий поток. Сигнатура функции launch{} имеет вид:

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> Unit
): Job

Функция принимает три параметра и возвращает объект Job:

  1. context: Предоставляет контекст корутины, представленный интерфейсом CoroutineContext, который является индексированным набором инстансов Element.

  2. start: Опция запуска корутины. По умолчанию используется значение CoroutineStart.DEFAULT, которое запланирует корутину на немедленное выполнение. Мы также можем установить опцию start в CoroutineStart.LAZY для ленивого запуска корутины.

  3. block: Вызываемый код корутины. Он принимает функцию типа suspend CoroutineScope.() -> Unit

Новая корутина, запущенная с помощью функции launch{}, выглядит следующим образом:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    // вызов launch с передачей всех 3 параметров
    val job:Job = launch (EmptyCoroutineContext, CoroutineStart.DEFAULT){
        longRunningTask()
    }

    // Другой вариант вызова launch с передачей только одного параметра block
    // параметры context и start устанавливаются в свои значения по умолчанию
    val job1:Job = launch{longRunningTask()} 
    
    job.join()

    println("My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningTask(){
    println("executing longRunningTask on...: ${Thread.currentThread().name}")
    delay(1000)
    println("longRunningTask ends on thread ...: 
        ${Thread.currentThread().name}")
}

Здесь функция launch{} вызывается внутри функции runBlocking{}. Функция launch{} запускает корутину, которая выполнит функцию longRunningTask и сразу же вернет ссылку на объект Job.

Мы вызываем на этом объекте Job метод join(), который приостанавливает выполнение корутины, освобождая текущий поток для выполнения любых других действий (например, выполнения другой корутины).

Мы также можем использовать объект Job для отмены выполнения корутины.

Возврат результата suspend-функции в запустивший ее поток с помощью async

async — это еще один способ запуска корутины. Иногда, когда мы запускаем корутину, нам может потребоваться значение, которое должно быть возвращено из этой корутины обратно в поток, который ее запустил.

async запускает корутину параллельно, аналогично launch. Однако перед запуском очередной корутины он дожидается завершения уже запущенной. Сигнатура async выглядит следующим образом:

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Функция async{} принимает те же три параметра, что и функция launch{}, но вместо Job возвращает инстанс Deferred<T>. Получить из инстанса Deferred<T> результат вычислений, выполняемых в корутине, мы можем вызвав метод await().

Мы можем использовать async, как показано в этом примере:

fun main() = runBlocking{
    println("program runs...: ${Thread.currentThread().name}")

    val taskDeferred = async {
        generateUniqueID()
    }

    val taskResult = taskDeferred.await()

    println("program run ends...:  
        ${taskResult}  ${Thread.currentThread().name}")
}

suspend fun generateUniqueID(): String{
    println("executing generateUniqueID on...: 
        ${Thread.currentThread().name}")
    delay(1000)
    println("generateUniqueID ends on thread ...: 
        ${Thread.currentThread().name}")

    return UUID.randomUUID().toString()
}

В данном примере мы генерируем уникальный идентификатор в suspend-функции generateUniqueID, которая вызывается из корутины, запущенной с помощью async. Функция async возвращает инстанс Deffered<T>. По умолчанию тип T — Unit.

Здесь тип T — String, так как suspend-функция generateUniqueID возвращает значение типа String.

Далее мы вызываем на deferred-инстансе taskDeferred метод await() для извлечения результата.

Запустив программу, мы получим следующий результат:

program runs...: main
executing generateUniqueID on...: main
generateUniqueID ends on thread ...: main
program run ends...:  f18ac8c7-25ef-4755-8ab8-73c8219aadd3  main

Process finished with exit code 0

В этом выводе мы видим результат работы нашей suspend-функции.

Диспетчеры корутин: Определение потока для выполнения корутины

Диспетчер корутины определяет, какой поток или пул потоков корутина будет использовать для своего выполнения. Все корутины выполняются в контексте, представленном интерфейсом CoroutineContext. CoroutineContext представляет собой индексированный набор элементов и доступен внутри корутины через свойство CoroutineContext. Важным элементом этого индексированного набора является диспетчер корутины.

Диспетчер корутины может ограничить выполнение корутины определенным потоком, передать ее в пул потоков или позволить ей выполняться без ограничений.

Как мы видели в предыдущем разделе, корутин-билдеры launch{} и async{} принимают в качестве параметра в своей сигнатуре необязательный CoroutineContext:

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> T
): Deferred<T>

CoroutineContext используется для явного указания диспетчера для новой корутины. В Kotlin существует несколько реализаций CoroutineDispatchers, которые мы можем указывать при создании корутин с помощью корутин-билдеров launch и async. Давайте рассмотрим самые популярные диспетчеры:

Наследование диспетчера от родительской корутины

Когда функция launch{} используется без параметров, она наследует CoroutineContext (а значит, и диспетчер) от CoroutineScope, из которого она запускается. Проследим это поведение на примере, приведенном ниже:

fun main() = runBlocking {
    launch {
        println(
           "launch default: running in thread ${Thread.currentThread().name}")
        longTask()
    }
}

suspend fun longTask(){
    println("executing longTask on...: ${Thread.currentThread().name}")
    delay(1000)
    println("longTask ends on thread ...: ${Thread.currentThread().name}")
}

Здесь корутин-билдер launch{} наследует контекст и, следовательно, диспетчер из области видимости runBlocking-корутины, которая выполняется в потоке main. Следовательно, и корутина, запущенная корутин-билдером launch{}, будет использовать тот же диспетчер, который заставит корутину выполняться в главном потоке.

При запуске этой программы мы увидим следующее:

completed tasks
launch default: running in  thread main
executing longTask on...: main
longTask ends on thread ...: main

Process finished with exit code 0

Как видно из этого вывода, корутина, запущенная билдером launch{}, также выполняется в потоке main.

Диспетчер по умолчанию для выполнения операций с интенсивными вычислениями

Диспетчер по умолчанию используется, когда в области видимости явно не указан другой диспетчер. Он представлен Dispatchers.Default и использует общий фоновый пул потоков. Пул потоков имеет размер, равный количеству ядер на машине, на которой выполняется наш код, при этом минимальное количество потоков — 2.

Для проверки этого поведения выполним следующий код:

fun main() = runBlocking {
    repeat(1000) {
      launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default  : running in thread ${Thread.currentThread().name}")
            longTask()
        }
    }
}

Приведем фрагмент вывода, показывающий потоки, используемые корутиной:

Default  : running in thread DefaultDispatcher-worker-1
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-4
Default  : running in thread DefaultDispatcher-worker-3
Default  : running in thread DefaultDispatcher-worker-5
Default  : running in thread DefaultDispatcher-worker-6
Default  : running in thread DefaultDispatcher-worker-7
Default  : running in thread DefaultDispatcher-worker-8
Default  : running in thread DefaultDispatcher-worker-9
Default  : running in thread DefaultDispatcher-worker-10
Default  : running in thread DefaultDispatcher-worker-3
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-6
Default  : running in thread DefaultDispatcher-worker-4

Здесь мы видим 10 потоков из нашего пула потоков, используемых для выполнения корутин.

Мы также можем использовать limitedParallelism для ограничения количества активно выполняемых параллельно корутин, как показано в данном примере ниже:

fun main() = runBlocking {
    repeat(1000) {
        // будет выполнена на DefaultDispatcher с 
        // ограничением на выполнение до 3 корутин параллельно
        val dispatcher = Dispatchers.Default.limitedParallelism(3)
        launch(dispatcher) {
            println("Default : running in thread ${Thread.currentThread().name}")
            longTask()
        }
    }
}

Здесь мы установили ограничение на выполнение до 3 корутин параллельно.

Создание нового потока с помощью newSingleThreadContext

newSingleThreadContext создает новый поток, который будет предназначен только для выполнения данной корутины. Этот диспетчер гарантирует, что корутина будет постоянно выполняться в определенном нами потоке:

fun main() = runBlocking {
    launch(newSingleThreadContext("MyThread")) { // получит свой собственный новый поток MyThread
        println("newSingleThreadContext: running in  thread ${Thread.currentThread().name}")
        longTask()
    }
    println("completed tasks")
}

В данном примере мы выполняем нашу корутину в специально выделенном потоке с именем MyThread, что видно из вывода, полученного при выполнении программы:

newSingleThreadContext: running in  thread MyThread

Process finished with exit code 0

Однако выделенный поток является дорогостоящим ресурсом. В реальном приложении поток должен быть либо освобожден, если он больше не нужен, с помощью функции close, либо сохранен по его ссылке в переменной верхнего уровня и повторно использован.

Запуск без ограничений на выполнение с помощью Dispatchers.Unconfined

Диспетчер Dispatchers.Unconfined запускает корутину в вызывающем потоке, но только до первой приостановки. После приостановки корутина возобновляет работу в потоке, который полностью определяется вызванной suspend-функцией.

Изменим наш предыдущий пример, чтобы теперь в функцию launch{} был передан параметр Dispatchers.Unconfined:

fun main() = runBlocking {
    launch(Dispatchers.Unconfined) { // нет ограничений — будет запущено в главном потокое
      println(
        "Unconfined : running in thread ${Thread.currentThread().name}")
      longTask()
    }
    println("completed tasks")
}

При запуске этой программы мы получаем следующий результат:

Unconfined : running in thread main
executing longTask on...: main   // запуск корутины
completed tasks  // вывод из главного потока после приостановки корутины
longTask ends on thread ...: kotlinx.coroutines.DefaultExecutor  // корутина возобновляется

Process finished with exit code 0

Как видно из этого вывода, сразу после вызова корутина начинает выполняться в потоке main. Ее выполнение приостанавливается, чтобы дать возможность главному потоку отработать остальные задачи. Корутина возобновляет работу в другом потоке (kotlinx.coroutines.DefaultExecutor) для выполнения оператора println в функции longTask.

Диспетчер Unconfined целесообразно использовать для тех корутин, которые не сильно нагружают процессор и не обновляют общие данные (например, пользовательский интерфейс), связанные с каким-нибудь конкретным потоком. Этот диспетчер не стоит использовать в общем коде. Он полезен в ситуациях, когда какая-нибудь операция в корутине должна быть выполнена немедленно.

Отмена выполнения корутины

Мы можем захотеть отменить долго выполняющиеся задачи до их завершения. Примером ситуации, когда мы хотим отменить задачу, может служить переход на другой экран в приложении с пользовательским интерфейсом (например, Android), и результат выполнения долго выполняющейся функции нас больше не интересует.

Еще один пример: мы хотим завершить процесс из-за какого-либо исключения и провести очистку, отменив все длительно выполняющиеся задания.

В одном из предыдущих примеров мы уже видели функцию launch{}, возвращающую объект Job. Объект Job предоставляет метод cancel() для отмены запущенной корутины, который мы можем использовать, как показано в данном примере:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    val job:Job = launch {

        longRunningFunction()
    }
    delay(1500) // задержка, чтобы программа не успела завершиться
    job.cancel() // отмена задачи
    job.join()  // ожидание отмены задачи

    // job.cancelAndJoin() // мы также можем сделать это за один присест

    println(
        "My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningFunction(){
    repeat(1000){ i ->
        println("executing :$i step on thread: ${Thread.currentThread().name}")
        delay(600)
    }
}

В данном примере мы выполняем оператор print из функции longRunningFunction через каждые 600 миллисекунд. Таким образом имитируется длинная функция с 1000 итераций, где в конце каждой итерации выполняется оператор print. При запуске этой программы мы получаем следующий вывод:

My program runs...: main
executing step 0 on thread: main
executing step 1 on thread: main
executing step 2 on thread: main
My program run ends...: main

Process finished with exit code 0

Мы видим, что функция longRunningFunction выполняется до шага 2, а затем останавливается после вызова cancel на объекте job. Вместо двух отдельных операторов cancel и join мы можем использовать функцию расширения Job: cancelAndJoin, которая объединяет вызовы cancel и join.

Отмена корутин

Как уже говорилось в предыдущем разделе, для экономии памяти и вычислительных ресурсов нам необходимо отменять корутины, чтобы не выполнять больше работы, чем нужно. Для этого нам необходимо контролировать жизненный цикл корутины и отменять ее, когда она больше не нужна.

Для того чтобы код корутины был отменяемым, мы должные его к этому подготовить. Нам необходимо убедиться, что весь код в корутине адекватно отреагирует на отмену, периодически или перед началом выполнения любой длительной задачи делая проверки статуса отмены.

Существует два подхода к тому, чтобы сделать код корутины отменяемым:

Периодический вызов suspend-функции yield

Мы можем периодически вызывать suspend-функцию yield для проверки статуса отмены корутина и уступать поток (или пул потоков) текущей корутины, чтобы позволить другим корутинам работать на том же потоке (или пуле потоков):

fun main() = runBlocking{
    try {
        val job1 = launch {
            repeat(20){
                println(
                 "processing job 1: ${Thread.currentThread().name}")
                yield()
            }
        }

        val job2 = launch {
            repeat(20){
                println(
                 "processing job 2: ${Thread.currentThread().name}")
                yield()
            }
        }

        job1.join()
        job2.join()

    } catch (e: CancellationException) {
        // код очистки

    }
}

Здесь мы запускаем две корутины, каждая из которых вызывает функцию yield, чтобы позволить другой корутине работать в потоке main. Ниже показан фрагмент вывода в результате выполнения этой программы:

processing job 1: main
processing job 2: main
processing job 1: main
processing job 2: main
processing job 1: main

Мы видим вывод первой корутины, после чего она вызывает команду yield. Это приостанавливает работу первой корутины и позволяет выполняться второй. Аналогичным образом вторая корутина также вызывает функцию yield и позволяет первой корутине возобновить выполнение.

Когда будет принято решение об отмене корутина выбросит исключение kotlinx.coroutines.JobCancellationException. Мы можем перехватить это исключение и запустить код очистки.

Явная проверка статуса отмены с помощью функции isActive

Мы также можем явно проверить статус отмены запущенной корутины с помощью свойства isActive, которое является расширением и доступно внутри корутины через объект CoroutineScope:

fun main() = runBlocking{
    println("program runs...: ${Thread.currentThread().name}")

    val job:Job = launch {
        val files = File ("<File Path>").listFiles()
        var loop = 0

        while (loop < files.size-1 ) {
            if(isActive) { // проверка статуса отмены
                readFile(files.get(++loop))
            }
        }
    }
    delay(1500)
    job.cancelAndJoin()

    println("program run ends...: ${Thread.currentThread().name}")
}

suspend fun readFile(file: File) {
    println("reading file ${file.name}")
    if (file.isFile) {
        // обработка файла
    }
    delay(100)
}

Здесь мы обрабатываем набор файлов из каталога. Перед обработкой каждого файла мы проверяем статус отмены с помощью свойства isActive. Свойство isActive возвращает значение true, если текущая задача все еще активна (не завершена и не отменена).

Заключение

В этой статье мы рассмотрели различные способы использования корутин в Kotlin. Вот некоторые важные моменты, которые следует запомнить:

  1. Корутина — это параллельный паттерн проектирования, используемый для написания асинхронных программ.

  2. Корутины — это вычисления, выполняемые поверх потоков, которые могут быть приостановлены и возобновлены.

  3. Когда корутина "приостанавливается", соответствующие вычисления останавливаются, сохраняются в памяти и удаляются из потока, освобождая его для выполнения других действий.

  4. Корутины запускаются корутин-билдерами, которые также задают область видимости.

  5. launch{}, async{} и runBlocking{} — это различные типы корутин-билдеров.

  6. Функция launch возвращает job, с помощью которой также можно отменить корутину. Функция async возвращает инстанс Deferred<T>. Из инстанса Deferred<T> мы можем получить результат вычислений, выполненных в корутине, вызвав метод await().

  7. С отменой корутин есть один нюанс. Для того чтобы код корутины был отменяемым, он должен быть к этому готов. В противном случае мы не сможем отменить его в процессе выполнения даже после вызова Job.cancel().

  8. Функция async запускает корутину параллельно, аналогично функции launch{}. Однако перед запуском следующей корутины она дожидается завершения работы предыдущей.

  9. Диспетчер корутин определяет поток или потоки, которые соответствующая корутина использует для своего выполнения. Диспетчер может ограничить выполнение корутины определенным потоком, передать ее в пул потоков или позволить ей выполняться без каких либо ограничений.

  10. Корутины являются более легкими по сравнению с потоками. Когда выполнение корутины приостанавливается, поток может заниматься другими задачами, что позволяет использовать один и тот же поток для выполнения нескольких корутин.

Со всем кодом, использованным в статье, можно ознакомиться на Github.

Напоследок всех разработчиков на Kotlin приглашаем на открытый урок, посвященный возможностям Kotlin для создания DSL — разбираться будем на примере JsonBuilder. Урок пройдет 18 октября, записаться можно на странице курса по бэкенд-разработке на Kotlin.

Комментарии (0)