Лето — лучшее время для сплава. Поэтому, если вы пока не в отпуске, давайте устроим короткий сплав по асинхронным потокам данных.

Переход из привычной императивной парадигмы иногда бывает сложным, поэтому сначала поговорим о терминах.

Термины

Kotlin Flow — это API для работы с асинхронными потоками данных, построенное поверх корутин. Kotlin Flow реализует парадигму реактивного программирования.

Реактивное программирование — это парадигма, в которой все данные и события рассматриваются не как единичные значения, а как асинхронные потоки, протекающие через приложение. Каждый компонент «подписан» на поток входных событий и автоматически обновляет своё состояние в ответ на любые изменения.

Реактивное программирование можно рассматривать как синтез трех подходов:

Парадигмы программирования
Парадигмы программирования

Как и в декларативной парадигме, в реактивном программировании описывается «что», а не «как». Вместо явных циклов вы просто собираете цепочку операторов (map, filter, flatMap и т. д.), задавая желаемую трансформацию потока.

Как и в программировании потоков данных, в реактивном программировании каждый поток и каждый оператор образуют «узлы» и «ребра» графа, по которому данные протекают от источников к потребителям.

Как и в событийно-ориентированной парадигме, в реактивном программировании потоки часто реализуют модель публикации-подписки.

Простейший пример реактивной парадигмы мы все видели в Excel, где при обновлении значения ячейки автоматически пересчитываются все остальные ячейки, которые ссылаются на нее через формулы.

Холодные и горячие потоки

Поток данных (stream) — это последовательность значений, которые поступают со временем, зачастую асинхронно.

Потоки данных делятся на горячие и холодные. Это различие описывает, когда именно начинается генерация данных и что происходит при подписке на поток.

Горячие потоки генерируют данные независимо от наличия подписчиков, а холодные потоки ленивы и начинают генерировать их только по запросу.

Даже в 2025 году ChatGPT все еще путается в числе пальцев на руке у человека. К тому же обе руки - правые)
Даже в 2025 году ChatGPT все еще путается в числе пальцев на руке у человека. К тому же обе руки - правые)

Слева — «горячий поток» в виде извергающегося вулкана, непрерывно выбрасывающего данные (желтые листки), а справа — «холодный поток» в виде ледяного крана, из которого данные капают только при повороте вентиля.

«Холодные потоки, горячие каналы»

Так называется статья Романа Елизарова, опубликованная в 2019 году. Дело в том, что в Kotlin Flow есть и горячие (SharedFlow и StateFlow), и холодные (Flow) потоки данных. Кроме того в Kotlin есть еще одна реализация горячих потоков — Channels.

Чтобы не запутаться, давайте посмотрим на схему:

Объект Flow в Kotlin — это реализация холодного потока данных, который производит значения асинхронно и лениво. Таким образом, он берет лучшее и от корутин, и от последовательностей: значения вычисляются «на лету» и при этом используются корутины для асинхронной обработки данных.

За информацией о каналах отсылаю вас к своему курсу по корутинам на Степике. В данной статье мы будем говорить только о Flow и его вариациях.

Создание Flow

Хрестоматийный пример для объяснения работы Flow — вычисление чисел Фибоначчи:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun fibonacciFlow(): Flow<Long> = flow {
    var a = 0L
    var b = 1L
    emit(a)
    delay(100)
    emit(b)
    while (true) {
        val next = a + b
        emit(next)
        a = b
        b = next
        delay(100)
    }
}

fun main() = runBlocking {
    fibonacciFlow().take(10).collect { println(it) }
}

Здесь мы создаем поток данных с помощью билдера flow, получающего на вход лямбду с приемником. Как известно, если последний параметр функции в Kotlin сам является функцией, то ее можно в виде лямбда-выражения вынести за круглые скобки (trailing lambda). Поэтому вместо

flow() { ... }

лямбда-аргумент можно записать внутри фигурных скобок, следующих за именем функции flow, что придает синтаксису выразительность.

Функция emit() отправляет очередное значение в поток, и оно плывет к подписчику — тому, кто запросил это значение. Так как emit() — это suspend-функция, она приостанавливает выполнение корутины после передачи каждого значения до следующего запроса.

Но весь этот механизм лежит мертвым грузом до вызова терминальной операции, которая запускает выполнение потока в корутине и получает результаты его работы. В данном случае эту роль играет функция collect, вызов которой инициирует сбор потока, т.е. генерацию значений и получение результатов. При этом лямбда-функция, переданная в collect, подписывается на поток, то есть становится получателем значений, которые им генерируются.

Структура Flow

Каждый Flow делится на три части:

  • эмиттер — блок, в котором генерируются данные и, как правило, вызывается функция emit() для отправки значений;

  • промежуточные операторы — операторы, которые обрабатывают данные, проходящие через поток;

  • коллектор — терминальный оператор, который запускает сбор потока и получает итоговые значения.

Взаимоотношения между терминальным и промежуточными операторами выглядят примерно так:

Ленивые промежуточные операторы нуждаются в подбадривании
Ленивые промежуточные операторы нуждаются в подбадривании

Возвращаясь к нашему примеру с числами Фибоначчи, эмиттер — это блок внутри flow{}:

    var a = 0L
    var b = 1L
    emit(a)
    delay(100)
    emit(b)
    while (true) {
        val next = a + b
        emit(next)
        a = b
        b = next
        delay(100)
    }

Промежуточный оператор представлен вызовом функции take():

.take(10)

