В первой части статьи были рассмотрены следующие распространённые способы организации конкурентного кода при работе с потоками и корутинами: критические секции, атомарные переменные, реактивные переменные, барьерная синхронизация.

В этой части будут разобраны другие важные подходы:

  • Ограничение параллелизма:

    • Семафоры.

  • Синхронизированный обмен данными:

    • Каналы передачи данных.

    • Горячие потоки.

  • Thread confinement:

    • Модель акторов.

    • Последовательные обработчики задач.

    • Dispatcher confinement.

Для каждого из этих подходов рассмотрим конкретные реализации, выделим плюсы и минусы, приведём примеры использования. В конце объединим все реализации в общую таблицу.

Ограничение параллелизма

В первой части статьи уже были описаны критические секции, которые ограничивают участок кода и не позволяют выполнять его одновременно более чем одному потоку или корутине. В этом разделе рассмотрим менее строгий ограничитель параллелизма — семафоры.

Семафоры

Семафоры ограничивают количество потоков (или корутин), которые могут получить одновременный доступ к выбранному участку кода. В стандартной библиотеке корутин уже есть реализация семафора — kotlinx.coroutines.sync.Semaphore. Если не вдаваться в детали, то корутинная реализация семафора не будет принципиально отличаться от java.util.concurrent.Semaphore, за исключением очевидного — в kotlinx.coroutines.sync.Semaphore ограничивается количество корутин, а не количество потоков. Ограничение в виде количества корутин задаётся в конструкторе (параметр permits).

Пример использования семафоров в условиях параллелизма и ограниченной памяти:

val systemMemory = AtomicInteger(1_024)

fun main() = runBlocking {
    val coroutinesCount = 5
    val semaphore = Semaphore(coroutinesCount)

    Array(1_000) {
        async(Dispatchers.IO) {
            semaphore.withPermit {
                emulateHeavyTask()
            }
        }
    }.forEach { it.await() }

    println("Success!")
}

В данном коде память строго ограничена — systemMemory равна 1024. Есть задача emulateHeavyTask, которая требует ровно 200 единиц памяти. И если ее одновременно запустит больше пяти корутин, то сработает проверка assertTrue(«Not enough memory for task!»). Здесь семафор дает возможность обрабатывать задачи группами — не больше пяти корутин одновременно, что позволяет эффективно распределять ограниченные ресурсы.

Вот несколько типичных задач, для которых могут понадобиться семафоры:

  • Ограничение количества одновременных запросов к серверу. Помимо оптимизации, мы не превысим rate limit сервера.

  • Сохранение файлов на диск. Диски имеют конечную пропускную способность I/O, обычно нет смысла пытаться сохранять на диск параллельно десятки файлов — разумнее сохранять пачками по несколько файлов.

  • Контроль выделяемой памяти. Например, если нужно загрузить в память и обрабатывать несколько картинок одновременно, то здесь лучше ограничить параллелизм, чтобы не получить OutOfMemory.

Общий паттерн во всех случаях один: ключевой ресурс имеет конечную ёмкость, а корутин потенциально больше, чем ресурс способен обслужить. Если систему не ограничить, то она либо деградирует по производительности, либо получит отказы, либо упадёт.

Синхронизированный обмен данными

В этом разделе будут рассмотрены реализации методов для передачи данных между потоками или корутинами. В этом контексте ограничимся реализациями, которые входят в стандартную библиотеку и не требуют дополнительной синхронизации.

Каналы передачи данных

Канал передачи данных решает типичную задачу producer‑consumer: один поток производит данные и кладёт их в очередь, другой — забирает и обрабатывает. Стандартная реализация канала для Threads — это потокобезопасная очередь BlockingQueue. Пример использования BlockingQueue:

