Лето — лучшее время для сплава. Поэтому, если вы пока не в отпуске, давайте устроим короткий сплав по асинхронным потокам данных.
Переход из привычной императивной парадигмы иногда бывает сложным, поэтому сначала поговорим о терминах.
Термины
Kotlin Flow — это API для работы с асинхронными потоками данных, построенное поверх корутин. Kotlin Flow реализует парадигму реактивного программирования.
Реактивное программирование — это парадигма, в которой все данные и события рассматриваются не как единичные значения, а как асинхронные потоки, протекающие через приложение. Каждый компонент «подписан» на поток входных событий и автоматически обновляет своё состояние в ответ на любые изменения.
Реактивное программирование можно рассматривать как синтез трех подходов:

Как и в декларативной парадигме, в реактивном программировании описывается «что», а не «как». Вместо явных циклов вы просто собираете цепочку операторов (map, filter, flatMap и т. д.), задавая желаемую трансформацию потока.
Как и в программировании потоков данных, в реактивном программировании каждый поток и каждый оператор образуют «узлы» и «ребра» графа, по которому данные протекают от источников к потребителям.
Как и в событийно-ориентированной парадигме, в реактивном программировании потоки часто реализуют модель публикации-подписки.
Простейший пример реактивной парадигмы мы все видели в Excel, где при обновлении значения ячейки автоматически пересчитываются все остальные ячейки, которые ссылаются на нее через формулы.
Холодные и горячие потоки
Поток данных (stream) — это последовательность значений, которые поступают со временем, зачастую асинхронно.
Потоки данных делятся на горячие и холодные. Это различие описывает, когда именно начинается генерация данных и что происходит при подписке на поток.
Горячие потоки генерируют данные независимо от наличия подписчиков, а холодные потоки ленивы и начинают генерировать их только по запросу.

Слева — «горячий поток» в виде извергающегося вулкана, непрерывно выбрасывающего данные (желтые листки), а справа — «холодный поток» в виде ледяного крана, из которого данные капают только при повороте вентиля.
«Холодные потоки, горячие каналы»
Так называется статья Романа Елизарова, опубликованная в 2019 году. Дело в том, что в Kotlin Flow есть и горячие (SharedFlow и StateFlow), и холодные (Flow) потоки данных. Кроме того в Kotlin есть еще одна реализация горячих потоков — Channels.
Чтобы не запутаться, давайте посмотрим на схему:

Объект Flow в Kotlin — это реализация холодного потока данных, который производит значения асинхронно и лениво. Таким образом, он берет лучшее и от корутин, и от последовательностей: значения вычисляются «на лету» и при этом используются корутины для асинхронной обработки данных.
За информацией о каналах отсылаю вас к своему курсу по корутинам на Степике. В данной статье мы будем говорить только о Flow и его вариациях.
Создание Flow
Хрестоматийный пример для объяснения работы Flow — вычисление чисел Фибоначчи:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun fibonacciFlow(): Flow<Long> = flow {
var a = 0L
var b = 1L
emit(a)
delay(100)
emit(b)
while (true) {
val next = a + b
emit(next)
a = b
b = next
delay(100)
}
}
fun main() = runBlocking {
fibonacciFlow().take(10).collect { println(it) }
}
Здесь мы создаем поток данных с помощью билдера flow
, получающего на вход лямбду с приемником. Как известно, если последний параметр функции в Kotlin сам является функцией, то ее можно в виде лямбда-выражения вынести за круглые скобки (trailing lambda). Поэтому вместо
flow() { ... }
лямбда-аргумент можно записать внутри фигурных скобок, следующих за именем функции flow
, что придает синтаксису выразительность.
Функция emit()
отправляет очередное значение в поток, и оно плывет к подписчику — тому, кто запросил это значение. Так как emit()
— это suspend-функция, она приостанавливает выполнение корутины после передачи каждого значения до следующего запроса.
Но весь этот механизм лежит мертвым грузом до вызова терминальной операции, которая запускает выполнение потока в корутине и получает результаты его работы. В данном случае эту роль играет функция collect
, вызов которой инициирует сбор потока, т.е. генерацию значений и получение результатов. При этом лямбда-функция, переданная в collect
, подписывается на поток, то есть становится получателем значений, которые им генерируются.
Структура Flow
Каждый Flow делится на три части:
эмиттер — блок, в котором генерируются данные и, как правило, вызывается функция
emit()
для отправки значений;промежуточные операторы — операторы, которые обрабатывают данные, проходящие через поток;
коллектор — терминальный оператор, который запускает сбор потока и получает итоговые значения.
Взаимоотношения между терминальным и промежуточными операторами выглядят примерно так:

