Привет, Хабр!

В этой статье рассмотрим, как работает Kotlin Flow — инструмент для асинхронной обработки данных. Flow позволяет легко получать данные по мере их готовности, не блокируя основной поток, а также управлять отменой, обработкой ошибок и сменой контекста.

Основные концепции и функции Flow

Начнём с основ. Если вы знакомы с коллекциями, то Flow — это коллекция, которая выдаёт все элементы не сразу, а постепенно, по мере их готовности.

Для сравнения возьмём простой пример коллекции:

fun simpleList(): List<Int> = listOf(1, 2, 3)

fun main() {
    simpleList().forEach { println(it) }
}

Здесь все значения доступны сразу, и цикл отрабатывает синхронно. Но если вычисление элементов требует времени, можно воспользоваться последовательностями:

fun simpleSequence(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(100) // имитируем длительное вычисление
        yield(i)
    }
}

fun main() {
    simpleSequence().forEach { println(it) }
}

Последовательности позволяют выдавать значения по одному, но при этом остаются синхронными и блокируют поток. Тут хорошо помогут suspend‑функции и, собственно, Flow.

Suspend‑функция позволяет выполнять операции асинхронно, не блокируя поток:

suspend fun simpleSuspend(): List<Int> {
    delay(1000) // имитируем асинхронную операцию
    return listOf(1, 2, 3)
}

fun main() = runBlocking {
    simpleSuspend().forEach { println(it) }
}

Чтобы возвращать значения постепенно и асинхронно, мы используем Flow. Пример базового Flow:

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // не блокируем поток, ждем готовности данных
        emit(i)    // эмитируем значение в поток
    }
}

fun main() = runBlocking {
    simpleFlow().collect { value -> println(value) }
}

Flow — это холодный поток, что означает, что код внутри flow {... } выполняется заново при каждом вызове collect().

Операторы и управление потоками

Flow предлагает богатый набор операторов для трансформации и управления данными, аналогичный тем, что используются в коллекциях, но с поддержкой асинхронных вызовов.

Фильтрация и преобразование данных. Например, если нужно отфильтровать чётные числа и преобразовать их в строки:

fun main() = runBlocking {
    (1..5).asFlow()
        .filter { 
            println("Фильтруем: $it")
            it % 2 == 0 
        }
        .map { 
            println("Мапим: $it")
            "Число: $it" 
        }
        .collect { println("Коллектим: $it") }
}

Оператор transform предоставляет возможность эмитировать произвольное количество значений на основе одного элемента:

fun main() = runBlocking {
    (1..3).asFlow()
        .transform { value ->
            emit("Начинаем обработку $value")
            delay(50) // можем вызывать suspend-функции внутри transform
            emit("Закончили обработку $value")
        }
        .collect { println(it) }
}

Отмена и смена контекста. Flow поддерживает отмену, как и все корутины. Если требуется остановить сбор данных, можно использовать отмену внутри collect(). Например:

fun cancellableFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Эмитим $i")
        emit(i)
    }
}

fun main() = runBlocking {
    cancellableFlow().collect { value ->
        if (value == 3) cancel() // отменяем сбор при достижении определенного значения
        println("Получили $value")
    }
}

Если в цикле нет suspend‑функций, отмена может не сработать мгновенно. Для таких случаев есть оператор cancellable():

fun main() = runBlocking {
    (1..5).asFlow()
        .cancellable()
        .collect { value ->
            if (value == 3) cancel()
            println("Число: $value")
        }
}

Иногда нужно, чтобы эмитирование значений происходило в другом контексте. Не стоит пытаться менять контекст с помощью withContext внутри flow — используйте для этого оператор flowOn():

fun cpuIntensiveFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // эмулируем тяжелую работу
        println("Эмитим $i в потоке ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    cpuIntensiveFlow().collect { value ->
        println("Получили $value в потоке ${Thread.currentThread().name}")
    }
}

Для ситуации, когда эмиттер и коллекционер работают с разной скоростью, полезны операторы buffer(), conflate() и collectLatest(). Они позволяют избежать «узких мест» при обработке данных, буферизуя или пропуская промежуточные значения.