fun main() {
    val queue: BlockingQueue<String> = ArrayBlockingQueue(3)

    val producer1 = Thread {
        repeat(5) { index ->
            Thread.sleep(Random.nextLong(10, 40))
            val msg = "P1 -> $index"
            queue.put(msg)
            println("put: $msg")
        }
    }

    val producer2 = Thread {
        repeat(5) { index ->
            Thread.sleep(Random.nextLong(10, 40))
            val msg = "P2 -> $index"
            queue.put(msg)
            println("put: $msg")
        }
    }

    val consumer = Thread {
        while (true) {
            val msg = queue.take()
            if (msg == "STOP") break
            println("take: $msg")
            Thread.sleep(200)
        }
        println("consumer: done")
    }

    consumer.start()
    producer1.start()
    producer2.start()

    producer1.join()
    producer2.join()

    println("main: sending STOP")
    queue.put("STOP")

    consumer.join()
    println("main: done")
}

В данном коде создаётся очередь ArrayBlockingQueue с capacity=3 — это значит, что в ней может находиться одновременно только три элемента, а попытка добавить новые будет блокировать поток до тех пор, пока место не освободится.

  • Два producer потока отправляют несколько событий со случайным интервалом 10–40 миллисекунд, а третий поток consumer обрабатывает события каждые 200 миллисекунд.

  • Цикл потока обработчика не завершится, пока не получит сигнал остановки STOP.

  • Чтобы получить сигнал и завершить цикл, необходимо обработать все значения от producer1 и producer2, потому что они были отправлены раньше.

Результат выполнения:

put: P2 -> 0
take: P2 -> 0
put: P1 -> 0
put: P1 -> 1
put: P2 -> 1
take: P1 -> 0
put: P2 -> 2
take: P1 -> 1
put: P1 -> 2
take: P2 -> 1
put: P2 -> 3
take: P2 -> 2
put: P1 -> 3
take: P1 -> 2
put: P2 -> 4
take: P2 -> 3
put: P1 -> 4
main: sending STOP
take: P1 -> 3
take: P2 -> 4
take: P1 -> 4
consumer: done
main: done

По результатам видно, что capacity=3 работает — одновременно в очереди лежит не больше трех значений. Кроме того, можно заметить, что между main: sending STOP и consumer: done обрабатываются все оставшиеся значения в очереди.

Для корутин реализован класс Channel, который работает очень похоже на класс ArrayBlockingQueue. Основное отличие: когда канал переходит в ожидание следующего значения, он переводит корутину в состояние suspend, в отличие от ArrayBlockingQueue, который просто блокирует поток. Пример с ArrayBlockingQueue, представленный выше, один в один переписывается на корутины и каналы:

fun main() = runBlocking {
    val channel = Channel<String>(capacity = 3)

    val producer1 = launch {
        repeat(5) { index ->
            delay(Random.nextLong(10, 40))
            val msg = "P1 -> $index"
            channel.send(msg)
            println("send: $msg")
        }
    }

    val producer2 = launch {
        repeat(5) { index ->
            delay(Random.nextLong(10, 40))
            val msg = "P2 -> $index"
            channel.send(msg)
            println("send: $msg")
        }
    }

    val consumer = launch {
        for (msg in channel) {
            println("recv: $msg")
            delay(200)
        }

        println("consumer: done")
    }

    joinAll(producer1, producer2)
    println("main: closing channel")
    channel.close()

    consumer.join()
    println("main: done")
}

Результат выполнения аналогичен примеру с ArrayBlockingQueue. Кроме того, видно, что в случае с каналами специальный сигнал STOP (так называемый Poison pill) уже не нужен — есть стандартный метод close(), закрывающий канал. Если канал закрыт, то при отправке новых сообщений будет выброшено исключение ClosedSendChannelException. При этом все отправленные до момента закрытия сообщения могут быть прочитаны и после закрытия канала.

Горячие потоки

Горячий поток — это инструмент, который отправляет данные непрерывно независимо от наличия подписчиков. Новые подписчики получают только текущие и будущие данные, но не те, которые были отправлены до подписки.

Стандартной реализацией горячих потоков для Threads являются наследники интерфейсов java.util.concurrent.Flow.Publisher / Subscriber из Java Flow API, а также PublishSubject из библиотеки rxjava.

