Дисклеймер: данная статья — это попытка разобраться в сути вопроса, в том числе совместно с читателями. Если вы заметили неточность/ошибку в проводимых экспериментах или у вас есть чем поделиться по данной теме, буду рад вашим советам и мыслям в комментариях.
Всем привет! На связи Александр Гирев, Mobile Lead в команде ценообразования портала для продавцов в Wildberries.
В последнее время всё чаще на собеседованиях мы встречаем кандидатов, которые пользуются «услугами» нейросетей или суфлёров. В целом это грустная ситуация, но и у неё нашлись некоторые плюсы. Так, если нейросеть подсказывает кандидату, то его ответ порой изобилует такими техническими подробностями, что у интерьвюеров появляется возможность углубить экспертизу прямо во время собеседования. Или наметить вопросы, которые можно изучить после.
Однажды на собеседовании мы обсуждали с кандидатом кусочек кода, связанный с Kotlin Flow. Кандидату нужно было найти причину бага. И при помощи нейросети или своих глубоких познаний наш визави стал говорить о синхронности и асинхронности emit и tryEmit, которые используются для передачи новой порции данных в SharedFlow.
К сути проблемы его размышления не относились, однако я получил сигнал, что здорово было бы поглубже разобраться, как это работает. Собственно, по этому поводу мы здесь и собрались: SharedFlow и его операторы для эмиссии данных, а также параметры, которые можно задать при создании MutableSharedFlow: replay, bufferCapacity, bufferOverflow.
Статья предполагает, что читатель уже знаком с концепцией Kotlin Coroutines и Kotlin Flow и имеет общее представление об их использовании в Android-приложении. Если это не так, то рекомендую перед дальнейшим чтением ознакомиться с соответствующими материалами. Например, с этими.
Абстракции для простоты общения
Давайте для простоты обсуждения будем использовать определённую абстракцию, приближенную к миру людей. Идеологически Kotlin Flow представляет собой прокачанную реализацию паттерна «Наблюдатель». Вкратце напомню: суть паттерна сводится к тому, чтобы служить источником некоторых данных и своевременно сообщать подписчикам, что для них появилась новая информация.
Данный сценарий очень похож на процесс распространения печатных изданий среди читателей, давайте попробуем использовать эту традиционную для таких случаев аналогию. Представим, что нам нужно написать программу, которая имитирует процесс издания и распространения электронных книг от издательства к читателям посредством интернет-сайта, в дальнейшем будем строить примеры и эксперименты вокруг этой абстракции.
Метод emit(): «Не торопитесь, я подожду»
Итак, сайт для электронных книг. Предположим, у нас есть серия книг, которые мы распространяем через сайт издательства, наших читателей уведомляем о новой части произведения через электронную почту. В данном случае будем использовать горячий источник данных, а именно MutableSharedFlow.
У нас есть один подписчик, который сразу получает новую часть книги после отправки, ведь электронная почта работает мгновенно. Для отправки событий будем использовать метод suspend fun emit(value: T). Мы будем публиковать по одной книге в неделю, для имитации периодичности применим функцию delay():
class Book(
val part: Int
)
private val bookService = MutableSharedFlow<Book>()
suspend fun subscribeToBookShop() {
bookService.collect { newBook ->
println("get book part №${newBook.part}")
}
}
suspend fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
bookService.emit(Book(i))
delay(ONE_WEEK)
}
}
// Вывод программы
// publish new book part №1
// get book part №1
// publish new book part №2
// get book part №2
// publish new book part №3
// get book part №3
Как мы видим, в текущей реализации читатель сразу после публикации получает новую часть книги.
А что если мы замедлим читателя? Например, он будет готов получить новую часть книги только после того, как поставит на сайте отметку «Прочитано» у прошлой книги? Предположим, что новая часть книги по-прежнему выходит раз в неделю, но читателю нужен 1 месяц, чтобы прочитать книгу. Код нашей программы будет выглядеть следующим образом:
private val bookService = MutableSharedFlow<Book>()
suspend fun subscribeToBookShop() {
bookService.collect { newBook ->
println("read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
suspend fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
emailService.emit(newBook)
delay(ONE_WEEK)
}
}
// Вывод программы
// publish new book part №1
// read book part №1
// publish new book part №2
// read book part №2
// publish new book part №3
// read book part №3
Кажется, что мы существенно изменили условия задачи, однако вывод программы не изменился. У нас получилось, что издательство ждёт, пока читатель дочитает предыдущую часть книги, а только потом публикует новую. Но ведь это не соответствует нашей бизнес-логике и здравому смыслу!
Объяснить происходящее можно посредством концепции backpressure. Вспомним, что Kotlin Flow — это герой из супервселенной корутин, а значит, здесь работает suspend-магия с прерыванием функций. Давайте вспомним аргументы функции MutableSharedFlow():

Пока что обратим внимание только на аргумент onBufferOverflow, который имеет значение по умолчанию BufferOverflow.SUSPEND. Трактовать этот момент можно следующим образом: если в буфере у источника не будет свободного места, а подписчик не будет готов принять новую порцию данных, то функция, ответственная за производство новых данных, будет приостановлена. Поскольку emit() — это suspend-функция, то так всё и произойдёт.
Для любителей заглянуть под капот — данная логика описана внутри функции emitSuspend(), обёрткой которой выступает emit().
Смотреть исходный код

Подобное поведение с приостановкой источника было бы идеальным, если бы мы в коде описывали процесс завтрака. Например, мама не предлагает новую ложку манной каши, пока ребёнок не справится с предыдущей. Мы же решаем задачу для книжного издательства, а книги — это при всём уважении не манная каша, нам такой подход не подойдёт.
Так давайте просто поменяем onBufferOverflow, может предложить находчивый читатель. Звучит логично, но если мы это сделаем, не изменив два других параметра (а мы пока этого делать не хотим, это не входит в наши бизнес-планы), то мы получим краш приложения и вот такую красивую ошибку в логах:

Из логов следует, что другие типы BufferOverflow можно использовать только с extraBufferCapacity > 0.
Что же делать? Давайте попробуем использовать второй метод для отправки данных — tryEmit.
Метод tryEmit(): «Отправь значение или упади с ошибкой»
Для начала посмотрим на сигнатуру функции tryEmit():

Это не suspend-функция, что уже обнадёживает. Правда, зачем-то присутствует возвращаемое boolean-значение и смущает префикс try. Но давайте пока не будем обращать внимание на такие мелочи, сосредоточимся на выполнении задачи.
Заменим emit на tryEmit в нашем коде и попробуем снова запустить программу.
private val bookService = MutableSharedFlow<Book>()
suspend fun subscribeToBookShop() {
bookService.collect { newBook ->
println("read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
// было
//emailService.emit(newBook)
// стало
emailService.tryEmit(newBook)
delay(ONE_WEEK)
}
}
// Вывод программы
// publish new book part №1
// publish new book part №2
// publish new book part №3
Кажется, мы сделали только хуже. Теперь наш читатель остался совсем без книг. Посмотрим, в чём причина. Как мы видели выше, метод tryEmit() возвращает Boolean. Из документации к методу и исходного кода следует, что метод возвращает false, если новое значение отправить не удалось. Кажется, это как раз наш случай, убедимся в этом посредством вывода возвращаемого значения:
...
fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i)
val isPublished = emailService.tryEmit(newBook)
println("№${newBook.part} isPublished $isPublished")
delay(ONE_WEEK)
}
)
// Вывод программы
// №1 isPublished false
// №2 isPublished false
// №3 isPublished false
Попробуем разобраться, в чем причина неудачи, для этого заглянем под капот tryEmit. Исходя из кода функции, возвращаемое значение будет false, при следующих параметрах у функции MutableSharedFlow():
bufferOverFlow == BufferOverflow.SUSPEND && replay == 0 || bufferCapacity == 0
Подробный код функции в файле SharedFlow.kt:
Внутри функции tryEmit() вызывается tryEmitLocked, в ней-то и описана вся магия.

