Привет, Хабр!
Сегодня мы рассмотрим, как правильно переключать контексты в Kotlin Flow и почему flowOn
— это не то же самое, что withContext
.
Зачем вообще переключать контексты
В Kotlin‑корутинах потоки сами по себе недорогие — запускать их можно десятками тысяч. Но сама по себе корутина не даёт гарантий, что она работает в нужном месте. Контекст (то есть CoroutineDispatcher
) — это фактический выбор, на каком треде выполнится код. И если вы запустили flow
из UI, но не передали тяжёлую логику в Dispatchers.IO
, весь код, включая медленные запросы к базе или файловую систему, поедет в главном потоке.
Второй момент — инварианты потока. Kotlin Flow внутри себя строго следит за тем, чтобы всё исполнялось последовательно и в одном контексте. Если внезапно воткнуть withContext
или emit()
из другого диспатчера — получите IllegalStateException: Flow invariant is violated
. Эта ошибка как раз и говорит: «ты нарушил правила — emit пошёл с другого потока». Решается она только одним способом — использовать flowOn
, который создаёт буфер и грамотно отделяет upstream от downstream.
Ну и третий момент — гонки данных. Если в процессе эмита вы параллельно переключаете контексты, обрабатываете значения и отправляете их в общий StateFlow
, при неправильной синхронизации можно легко поймать гонку: два потока записывают одно значение одновременно. На тестах всё будет хорошо, а в работе может появится редкий баг, который трудно отловить. Поэтому переключать контекст нужно централизованно, понятно и через flowOn
— это безопасный, ожидаемый и проверяемый способ.
Как работает flowOn
flowOn
— это оператор, который определяет где и на каком контексте будет исполняться апстрим. Апстрим — это всё, что идёт выше по цепочке после flow {}
или любого другого оператора, возвращающего Flow
. Если вы пишете flowOn(Dispatchers.IO)
, это значит: «всё, что выше — emit
, map
, filter
и прочее — пусть выполняется на Dispatchers.IO
». А вот всё, что ниже, то есть после flowOn
, останется в том контексте, где находится коллектор.
Пример:
val usersFlow = flow {
logThread("start emit")
emit(loadUsers())
}.flowOn(Dispatchers.IO)
.map { user -> enrich(user) }
Что произойдёт:
loadUsers()
будет вызван на IOmap { enrich(...) }
выполнится уже в том контексте, где вы коллектите flow (например,Main
илиDefault
)
Эта особенность — разделение контекста между апстримом и даунстримом — реализуется через вставку промежуточного буфера. Именно flowOn
делает паузу между частями цепочки и не позволяет апстриму блокировать даунстрим. Если loadUsers()
вдруг начнёт тормозить, это не заблокирует collect
, потому что между ними есть буфер. Именно поэтому flowOn
работает адекватно с тяжёлыми операциями.
Также стоит понимать, что flowOn
влияет только на цепочку выше себя. Он не захватывает весь флоу целиком. Пример неправильного ожидания:
flow {
emit(doSomething())
}.map { heavyCompute(it) }
.flowOn(Dispatchers.Default)
heavyCompute
выполнится не на Dispatchers.Default
, а в том же контексте, где идёт collect
, потому что flowOn
влияет только на emit
и всё выше. Если вы хотите, чтобы и emit
, и map
были на Default
, нужно переставить flowOn
выше:
flow {
emit(doSomething())
}
.flowOn(Dispatchers.IO)
.map { heavyCompute(it) }
.flowOn(Dispatchers.Default)
Можно ставить flowOn
несколько раз — и это как раз и позволяет строить гибкие пайплайны, где каждый участок выполняется на подходящем пуле. Главное помнить то, что читаются flowOn
слева направо, и каждый из них влияет только на всё, что выше.
flowOn
— это единственный рекомендуемый способ менять контекст выполнения кода внутри Flow
, без риска нарушить потоковую модель, потерять элементы или нарваться на IllegalStateException
.
Как работает withContext (и почему это не вариант)
withContext
— это suspend‑функция, которая временно переключает выполнение кода в другой CoroutineContext
. Она идеально подходит для задач вроде withContext(Dispatchers.IO) { readFromDisk() }
, когда нужно выполнить единичную тяжёлую операцию в нужном диспатчере. Но внутри Flow
всё совсем не так просто.
Если вы вызываете withContext
внутри тела flow {}
или любого оператора обработки типа map
, вы не просто меняете контекст. Вы вставляете suspend
‑точку на каждый элемент и переключаете тред каждый раз, когда обрабатывается элемент. Это значит, что если у вас тысяча элементов — вы сделаете тысячу переключений тредов. А это уже неплохая такая нагрузка.
val wrongFlow = flow {
(1..1000).forEach { id ->
emit(withContext(Dispatchers.IO) {
loadById(id)
})
}
}
Выглядит невинно, но фактически вы делаете Dispatchers.IO
→ flow
контекст → Dispatchers.IO
… тысячу раз. В многопоточном приложении это может привести к:
лишней нагрузке на пул потоков, особенно если он ограничен;
задержке исполнения, если
withContext
создаёт очередь задач;непредсказуемому поведению и просадке пропускной способности — особенно при комбинировании с другими suspend‑функциями.
Самое неприятное: это может не вызывать ошибок прямо сейчас, но ухудшает масштабируемость и стабильность. Если при этом emit()
происходит с другого контекста, нарушается инвариант потока. Именно об этом предупреждает официальная документация: emit
должен вызываться в том же контексте, что и запуск collect
, если flowOn
не применялся. А withContext
ломает это правило — и вы получаете IllegalStateException: Flow invariant is violated
.
Ещё пример:
flow {
val data = withContext(Dispatchers.IO) {
heavyQuery()
}
emit(data)
}
На первый взгляд безопасно. Но если collect
вызывается в другом контексте (например, в Main
), emit(data)
происходит не в том же контексте, где collect
, и Kotlin Flow выбрасывает исключение.
withContext
— инструмент не для Flow
. Он хорош в точках входа (например, внутри ViewModel), но внутри цепочки его почти всегда нужно заменять на flowOn
. Так вы сохраняете потоковую модель, избегаете лишней нагрузки на планировщик и не рискуете получить баги, которые появятся только в нагрузке или в бою.
Мини-бенчмарк
Упрощённая метрика, чтобы почувствовать проблему переключений:
suspend fun benchmark() {
val flowOnTime = measureTimeMillis {
(1..1_000_000).asFlow()
.map { it + 1 } // CPU
.flowOn(Dispatchers.Default)
.collect()
}
val withContextTime = measureTimeMillis {
(1..1_000_000).asFlow()
.map { withContext(Dispatchers.Default) { it + 1 } }
.collect()
}
println("flowOn: $flowOnTime ms")
println("withContext: $withContextTime ms")
}
flowOn: 150–250 ms
withContext: 3000–6000 ms
В среднем на десктопе flowOn‑вариант быстрее, потому что переключается ровно один раз и держит CPU‑часть отдельной цепочкой.
flowOn(Dispatchers.Default)
один раз переключает апстрим на нужный диспатчер, и вся обработка выполняется поточно, без лишних переключений между тредами.
withContext(Dispatchers.Default)
внутри map
делает миллион тред‑свитчей, по одному на каждый элемент. Каждый withContext
— это suspend, переключение, возврат, и всё это в цепочке. Суммарные накладные расходы колоссальны.
Если дополнительно включить профилировку корутин (-Dkotlinx.coroutines.debug
или setprop debug.coroutines.enable_creation_stack_trace true
), можно увидеть, как создаются миллионы короткоживущих корутин и как они давят на планировщик.
Итоги
flowOn
и withContext
решают схожую задачу, но делают это кардинально разными путями. flowOn
— это декларативное переключение апстрима с сохранением инвариантов и минимальными затратами; withContext
— точечное, иногда дорогое и часто ненужное. Если держать в голове эту разницу и не забывать про backpressure, ваши Flows будут быстрыми и адеквтаными.
Если вы работаете с Java и задумывались о переходе на Kotlin для серверной разработки, приглашаем вас на два открытых урока курса Kotlin Backend Developer. Professional:
12 августа в 20:00 — «Почему все переходят на Kotlin? Секреты успешной миграции с Java для бэкенд‑разработчиков». Разберём подходы к постепенной миграции и реальные сценарии перехода на Kotlin в продакшне.
19 августа в 20:00 — «Нововведения Kotlin 1.9–2.2 для JVM». Обсудим изменения последних версий и их влияние на разработку серверных приложений.
Кроме того, пройдите вступительное тестирование, чтобы оценить свой уровень и узнать, подойдет ли вам программа курса «Kotlin Backend Developer. Professional».
mayorovp
Да с какого перепугу?.. flowOn влияет на всё выше, heavyCompute находится выше, почему на него оно не должно влиять?
Опять-таки, с какого перепугу? Разве withContext не восстанавливает исходный контекст при выходе?