А о коллекторе вы уже догадались по названию.

Многие промежуточные операторы называются так же, как и методы коллекций: map, filter, take и т.д. Однако они адаптированы для работы с асинхронными потоками данных и используют возможности корутин.

Например, создадим поток углов в градусах, затем последовательно применим преобразования с помощью оператора map:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.math.*

fun main() = runBlocking {
    flowOf(0, 30, 45, 60, 90)
        .map { it.toDouble() } // преобразуем целые числа в Double
        .map { Math.toRadians(it) } // переводим градусы в радианы
        .map { sin(it) } // вычисляем синус угла
        .collect { println(it) } // выводим результат на экран
}

Когда оператор collect запускает выполнение потока в корутине, каждое число генерируется, последовательно проходит по всей цепочке преобразований и передается в лямбду, переданную в collect.

Как в случае и с коллекциями, подобные цепочки реализуют декларативную парадигму программирования: указано, что сделать, а не как сделать.

Пример

Предположим, у нас есть Android-приложение, в котором нам нужно отображать на главном экране актуальное количество записей из какой-то таблицы в БД. Актуальное — то есть при изменении таблицы это значение должно оперативно пересчитываться.

@Dao
interface WordDao {
  @Query("SELECT COUNT(id) FROM words")
  fun count(): Flow<Int>
}

С помощью Flow мы следим за таблицей words и автоматически эмиттим обновленный count при каждом изменении данных.

@Singleton
class WordRepository @Inject constructor(
  private val wordDao: WordDao
) {
  fun countWordsFlow(): Flow<Int> =
    wordDao.count().
      .distinctUntilChanged()
      .flowOn(Dispatchers.IO)
}

Метод countWordsFlow()позволяет подписаться на обновления количества слов в базе. Как только данные изменятся, в соответствующий Flow поступит новое значение — но только если оно реально изменилось (за счет вызова distinctUntilChanged). Все это выполняется асинхронно и не блокирует основной поток приложения. Поэтому нам не приходится каждый раз обращаться к базе данных вручную.

Итог

Flow — мощный инструмент реактивного программирования, позволяющий просто и эффективно обрабатывать асинхронные потоки данных, реагировать на их изменения в реальном времени, а также писать лаконичный, безопасный и читаемый код без необходимости отслеживать изменения вручную.

Больше теории и примеров — в моем курсе по корутинам.

Комментарии (7)


  1. rpc1
    29.06.2025 07:23

    Подскажите зачем нужен вызов delay(100) в fibonacciFlow

    И почему именно 100, а не 10 или 50 или 200? Получается какое-то магическое число неизвестно откуда взятое.


    1. AGalilov
      29.06.2025 07:23

      Похоже, delay нужен чтобы визуально код не сыпал на консоль сразу весь результат работы, а делал это с паузами, давая время заметить, что что-то внутри происходит. По шагам. Ну как-то так.

      @Ioanna Может быть, имеет смысл выдавать для наглядности информацию о каждом шаге из самого цикла while, тогда и delay можно убрать?


      1. Ioanna Автор
        29.06.2025 07:23

        Нет, вызов delay() здесь нужен потому, что это suspend-функция, без которой не будет асинхронности.


    1. Gizcer
      29.06.2025 07:23

      Просто задержка перед повтором операции. Если есть желание повесить поток, то можно убрать.

      В целом для холодного потока ограничение во флоу через delay нормальная практика.

      Если мы посмотрим на метод debounce во флоу, то он позволит пропускать изменения, только если они не менялись в течении определенного времени. Не всегда же обновлять UI , так можно и привиснуть.

      Так же в документации есть замечательные методы для соединения потоков данных (combine и zip), так же для переключения потоков данных flatMapLatest и другие. Возможности выделить из цепочки преобразований данные onEach.

      При желании из холодного потока можно сделать горячий через stateIn. (Каждый подписчик на холодный поток вызывает всю цепочку преобразований) Но если перед передачей к подписчикам сделать его горячим, то холодный будет только один до горячего, а дальше расходиться к каждому подписчику.

      В целом тема обширная и дает очень много возможностей.

      На данный момент лучший выбор для архитектуры UDF


    1. Ioanna Автор
      29.06.2025 07:23

      Я просто процитирую свой же учебный курс по корутинам:

      Указание модификатора suspend само по себе еще не превращает функцию в точку приостановки. Чтобы заслужить это гордое имя, функции придется вызвать одну из функций, которые действительно приостанавливают выполнение (например, delaywithContextyield и т. д.), или реализовать аналогичное поведение самостоятельно с помощью suspendCoroutine или suspendCancellableCoroutine.

      Как уже говорилось, функция delay() выступает как точка приостановки, где корутина засыпает, уступая управление другому коду, и где она затем просыпается для возобновления работы.

      Параметр delay() — число миллисекунд для задержки. Оно может быть и нулевым, и отрицательным. Оба случая просто сведут действие функции на нет.


      1. rpc1
        29.06.2025 07:23

        Ну вы так и не ответили на мой вопрос, откуда взялось число 200 в вашем примере. Как разработчик, я хочу получить преимущества асинхронной обработки, но задержки 200 мс на каждой итерации выглядит не очень. Почему не указать 2 мс? Как вычислить оптимальное значение?


        1. Ioanna Автор
          29.06.2025 07:23

          Для учебного примера число выбрано произвольно. В реальном коде на этом месте будет какая-то другая suspend-функция, возвращающая какие-то данные.