Возвращаясь к нашему примеру с числами Фибоначчи, эмиттер — это блок внутри flow{}:
var a = 0L
var b = 1L
emit(a)
delay(100)
emit(b)
while (true) {
val next = a + b
emit(next)
a = b
b = next
delay(100)
}
Промежуточный оператор представлен вызовом функции take()
:
.take(10)
А о коллекторе вы уже догадались по названию.
Многие промежуточные операторы называются так же, как и методы коллекций: map
, filter
, take
и т.д. Однако они адаптированы для работы с асинхронными потоками данных и используют возможности корутин.
Например, создадим поток углов в градусах, затем последовательно применим преобразования с помощью оператора map
:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.math.*
fun main() = runBlocking {
flowOf(0, 30, 45, 60, 90)
.map { it.toDouble() } // преобразуем целые числа в Double
.map { Math.toRadians(it) } // переводим градусы в радианы
.map { sin(it) } // вычисляем синус угла
.collect { println(it) } // выводим результат на экран
}
Когда оператор collect
запускает выполнение потока в корутине, каждое число генерируется, последовательно проходит по всей цепочке преобразований и передается в лямбду, переданную в collect
.
Как в случае и с коллекциями, подобные цепочки реализуют декларативную парадигму программирования: указано, что сделать, а не как сделать.
Пример
Предположим, у нас есть Android-приложение, в котором нам нужно отображать на главном экране актуальное количество записей из какой-то таблицы в БД. Актуальное — то есть при изменении таблицы это значение должно оперативно пересчитываться.
@Dao
interface WordDao {
@Query("SELECT COUNT(id) FROM words")
fun count(): Flow<Int>
}
С помощью Flow мы следим за таблицей words
и автоматически эмиттим обновленный count
при каждом изменении данных.
@Singleton
class WordRepository @Inject constructor(
private val wordDao: WordDao
) {
fun countWordsFlow(): Flow<Int> =
wordDao.count().
.distinctUntilChanged()
.flowOn(Dispatchers.IO)
}
Метод countWordsFlow()
позволяет подписаться на обновления количества слов в базе. Как только данные изменятся, в соответствующий Flow поступит новое значение — но только если оно реально изменилось (за счет вызова distinctUntilChanged
). Все это выполняется асинхронно и не блокирует основной поток приложения. Поэтому нам не приходится каждый раз обращаться к базе данных вручную.
Итог
Flow — мощный инструмент реактивного программирования, позволяющий просто и эффективно обрабатывать асинхронные потоки данных, реагировать на их изменения в реальном времени, а также писать лаконичный, безопасный и читаемый код без необходимости отслеживать изменения вручную.
Больше теории и примеров — в моем курсе по корутинам.
rpc1
Подскажите зачем нужен вызов
delay(100) в fibonacciFlow
И почему именно 100, а не 10 или 50 или 200? Получается какое-то магическое число неизвестно откуда взятое.
AGalilov
Похоже, delay нужен чтобы визуально код не сыпал на консоль сразу весь результат работы, а делал это с паузами, давая время заметить, что что-то внутри происходит. По шагам. Ну как-то так.
@Ioanna Может быть, имеет смысл выдавать для наглядности информацию о каждом шаге из самого цикла
while
, тогда и delay можно убрать?Ioanna Автор
Нет, вызов delay() здесь нужен потому, что это suspend-функция, без которой не будет асинхронности.
Gizcer
Просто задержка перед повтором операции. Если есть желание повесить поток, то можно убрать.
В целом для холодного потока ограничение во флоу через delay нормальная практика.
Если мы посмотрим на метод debounce во флоу, то он позволит пропускать изменения, только если они не менялись в течении определенного времени. Не всегда же обновлять UI , так можно и привиснуть.
Так же в документации есть замечательные методы для соединения потоков данных (combine и zip), так же для переключения потоков данных flatMapLatest и другие. Возможности выделить из цепочки преобразований данные onEach.
При желании из холодного потока можно сделать горячий через stateIn. (Каждый подписчик на холодный поток вызывает всю цепочку преобразований) Но если перед передачей к подписчикам сделать его горячим, то холодный будет только один до горячего, а дальше расходиться к каждому подписчику.
В целом тема обширная и дает очень много возможностей.
На данный момент лучший выбор для архитектуры UDF
Ioanna Автор
Я просто процитирую свой же учебный курс по корутинам:
rpc1
Ну вы так и не ответили на мой вопрос, откуда взялось число 200 в вашем примере. Как разработчик, я хочу получить преимущества асинхронной обработки, но задержки 200 мс на каждой итерации выглядит не очень. Почему не указать 2 мс? Как вычислить оптимальное значение?
Ioanna Автор
Для учебного примера число выбрано произвольно. В реальном коде на этом месте будет какая-то другая suspend-функция, возвращающая какие-то данные.