В корутинах есть своя стандартная реализация горячих потоков — SharedFlow. Помимо поддержки корутин, у SharedFlow из коробки есть набор преимуществ, которые упрощают работу с горячими потоками:

  • При создании MutableSharedFlow можно указать replay‑кэш, который определяет, сколько последних значений получит новый подписчик сразу после подписки, а также extraBufferCapacity — дополнительный буфер сверх replay. Он нужен, чтобы emit не останавливался сразу, если подписчики не успевают обрабатывать значения.

  • Есть встроенные политики переполнения буфера:

    • SUSPEND — приостановить emitter.

    • DROP_OLDEST — удалить самое старое значение.

    • DROP_LATEST — отбросить новое значение.

  • SharedFlow наследует все преимущества kotlinx.coroutines.flow: все методы преобразования, возможность задать dispatcher, фильтрацию данных, методы подписки и другие.

  • Все публичные методы SharedFlow помечены как thread‑safe и не требуют дополнительной синхронизации.

  • SharedFlow будет работать в Kotlin Multiplatform в отличие от своих ближайших аналогов для Threads.

Пример работы SharedFlow:

fun main() = runBlocking {
    val bus = MutableSharedFlow<String>(
        replay = 0,
        extraBufferCapacity = 3,
        onBufferOverflow = BufferOverflow.SUSPEND
    )

    val consumer = launch {
        bus.take(10).collect { message ->
            println("recv: $message")
            delay(100L)
        }
        println("consumer: done")
    }

    suspend fun produce(name: String) {
        repeat(5) { index ->
            delay(Random.nextLong(10, 30))
            val message = "$name -> $index"
            bus.emit(message)
            println("emitted: $message")
        }
    }

    val p1 = launch { produce("P1") }
    val p2 = launch { produce("P2") }

    joinAll(p1, p2)
    consumer.join()
}

В данном коде создаётся MutableSharedFlow с параметрами extraBufferCapacity=3 и onBufferOverflow=SUSPEND. Когда есть два быстрых producer и один медленный consumer, параметр extraBufferCapacity=3 позволяет producer быстро отправить три значения в SharedFlow и только на четвёртом уйти в suspend.

emitted: P2 -> 0
recv: P2 -> 0
emitted: P1 -> 0
emitted: P2 -> 1
emitted: P1 -> 1
recv: P1 -> 0
emitted: P1 -> 2
recv: P2 -> 1
emitted: P2 -> 2
recv: P1 -> 1
emitted: P1 -> 3
recv: P1 -> 2
emitted: P2 -> 3
recv: P2 -> 2
emitted: P1 -> 4
recv: P1 -> 3
emitted: P2 -> 4
recv: P2 -> 3
recv: P1 -> 4
recv: P2 -> 4
consumer: done

Как и ожидалось, одновременно в SharedFlow не хранится больше трех значений, а также ни одно значение не потерялось из‑за выбранной политики переполнения буфера SUSPEND. При выборе одной из политик DROP_* значения неизбежно бы терялись.

На первый взгляд, SharedFlow похож на Channel, но базовые различия довольно существенные:

  • На каждый отправленный в Channel объект может быть только один получатель, в то время как каждое отправленное событие в SharedFlow получают все активные подписчики.

  • У Channel есть понятие конца данных и возможность закрытия канала, в то время как SharedFlow нельзя «закрыть» в привычном понимании — только отписаться от получения событий.

  • Channel не является Flow, а значит, все возможности преобразования данных и фильтрации отсутствуют. Но есть метод receiveAsFlow, который преобразует Channel во Flow. О нюансах такого преобразования можно прочитать в документации к методу.

Thread confinement

Подход thread confinement не предполагает использования никаких явных примитивов синхронизации, но требует некоторой дисциплины для его применения. Суть подхода состоит в том, чтобы изолировать обработку переменных, объектов и функций в выделенные потоки. Таким образом гарантируется отсутствие конкуренции между разными ветвями исполнения кода.

Модель акторов

Модель акторов описывает систему как набор изолированных сущностей (акторов), которые не делят память напрямую. Вместо этого они взаимодействуют только через асинхронную отправку сообщений и обрабатывают их по одному — это гарантирует отсутствие гонок между потоками.

На уровне языка Kotlin не получится реализовать модель акторов полноценно, потому что общая память не может быть изолирована. Например, никто не помешает разработчику обмениваться данными между акторами через любую статическую переменную. Полноценно модель акторов раскрывает себя в языках типа Erlang, где её нельзя обойти, используя общую память. Тем не менее при условии соблюдения договоренностей между разработчиками, такая модель вполне применима и к Kotlin + coroutines.