Т.е нам нужно изменить replay или bufferCapacity на ненулевое положительное значение? Давайте проверим. Поочередно изменим указанные параметры на 1, остальной код оставим без изменений.
// вариант 1
private val bookService = MutableSharedFlow<Book>(
extraBufferCapacity = 1
)
// вариант 2
private val bookService = MutableSharedFlow<Book>(
replay = 1
)
suspend fun subscribeToBookShop() {
bookService.collect { newBook ->
println("read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
emailService.tryEmit(newBook)
delay(ONE_WEEK)
}
}
// Вывод программы
// publish new book part №1
// №1 isPublished true
// read book part №1
// publish new book part №2
// №2 isPublished true
// publish new book part №3
// №3 isPublished false
// read book part №2
Уже лучше, но наш читатель всё равно остался без последней части книги. Почему так произошло? Для ответа на вопрос нужно разобраться, за что отвечают входные параметры replay и bufferCapacity.
Параметр replay: распишитесь за последний свежий выпуск, пожалуйста
Параметр replay: Int означает, что именно столько последних значений, произошедших до момента подписки, получит каждый новый подписчик. К примеру, в понедельник мы выпустили первую часть книги, во вторник — вторую, в среду — третью. Читатель подписался только на получение книги в четверг. Вывод программы будет таким:
private val bookService = MutableSharedFlow<Book>(
replay = 1
)
fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
val isPublished = emailService.tryEmit(newBook)
println("№${newBook.part} isPublished $isPublished")
delay(ONE_DAY)
)
suspend fun subscribeToBookShop() {
delay(FOUR_DAYS)
bookService.collect { newBook ->
println("read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
// Вывод программы
// publish new book part №1
// №1 isPublished true
// publish new book part №2
// №2 isPublished true
// publish new book part №3
// №3 isPublished true
// read book part №3
Значение в буфере, обеспечивающем replay, будет каждый раз перезаписываться на новое: когда в четверг подписчик придёт за новой книгой, его будет ждать третья часть книги, а не первая. А если он вдруг заглянет во вторник, то успеет получить вторую часть:
...
suspend fun subscribeToBookShop() {
delay(2_DAYS)
bookService.collect { newBook ->
println("read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
// Вывод программы
// publish new book part №1
// №1 isPublished true
// publish new book part №2
// №2 isPublished true
// read book part №2
// publish new book part №3
// №3 isPublished true
// read book part №3
Наверное, вы уже догадались, что будет, если у нас заведётся второй читатель, при этом один придёт на сайт во вторник, а другой — в четверг:
private val bookService = MutableSharedFlow<Book>(
replay = 1
)
fun publishNewBook() {
for(i in 1..3) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
val isPublished = emailService.tryEmit(newBook)
println("№${newBook.part} isPublished $isPublished")
delay(ONE_DAY)
)
suspend fun subscribeFirstReader() {
delay(2_DAYS)
bookService.collect { newBook ->
println("Reader №1 read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
suspend fun subscribeSecondReader() {
delay(4_DAYS)
bookService.collect { newBook ->
println("Reader №2 read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
// Вывод программы
// publish new book part №1
// publish new book part №2
// Reader №1 read book part №2
// publish new book part №3
// Reader №2 read book part №3
// Reader №1 read book part №3
С replay, кажется, разобрались. Этот параметр позволяет нам как бы оставить последнее полученное значение для нового подписчика, чтобы он не простаивал без данных, в нашем случае, не скучал без книги. Теперь давайте разберёмся со следующим параметром — bufferCapacity.
BufferCapacity: я оставлю для вас пару книг, они будут ждать вас
Зачем же нужен bufferCapacity? Это число событий (в нашем случае, книг), которые получит подписчик помимо количества, указанного в replay. Здесь есть нюансы, но о них позже. Давайте доработаем наш пример, будем с понедельника по пятницу каждый день выпускать новую книгу, а читатель придёт к нам во вторник.
private val bookService = MutableSharedFlow<Book>(
replay = 1,
)
fun publishNewBook() {
for(i in 1..5) {
val newBook = Book(i).apply {
println("publish new book part $i")
}
emailService.tryEmit(newBook)
delay(ONE_DAY)
)
}
suspend fun subscribeFirstReader() {
delay(TWO_DAYS)
bookService.collect { newBook ->
println("Reader №1 read book part №${newBook.part}")
delay(ONE_MONTH)
}
}
// Вывод программы
// publish new book part №1
// publish new book part №2
// Reader №1 read book part №2
// publish new book part №3
// publish new book part №4
// publish new book part №5
// Reader №1 read book part №3
Мы успели выпустить 2 книги, пока к нам пришёл читатель. Потом он на месяц погрузился в чтение, а мы за это время успели выпустить ещё 3 книги. При этом придержали для читателя 1 экземпляр, т.е. replay автоматически сформировал буфер в 1 элемент даже без bufferCapacity.
Кстати на этот буфер будет распространяться власть параметра bufferOverflow. Так, если мы добавим в пример выше параметр BufferOverflow.DROP_OLDEST, то после прочтения второй книги читатель получит часть №5, а не часть №3, как до этого.
private val bookService = MutableSharedFlow<Book>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
...
// Вывод программы
// publish new book part №1
// publish new book part №2
// Reader №1 read book part №2
// publish new book part №3
// publish new book part №4
// publish new book part №5
// Reader №1 read book part №5
Что изменится, в выводе программы, если мы добавим bufferCapacity? Давайте проверим:
private val bookService = MutableSharedFlow<Book>(
replay = 1,
extraBufferCapacity = 1
)
...
// Вывод программы
// publish new book part №1
// publish new book part №2
// Reader №1 read book part №2
// publish new book part №3
// publish new book part №4
// publish new book part №5
// Reader №1 read book part №3
// Reader №1 read book part №4
Мы дали возможность нашему читателю ознакомиться с частью №4, которая попала в буфер. При этом читатель так и не добрался до части №1, хотя я ожидал из описания параметра в документации, что на старте подписчик получит 2 последних значения. Но видимо, слово buffer имелось именно как буфер после подписки, а не как дополнительное значение, которое повторится для нового подписчика на старте.

А будет ли отличаться вывод программы между replay = 1 и extraBufferCapacity = 1? По началу мне показалось, что разницы нет, но она есть, и существенная:
Вывод программы для extraBufferCapacity = 1
// publish new book part №1
// publish new book part №2
// publish new book part №3
// Reader №1 read book part №3
// publish new book part №4
// publish new book part №5
// Reader №1 read book part №4
Вывод программы для replay = 1
// publish new book part №1
// publish new book part №2
// Reader №1 read book part №2
// publish new book part №3
// publish new book part №4
// publish new book part №5
// Reader №1 read book part №3
В обоих случаях читателю достанутся 2 книги. Но с extraBufferCapacity = 1 читатель начнёт получать только книги, выпущенные после подписки, а в случае с replay = 1 ему будет доступна и 1 книга, выпущенная ранее.
Следовательно, extraBufferCapacity, как и следует из названия — это некий буфер для значений, в который попадают порции данных, полученные уже после момента подписки. И они станут доступны подписчику, когда он будет готов обрабатывать очередные данные. И да, документацию следует читать внимательнее.
Итоги экспериментов с книгами и небольшой бонус для самых любознательных
Давайте кратко обобщим всё, что нам удалось выяснить:
метод emit() — это suspend-функция, способна приостанавливать выполнение (читай: производство данных), пока подписчик не освободится от предыдущей эмиссии;
метод tryEmit() — обычная функция, имеет возвращаемое значение Boolean, которое символизирует успех предприятия по отправке сообщения. Если не задать для MutableSharedFlow() параметры replay или bufferCapacity, то данные до подписчика не доберутся. Использовать tryEmit() следует в случаях, когда приостановить производство данных не представляется возможным. Например, при обработке нажатий на элементы UI или ввод текста пользователем;
параметр replay: Int — именно такое количество данных получит подписчик, если до момента подписки уже были какие-либо эмиссии. Пример из реального мира: опаздавший студент заходит в аудиторию, преподаватель повторяет специально для него последний прозвучавший тезис лекции;
параметр bufferCapacity: Int — количество данных, которое будет «отложено» для подписчика после момента подписки, пока он будет разбираться с текущей порцией данных. На эмиссии данных до подписки параметр не распространяется, т.е. с количеством replay-элементов не суммируется. Пример из реального мира: клиент заходит в парикмахерскую, где есть только один мастер. Пока мастер стрижёт первого клиента, заходят ещё два. В нашей парикмахерской есть кофе-зона на 2 персоны, мы отправляем наших клиентов туда ожидать, пока освободится мастер. Т.е. bufferCapacity нашей парикмахерской будет 2.
Кажется, успели обсудить все запланированные моменты. Можно ли добавить еще что-то? Оказалось, что можно. При изучении исходников SharedFlow выяснилось, что метод tryEmit — это своего рода быстрый способ обновить значение, когда нет внешних препятствий. Авторы библиотеки корутин не скрывают этого, о чём свидетельствует комментарий прямо в коде класса SharedFlowImpl:

Следовательно, используя emit, мы так или иначе используем tryEmit, а в случае неудачи уже можем обращаться к механизму с прерыванием источника данных.
Можно ли сказать, что в таком случае можно просто использовать emit и радоваться жизни и стабильности приложения? Думаю что нет, ведь не всегда можно обновить источник данных из suspend-функции. Например, не будем же мы запускать корутину только ради того, чтобы прокинуть в поток данных клик от пользователя? Если у вас есть дополнительные мысли по поводу того, в каких случаях предпочтительна каждая функция, поделитесь в комментариях, пожалуйста.
Ниже я оставлю заготовку кода, который мы рассматривали в статье, если у вас возникнет желание тоже поэкспериментировать с потоками данных.
Код для экспериментов
private const val LOG_TAG = "TEST_FLOW_TAG"
private const val ONE_DAY = 100L
private const val TWO_DAYS = 200L
private const val ONE_MONTH = 3000L
class MainActivity : AppCompatActivity() {
private val bookService = MutableSharedFlow<Book>(replay = 1)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
launch {
publishNewBook()
}
launch {
subscribeFirstReader()
}
}
}
suspend fun publishNewBook() {
for (i in 1..5) {
val newBook = Book(i).apply {
Log.d(LOG_TAG,"publish new book part $i")
}
bookService.tryEmit(newBook)
delay(ONE_DAY)
}
}
suspend fun subscribeFirstReader() {
delay(TWO_DAYS)
bookService.collect { newBook ->
Log.d(LOG_TAG, "Reader №1 read book part №${newBook.part}")
delay(ONE_DAY)
}
}
}
class Book(
val part: Int
)P.S. Благодарю за рецензию и предварительное чтение статьи Алексея Грачёва, Абакара Магомедова и Арсения Шпилевого.
Кстати Абакар и Арсений пишут очень крутые статьи на Хабре:
статьи Абакара:
- Почему моё Android-приложение крашится?
- Погружаемся в недра Retrofit
- Корутины с точки зрения компилятора
статьи Арсения:
- Типобезопасная передача результатов между экранами в Compose с Jetpack Navigation
- Compose + Koin + Jetpack Navigation: что мы поняли за 2 года