Пример интеграции

Допустим, есть у нас UI‑приложение, которое должно регулярно получать обновления с сервера. Вместо того, чтобы создавать бесконечный цикл в активности, можно просто создать Flow, который будет выдавать новые данные каждые несколько секунд.

Представим, что есть два источника новостей, и нужно объединить их потоки, фильтровать дубликаты и выводить данные в реальном времени:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

// Модель данных для новости
data class NewsArticle(
    val id: Int,
    val source: String,
    val headline: String,
    val timestamp: Long
)

// Имитация сетевого запроса для получения новостей с заданного источника
object NetworkClient {
    suspend fun fetchNews(source: String): List<NewsArticle> {
        delay(500) // имитация задержки сети
        if (Random.nextInt(0, 10) < 2) {
            throw RuntimeException("Ошибка сети при получении новостей от $source")
        }
        val currentTime = System.currentTimeMillis()
        return List(3) {
            NewsArticle(
                id = Random.nextInt(1000, 9999),
                source = source,
                headline = "Срочная новость от $source №${Random.nextInt(100)}",
                timestamp = currentTime
            )
        }
    }
}

// Функция-генератор потока новостей для заданного источника. Каждые intervalMs миллисекунд производится опрос сервера.
fun newsFlow(source: String, intervalMs: Long): Flow<NewsArticle> = flow {
    while (true) {
        try {
            println("[$source] Опрашиваем источник...")
            val articles = NetworkClient.fetchNews(source)
            articles.forEach { article -> emit(article) }
        } catch (e: Exception) {
            println("[$source] Ошибка: ${e.message}")
        }
        delay(intervalMs)
    }
}.flowOn(Dispatchers.IO)

// Основной пример: агрегатор новостей. Объединяем потоки, фильтруем дубликаты и выводим данные.
fun main() = runBlocking {
    println("=== Запуск агрегатора новостей ===")

    // Создаем потоки для двух источников с разными интервалами опроса
    val sourceAFlow = newsFlow("ИсточникA", intervalMs = 3000)
    val sourceBFlow = newsFlow("ИсточникB", intervalMs = 5000)

    // Объединяем потоки с помощью merge, буферизуем до 50 элементов и фильтруем дубликаты по id
    val mergedNewsFlow = merge(sourceAFlow, sourceBFlow)
        .buffer(50)
        .distinctBy { it.id }

    // Запускаем сбор новостей в отдельном Job для возможности отмены
    val aggregatorJob = launch {
        mergedNewsFlow
            .onEach { article ->
                // Здесь можно обновлять UI, сохранять в БД и пр.
                println("Получена новость: [${article.source}] ${article.headline} (время: ${article.timestamp})")
            }
            .catch { e -> println("Ошибка обработки новостей: ${e.message}") }
            .onCompletion { cause ->
                if (cause != null) println("Агрегатор завершился с ошибкой: $cause")
                else println("Агрегатор успешно завершил сбор новостей.")
            }
            .collect()
    }

    // Даем агрегатору работать 20 секунд, затем корректно останавливаем сбор данных
    delay(20000)
    println("=== Остановка агрегатора новостей ===")
    aggregatorJob.cancelAndJoin()
    println("=== Агрегатор остановлен ===")
}

В консоли будут периодически появляться сообщения вроде «[ИсточникA] Опрашиваем источник...» и «Получена новость: [ИсточникB] Срочная новость...». Если при запросе возникнут ошибки, вы увидите соответствующие сообщения. По истечении 20 секунд работы агрегатора появится уведомление об остановке, и сбор новостей корректно завершится.


18 февраля в 19:00 пройдёт открытый урок на тему «Разработка монолитного приложения со Spring».

Поговорим о преимуществах и недостатках монолитной архитектуры и фреймворка Spring, а также об особенностях разработки со Spring в Kotlin. Попрактикуемся в разработке работающего монолитного приложения и разместим его в Docker-контейнере. Записаться

Комментарии (0)