В стандартной библиотеке Java нет полноценной реализации акторов. Обычно используются несложные самописные реализации через Executors.newSingleThreadExecutor() + BlockingQueue или внешние библиотеки с поддержкой акторов, например Akka или Apache Pekko.

Для корутин акторы были реализованы (kotlinx.coroutines.channels.actor), но по состоянию на середину 2026 года они помечены аннотацией @ObsoleteCoroutinesApi и их интерфейс или поведение, скорее всего, претерпят значительные изменения, поэтому не рекомендуются к использованию.

При этом простейший актор для корутин легко реализовать с помощью каналов:

class Actor<S, T>(
    scope: CoroutineScope,
    capacity: Int = Channel.BUFFERED,
    private val state: S,
    private val handler: suspend (S, T) -> Unit
) {
    private val mailbox = Channel<T>(capacity)
 
    private val job = scope.launch {
        for (message in mailbox) {
            handler(state, message)
        }
    }
 
    suspend fun send(message: T) {
        mailbox.send(message)
    }
 
    fun close() {
        mailbox.close()
    }
 
    suspend fun join() {
        job.join()
    }
}

Вот пример обработки сообщений таким актором с использованием Deferred для возврата ответа:

class BalanceState(var balance: Double)

sealed class BankMessage {
    class Deposit(val amount: Double) : BankMessage()
    class Withdraw(val amount: Double, val reply: CompletableDeferred<Boolean>) : BankMessage()
    class GetBalance(val reply: CompletableDeferred<Double>) : BankMessage()
}

class BankManager(private val scope: CoroutineScope) {

    private val actor = Actor<BalanceState, BankMessage>(
        scope = scope,
        state = BalanceState(0.0)
    ) { state, message -> 
        when (message) {
            is BankMessage.Deposit -> {
                state.balance += message.amount
                println("Deposited ${message.amount}, balance = ${state.balance}")
            }
            is BankMessage.Withdraw -> {
                if (state.balance >= message.amount) {
                    state.balance -= message.amount
                    println("Withdrew ${message.amount}, balance = ${state.balance}")
                    message.reply.complete(true)
                } else {
                    println("Insufficient funds for ${message.amount}, balance = ${state.balance}")
                    message.reply.complete(false)
                }
            }
            is BankMessage.GetBalance -> {
                message.reply.complete(state.balance)
            }
        }
    }
 
    suspend fun deposit(amount: Double) =
        actor.send(BankMessage.Deposit(amount))
 
    suspend fun withdraw(amount: Double): Boolean {
        val reply = CompletableDeferred<Boolean>()
        actor.send(BankMessage.Withdraw(amount, reply))
        return reply.await()
    }
 
    suspend fun getBalance(): Double {
        val reply = CompletableDeferred<Double>()
        actor.send(BankMessage.GetBalance(reply))
        return reply.await()
    }
}

Здесь актор используется для одной конкретной цели — проводить операции с балансом. Отправка сообщений в актор — единственный способ извне взаимодействовать с балансом, поэтому дополнительная синхронизация не требуется. Необходимые для актора аргументы передаются как поля класса BankMessage, а возврат ответа из актора реализован с помощью CompletableDeferred.

Последовательные обработчики задач

Иногда необходимо последовательно выполнять задачи на определенном потоке или Dispatcher. В таком случае будет ошибкой полагаться на такой код:

...
scope.launch(dispatcher) {
    // Task1.
}

scope.launch(dispatcher) {
    // Task2.
}
...

Строго говоря, не гарантируется, что Task2 выполнится после Task1. Можно модифицировать этот код через withContext или async/await вместо launch. Это будет работать для данного примера, но что если таких задач не две, а намного больше? Что, если задачи определены не в одном классе, а могут быть заданы из разных мест кода?

В Android‑разработке есть системный инструмент, который позволяет организовать очередь последовательно выполняемых задач — android.os.Handler. Метод post позволяет добавлять новые задачи в очередь для последовательного выполнения на выделенном HandlerThread. Это не единственное, что умеет Handler, но как абстракцию можно позаимствовать этот класс и на его основе написать собственный с поддержкой корутин:

