Привет, Хабр!
Сегодня мы рассмотрим, как правильно переключать контексты в Kotlin Flow и почему flowOn — это не то же самое, что withContext.
Зачем вообще переключать контексты
В Kotlin‑корутинах потоки сами по себе недорогие — запускать их можно десятками тысяч. Но сама по себе корутина не даёт гарантий, что она работает в нужном месте. Контекст (то есть CoroutineDispatcher) — это фактический выбор, на каком треде выполнится код. И если вы запустили flow из UI, но не передали тяжёлую логику в Dispatchers.IO, весь код, включая медленные запросы к базе или файловую систему, поедет в главном потоке.
Второй момент — инварианты потока. Kotlin Flow внутри себя строго следит за тем, чтобы всё исполнялось последовательно и в одном контексте. Если внезапно воткнуть withContext или emit() из другого диспатчера — получите IllegalStateException: Flow invariant is violated. Эта ошибка как раз и говорит: «ты нарушил правила — emit пошёл с другого потока». Решается она только одним способом — использовать flowOn, который создаёт буфер и грамотно отделяет upstream от downstream.
Ну и третий момент — гонки данных. Если в процессе эмита вы параллельно переключаете контексты, обрабатываете значения и отправляете их в общий StateFlow, при неправильной синхронизации можно легко поймать гонку: два потока записывают одно значение одновременно. На тестах всё будет хорошо, а в работе может появится редкий баг, который трудно отловить. Поэтому переключать контекст нужно централизованно, понятно и через flowOn — это безопасный, ожидаемый и проверяемый способ.
Как работает flowOn
flowOn — это оператор, который определяет где и на каком контексте будет исполняться апстрим. Апстрим — это всё, что идёт выше по цепочке после flow {} или любого другого оператора, возвращающего Flow. Если вы пишете flowOn(Dispatchers.IO), это значит: «всё, что выше — emit, map, filter и прочее — пусть выполняется на Dispatchers.IO». А вот всё, что ниже, то есть после flowOn, останется в том контексте, где находится коллектор.
Пример:
val usersFlow = flow {
logThread("start emit")
emit(loadUsers())
}.flowOn(Dispatchers.IO)
.map { user -> enrich(user) }
Что произойдёт:
loadUsers()будет вызван на IOmap { enrich(...) }выполнится уже в том контексте, где вы коллектите flow (например,MainилиDefault)
Эта особенность — разделение контекста между апстримом и даунстримом — реализуется через вставку промежуточного буфера. Именно flowOn делает паузу между частями цепочки и не позволяет апстриму блокировать даунстрим. Если loadUsers() вдруг начнёт тормозить, это не заблокирует collect, потому что между ними есть буфер. Именно поэтому flowOn работает адекватно с тяжёлыми операциями.
Также стоит понимать, что flowOn влияет только на цепочку выше себя. Он не захватывает весь флоу целиком. Пример неправильного ожидания:
flow {
emit(doSomething())
}.map { heavyCompute(it) }
.flowOn(Dispatchers.Default)
heavyCompute выполнится не на Dispatchers.Default, а в том же контексте, где идёт collect, потому что flowOn влияет только на emit и всё выше. Если вы хотите, чтобы и emit, и map были на Default, нужно переставить flowOn выше:
flow {
emit(doSomething())
}
.flowOn(Dispatchers.IO)
.map { heavyCompute(it) }
.flowOn(Dispatchers.Default)
Можно ставить flowOn несколько раз — и это как раз и позволяет строить гибкие пайплайны, где каждый участок выполняется на подходящем пуле. Главное помнить то, что читаются flowOn слева направо, и каждый из них влияет только на всё, что выше.
flowOn — это единственный рекомендуемый способ менять контекст выполнения кода внутри Flow, без риска нарушить потоковую модель, потерять элементы или нарваться на IllegalStateException.
Как работает withContext (и почему это не вариант)
withContext — это suspend‑функция, которая временно переключает выполнение кода в другой CoroutineContext. Она идеально подходит для задач вроде withContext(Dispatchers.IO) { readFromDisk() }, когда нужно выполнить единичную тяжёлую операцию в нужном диспатчере. Но внутри Flow всё совсем не так просто.
Если вы вызываете withContext внутри тела flow {} или любого оператора обработки типа map, вы не просто меняете контекст. Вы вставляете suspend‑точку на каждый элемент и переключаете тред каждый раз, когда обрабатывается элемент. Это значит, что если у вас тысяча элементов — вы сделаете тысячу переключений тредов. А это уже неплохая такая нагрузка.
val wrongFlow = flow {
(1..1000).forEach { id ->
emit(withContext(Dispatchers.IO) {
loadById(id)
})
}
}
Выглядит невинно, но фактически вы делаете Dispatchers.IO → flow контекст → Dispatchers.IO… тысячу раз. В многопоточном приложении это может привести к:
лишней нагрузке на пул потоков, особенно если он ограничен;
задержке исполнения, если
withContextсоздаёт очередь задач;непредсказуемому поведению и просадке пропускной способности — особенно при комбинировании с другими suspend‑функциями.
Самое неприятное: это может не вызывать ошибок прямо сейчас, но ухудшает масштабируемость и стабильность. Если при этом emit() происходит с другого контекста, нарушается инвариант потока. Именно об этом предупреждает официальная документация: emit должен вызываться в том же контексте, что и запуск collect, если flowOn не применялся. А withContext ломает это правило — и вы получаете IllegalStateException: Flow invariant is violated.
Ещё пример:
flow {
val data = withContext(Dispatchers.IO) {
heavyQuery()
}
emit(data)
}
На первый взгляд безопасно. Но если collect вызывается в другом контексте (например, в Main), emit(data) происходит не в том же контексте, где collect, и Kotlin Flow выбрасывает исключение.
withContext — инструмент не для Flow. Он хорош в точках входа (например, внутри ViewModel), но внутри цепочки его почти всегда нужно заменять на flowOn. Так вы сохраняете потоковую модель, избегаете лишней нагрузки на планировщик и не рискуете получить баги, которые появятся только в нагрузке или в бою.
Мини-бенчмарк
Упрощённая метрика, чтобы почувствовать проблему переключений:
suspend fun benchmark() {
val flowOnTime = measureTimeMillis {
(1..1_000_000).asFlow()
.map { it + 1 } // CPU
.flowOn(Dispatchers.Default)
.collect()
}
val withContextTime = measureTimeMillis {
(1..1_000_000).asFlow()
.map { withContext(Dispatchers.Default) { it + 1 } }
.collect()
}
println("flowOn: $flowOnTime ms")
println("withContext: $withContextTime ms")
}
flowOn: 150–250 ms
withContext: 3000–6000 ms
В среднем на десктопе flowOn‑вариант быстрее, потому что переключается ровно один раз и держит CPU‑часть отдельной цепочкой.
flowOn(Dispatchers.Default) один раз переключает апстрим на нужный диспатчер, и вся обработка выполняется поточно, без лишних переключений между тредами.
withContext(Dispatchers.Default) внутри map делает миллион тред‑свитчей, по одному на каждый элемент. Каждый withContext — это suspend, переключение, возврат, и всё это в цепочке. Суммарные накладные расходы колоссальны.
Если дополнительно включить профилировку корутин (-Dkotlinx.coroutines.debug или setprop debug.coroutines.enable_creation_stack_trace true), можно увидеть, как создаются миллионы короткоживущих корутин и как они давят на планировщик.
Итоги
flowOn и withContext решают схожую задачу, но делают это кардинально разными путями. flowOn — это декларативное переключение апстрима с сохранением инвариантов и минимальными затратами; withContext — точечное, иногда дорогое и часто ненужное. Если держать в голове эту разницу и не забывать про backpressure, ваши Flows будут быстрыми и адеквтаными.
Если вы работаете с Java и задумывались о переходе на Kotlin для серверной разработки, приглашаем вас на два открытых урока курса Kotlin Backend Developer. Professional:
12 августа в 20:00 — «Почему все переходят на Kotlin? Секреты успешной миграции с Java для бэкенд‑разработчиков». Разберём подходы к постепенной миграции и реальные сценарии перехода на Kotlin в продакшне.
19 августа в 20:00 — «Нововведения Kotlin 1.9–2.2 для JVM». Обсудим изменения последних версий и их влияние на разработку серверных приложений.
Кроме того, пройдите вступительное тестирование, чтобы оценить свой уровень и узнать, подойдет ли вам программа курса «Kotlin Backend Developer. Professional».
mayorovp
Да с какого перепугу?.. flowOn влияет на всё выше, heavyCompute находится выше, почему на него оно не должно влиять?
Опять-таки, с какого перепугу? Разве withContext не восстанавливает исходный контекст при выходе?