Привет, Хабр!
В этой статье рассмотрим, как работает Kotlin Flow — инструмент для асинхронной обработки данных. Flow позволяет легко получать данные по мере их готовности, не блокируя основной поток, а также управлять отменой, обработкой ошибок и сменой контекста.
Основные концепции и функции Flow
Начнём с основ. Если вы знакомы с коллекциями, то Flow — это коллекция, которая выдаёт все элементы не сразу, а постепенно, по мере их готовности.
Для сравнения возьмём простой пример коллекции:
fun simpleList(): List<Int> = listOf(1, 2, 3)
fun main() {
simpleList().forEach { println(it) }
}
Здесь все значения доступны сразу, и цикл отрабатывает синхронно. Но если вычисление элементов требует времени, можно воспользоваться последовательностями:
fun simpleSequence(): Sequence<Int> = sequence {
for (i in 1..3) {
Thread.sleep(100) // имитируем длительное вычисление
yield(i)
}
}
fun main() {
simpleSequence().forEach { println(it) }
}
Последовательности позволяют выдавать значения по одному, но при этом остаются синхронными и блокируют поток. Тут хорошо помогут suspend‑функции и, собственно, Flow.
Suspend‑функция позволяет выполнять операции асинхронно, не блокируя поток:
suspend fun simpleSuspend(): List<Int> {
delay(1000) // имитируем асинхронную операцию
return listOf(1, 2, 3)
}
fun main() = runBlocking {
simpleSuspend().forEach { println(it) }
}
Чтобы возвращать значения постепенно и асинхронно, мы используем Flow. Пример базового Flow:
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // не блокируем поток, ждем готовности данных
emit(i) // эмитируем значение в поток
}
}
fun main() = runBlocking {
simpleFlow().collect { value -> println(value) }
}
Flow — это холодный поток, что означает, что код внутри flow {... } выполняется заново при каждом вызове collect()
.
Операторы и управление потоками
Flow предлагает богатый набор операторов для трансформации и управления данными, аналогичный тем, что используются в коллекциях, но с поддержкой асинхронных вызовов.
Фильтрация и преобразование данных. Например, если нужно отфильтровать чётные числа и преобразовать их в строки:
fun main() = runBlocking {
(1..5).asFlow()
.filter {
println("Фильтруем: $it")
it % 2 == 0
}
.map {
println("Мапим: $it")
"Число: $it"
}
.collect { println("Коллектим: $it") }
}
Оператор transform
предоставляет возможность эмитировать произвольное количество значений на основе одного элемента:
fun main() = runBlocking {
(1..3).asFlow()
.transform { value ->
emit("Начинаем обработку $value")
delay(50) // можем вызывать suspend-функции внутри transform
emit("Закончили обработку $value")
}
.collect { println(it) }
}
Отмена и смена контекста. Flow поддерживает отмену, как и все корутины. Если требуется остановить сбор данных, можно использовать отмену внутри collect()
. Например:
fun cancellableFlow(): Flow<Int> = flow {
for (i in 1..5) {
println("Эмитим $i")
emit(i)
}
}
fun main() = runBlocking {
cancellableFlow().collect { value ->
if (value == 3) cancel() // отменяем сбор при достижении определенного значения
println("Получили $value")
}
}
Если в цикле нет suspend‑функций, отмена может не сработать мгновенно. Для таких случаев есть оператор cancellable()
:
fun main() = runBlocking {
(1..5).asFlow()
.cancellable()
.collect { value ->
if (value == 3) cancel()
println("Число: $value")
}
}
Иногда нужно, чтобы эмитирование значений происходило в другом контексте. Не стоит пытаться менять контекст с помощью withContext
внутри flow
— используйте для этого оператор flowOn()
:
fun cpuIntensiveFlow(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // эмулируем тяжелую работу
println("Эмитим $i в потоке ${Thread.currentThread().name}")
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking {
cpuIntensiveFlow().collect { value ->
println("Получили $value в потоке ${Thread.currentThread().name}")
}
}
Для ситуации, когда эмиттер и коллекционер работают с разной скоростью, полезны операторы buffer()
, conflate()
и collectLatest()
. Они позволяют избежать «узких мест» при обработке данных, буферизуя или пропуская промежуточные значения.
Пример интеграции
Допустим, есть у нас UI‑приложение, которое должно регулярно получать обновления с сервера. Вместо того, чтобы создавать бесконечный цикл в активности, можно просто создать Flow, который будет выдавать новые данные каждые несколько секунд.
Представим, что есть два источника новостей, и нужно объединить их потоки, фильтровать дубликаты и выводить данные в реальном времени:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
// Модель данных для новости
data class NewsArticle(
val id: Int,
val source: String,
val headline: String,
val timestamp: Long
)
// Имитация сетевого запроса для получения новостей с заданного источника
object NetworkClient {
suspend fun fetchNews(source: String): List<NewsArticle> {
delay(500) // имитация задержки сети
if (Random.nextInt(0, 10) < 2) {
throw RuntimeException("Ошибка сети при получении новостей от $source")
}
val currentTime = System.currentTimeMillis()
return List(3) {
NewsArticle(
id = Random.nextInt(1000, 9999),
source = source,
headline = "Срочная новость от $source №${Random.nextInt(100)}",
timestamp = currentTime
)
}
}
}
// Функция-генератор потока новостей для заданного источника. Каждые intervalMs миллисекунд производится опрос сервера.
fun newsFlow(source: String, intervalMs: Long): Flow<NewsArticle> = flow {
while (true) {
try {
println("[$source] Опрашиваем источник...")
val articles = NetworkClient.fetchNews(source)
articles.forEach { article -> emit(article) }
} catch (e: Exception) {
println("[$source] Ошибка: ${e.message}")
}
delay(intervalMs)
}
}.flowOn(Dispatchers.IO)
// Основной пример: агрегатор новостей. Объединяем потоки, фильтруем дубликаты и выводим данные.
fun main() = runBlocking {
println("=== Запуск агрегатора новостей ===")
// Создаем потоки для двух источников с разными интервалами опроса
val sourceAFlow = newsFlow("ИсточникA", intervalMs = 3000)
val sourceBFlow = newsFlow("ИсточникB", intervalMs = 5000)
// Объединяем потоки с помощью merge, буферизуем до 50 элементов и фильтруем дубликаты по id
val mergedNewsFlow = merge(sourceAFlow, sourceBFlow)
.buffer(50)
.distinctBy { it.id }
// Запускаем сбор новостей в отдельном Job для возможности отмены
val aggregatorJob = launch {
mergedNewsFlow
.onEach { article ->
// Здесь можно обновлять UI, сохранять в БД и пр.
println("Получена новость: [${article.source}] ${article.headline} (время: ${article.timestamp})")
}
.catch { e -> println("Ошибка обработки новостей: ${e.message}") }
.onCompletion { cause ->
if (cause != null) println("Агрегатор завершился с ошибкой: $cause")
else println("Агрегатор успешно завершил сбор новостей.")
}
.collect()
}
// Даем агрегатору работать 20 секунд, затем корректно останавливаем сбор данных
delay(20000)
println("=== Остановка агрегатора новостей ===")
aggregatorJob.cancelAndJoin()
println("=== Агрегатор остановлен ===")
}
В консоли будут периодически появляться сообщения вроде «[ИсточникA] Опрашиваем источник...» и «Получена новость: [ИсточникB] Срочная новость...». Если при запросе возникнут ошибки, вы увидите соответствующие сообщения. По истечении 20 секунд работы агрегатора появится уведомление об остановке, и сбор новостей корректно завершится.
18 февраля в 19:00 пройдёт открытый урок на тему «Разработка монолитного приложения со Spring».
Поговорим о преимуществах и недостатках монолитной архитектуры и фреймворка Spring, а также об особенностях разработки со Spring в Kotlin. Попрактикуемся в разработке работающего монолитного приложения и разместим его в Docker-контейнере. Записаться