class CoroutineHandler(baseDispatcher: CoroutineDispatcher) {

    private val exceptionHandler =
        CoroutineExceptionHandler { _, error -> onError(error) }

    private val scope = CoroutineScope(
        context = baseDispatcher.limitedParallelism(1) + SupervisorJob() + exceptionHandler,
    )

    private val queue = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    init {
        scope.launch {
            while (currentCoroutineContext().isActive) {
                receiveQueueElement()
            }
        }
    }

    fun postAction(action: () -> Unit): Boolean =
        postSuspendAction { action() }

    fun postSuspendAction(action: suspend () -> Unit): Boolean =
        queue.trySend(action).isSuccess

    private suspend fun receiveQueueElement() {
        queue.receiveCatching()
            .onSuccess { action ->
                runCatching {
                    action.invoke()
                }.onFailure { throwable ->
                    onErrorOrCancellation(throwable)
                }
            }.onFailure { throwable ->
                 onErrorOrCancellation(throwable)
            }.onClosed {
                throw CancellationException()
            }
    }

    private fun onErrorOrCancellation(throwable: Throwable) {
             if (throwable is CancellationException) {
                throw throwable
            } else {
                onError(throwable)
            }
    }

    private fun onError(throwable: Throwable) {
        // Log error
    }
}

В CoroutineHandler вместо выделенного HandlerThread используется baseDispatcher.limitedParallelism(1), а в качестве очереди задач — Channel.

Рассмотрим пример использования CoroutineHandler применительно к реальной задаче — последовательной отправке событий аналитики:

class AnalyticsTracker(analytics: AnalyticsManager) {
    private val trackHandler: CoroutineHandler =  CoroutineHandler(Dispatchers.IO)

    init {
        analytics.initAnalytics()

        trackHandler.postSuspendAction {
            analytics.startSuspend()
        }
    }

    fun trackSingleEvent(eventName: String, eventBody: String) =
        trackHandler.postSuspendAction  {
            analytics.trackSingleEventSuspend(eventName, eventBody)
        }

    fun trackMultipleEvents(events: List<Event>) =
        trackHandler.postSuspendAction {
            analytics.trackMultipleEventsSuspend(events)
        }
}

В этом коде первым выполнится initAnalytics. Далее как первая задача в очереди выполнится startSuspend. Только после этого будет отправляться аналитика через потокобезопасные методы trackSingleEvent и trackMultipleEvents. Благодаря тому, что CoroutineHandler гарантирует последовательное выполнение, порядок отправки событий аналитики сохраняется.

Dispatcher confinement

Подход thread confinement лежит в основе архитектуры мобильных приложений Android и iOS. Характерная черта — выделение главного потока main, который используется в основном для задач, связанных с UI (обработка пользовательских событий, обновление интерфейса, методы lifecycle и др.). Попытки использовать его для долгих задач (вычислений, взаимодействия с базой данных или сетью) неизбежно приведут к зависанию или вылету приложения. Получается своего рода договор: если строго соблюдать архитектурную идею с main‑потоком, то не нужно думать, куда добавить synchronized или Mutex — всё работает без них. Но если договор нарушить, то возникает множество проблем.

В примере с CoroutineHandler используется не отдельный поток, а отдельный CoroutineScope с limitedParallelism(1), который гарантирует, что любые корутины, запущенные на данном CoroutineScope, не будут конкурировать между собой. Использование limitedParallelism(1) открывает возможность адаптации подхода thread confinement применительно к корутинам. Назовём этот новый подход dispatcher confinement.

Рассмотрим пример подхода dispatcher confinement на примере склада и отдела продаж:

fun main() = runBlocking {
    val warehouseDispatcher = Dispatchers.Default.limitedParallelism(1)
    val cashDispatcher = Dispatchers.Default.limitedParallelism(1)

    val warehouse = mutableMapOf("apple" to 10, "banana" to 5)
    var revenue = 0

    suspend fun sell(item: String, quantity: Int, price: Int) {
        val forSale = withContext(warehouseDispatcher) {
            val remainingInStock = warehouse[item] ?: 0
            val forSale = minOf(remainingInStock, quantity)
            val left = remainingInStock - forSale
            warehouse[item] = left
            println("Warehouse: forSale=$forSale $item, left=$left")
            forSale
        }

        if (forSale > 0) {
            withContext(cashDispatcher) {
                revenue += forSale * price
                println("cash: +${forSale * price}, revenue=$revenue")
            }
        }
    }

    coroutineScope {
        launch { sell("apple", 3, 2) }
        launch { sell("banana", 2, 4) }
        launch { sell("apple", 8, 2) }
    }

    println("Result: warehouse=$warehouse")
    println("Result: revenue=$revenue")
}

Процесс продажи товара состоит из двух этапов: на первом нужно взять товар со склада, на втором — рассчитать выручку. Склад работает на warehouseDispatcher, а отдел продаж — на cashDispatcher (оба помечены limitedParallelism(1)). Асинхронно запускается несколько процессов продаж через launch.

В данном примере мы как разработчики соблюдаем границы ответственности склада и отдела продаж, они работают независимо друг от друга. Такая дисциплина при написании кода позволила нам полностью отказаться от какой‑либо синхронизации переменных warehouse и revenue.

Результат выполнения кода:

Warehouse: forSale=3 apple, left=7
Warehouse: forSale=2 banana, left=3
Warehouse: forSale=7 apple, left=0
cash: +6, revenue=6
cash: +8, revenue=14
cash: +14, revenue=28
Result: warehouse={apple=0, banana=3}
Result: revenue=28

По выводу видно, что гонок потоков/корутин при работе склада и отдела продаж нет — все отгружается и рассчитывается правильно.

Заключение

Вывод вряд ли удивит разработчиков, которые вплотную сталкивались с многопоточным программированием — The best synchronization is no synchronization. Если можно организовать многопоточный код так, чтобы синхронизация не понадобилась вовсе, то лучше так и сделать. Поэтому для корутин подход thread confinement (или, как мы его назвали для корутин, dispatcher confinement) выглядит более выигрышным.

Если синхронизация всё же необходима, то kotlinx.coroutines из коробки предоставляет как реализации примитивов синхронизации (Semaphore, Mutex), так и более сложные классы, которые могут применяться для организации многопоточной разработки (Channel, StateFlow, SharedFlow…).

Даже если необходимый вам класс для синхронизации потоков (например, CountDownLatch, CyclicBarrier, Actor…) не был реализован применительно к корутинам, то на основе стандартных примитивов несложно создать аналог этого класса.

Рассмотрим итоговую таблицу со всеми рассмотренными подходами:

Подход

Реализации для потоков

Реализации для корутин

Критические секции

synchronized / @Synchronized

ReentrantLock

kotlinx.coroutines.sync.Mutex

Критические секции с разделением чтения/записи

ReentrantReadWriteLock

Стандартных реализаций нет, есть только Pull Request к GitHub Kotlin/kotlinx.coroutines.

Ограничение параллелизма

java.util.concurrent.Semaphore

kotlinx.coroutines.sync.Semaphore

Атомарные переменные

volatile / @Volatile (неуниверсальный вариант для атомарности)

java.util.concurrent.atomic.*

kotlinx.atomicfu.* (более современная реализация с поддержкой Kotlin Multiplatform)

Барьерная синхронизация

CyclicBarrier

CountDownLatch

Phaser

Стандартных реализаций нет, есть пример реализации в первой части статьи

Реактивные переменные

LiveData (Android)

BehaviorSubject (RxJava)

StateFlow / MutableStateFlow

Горячие потоки

SubmissionPublisher

PublishSubject (RxJava)

SharedFlow / MutableSharedFlow

Каналы передачи данных

ArrayBlockingQueue

LinkedBlockingQueue

Channel

Модель акторов

Akka / Apache Pekko

Стандартная реализация помечена Obsolete и не рекомендована к использованию, есть пример реализации в этой части статьи

Последовательные обработчики задач

newSingleThreadExecutor()

Handler (Android)

Стандартных реализаций нет, есть пример реализации в этой части статьи

Вопросы, замечания и дополнения по теме статьи приветствуются в комментариях.

Комментарии (0)