Всем привет.
Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.
Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.
Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:
Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.
Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:
Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:
Получаем данные:
И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:
Заглянем теперь внутрь и посмотрим, как это реализовано.
К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.
И так, основным классом реализации является класс StateFlowImpl:
_state — атомик ссылка для хранения нашего состояния.
sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots
Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:
Плюс методы для смены состояний слота.
Каждый слот может быть в одном из состояний:
null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.
Рассмотрим, что происходит при установке нового значения:
Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.
Можно выделить несколько шагов:
Что же происходит в методе collect:
Основная задача — отправить начальное значение по умолчанию и ждать новые значения:
В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.
Спасибо.
Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.
Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.
Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:
public interface StateFlow<out T> : Flow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
/**
* The current value of this state flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
*/
public override var value: T
}
Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.
Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:
- Более простая и garbage-free внутренняя реализация.
- Необходимость наличия элемента по-умолчанию. Null также возможен.
- Разделение на read-only и read-write интерфейсы.
- Сравнение элементов через equality вместо сравнения ссылок.
Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:
class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Получаем данные:
lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:
Заглянем теперь внутрь и посмотрим, как это реализовано.
К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.
И так, основным классом реализации является класс StateFlowImpl:
private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue) // T | NULL
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0 // number of allocated (!free) slots
private var nextIndex = 0 // oracle for the next free slot index
. . .
}
_state — атомик ссылка для хранения нашего состояния.
sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots
Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:
private val _state = atomic<Any?>(null)
Плюс методы для смены состояний слота.
Каждый слот может быть в одном из состояний:
null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.
Рассмотрим, что происходит при установке нового значения:
public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return // Don't do anything if value is not changing
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
curSequence++ // make it odd
sequence = curSequence
} else {
// update is already in process, notify it, and return
sequence = curSequence + 2 // change sequence to notify, keep it odd
return
}
curSlots = slots // read current reference to collectors under lock
}
/*
Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
Loop until we're done firing all the changes. This is sort of simple flat combining that
ensures sequential firing of concurrent updates and avoids the storm of collector resumes
when updates happen concurrently from many threads.
*/
while (true) {
// Benign race on element read from array
for (col in curSlots) {
col?.makePending()
}
// check if the value was updated again while we were updating the old one
synchronized(this) {
if (sequence == curSequence) { // nothing changed, we are done
sequence = curSequence + 1 // make sequence even again
return // done
}
// reread everything for the next loop under the lock
curSequence = sequence
curSlots = slots
}
}
}
Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.
Можно выделить несколько шагов:
- Установка нового значения.
- Установка маркера sequence — в нечётное значение обозначающее, что мы уже в процессе апдейта.
- makePending() — установка всех состояний слотов(т.е. всех подключений) в PENDING — скоро отправим новое значение.
- Цикл проверки sequence == curSequence, что все задачи выполнены и установка sequence чётным числом.
Что же происходит в методе collect:
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
try {
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
val newState = _state.value
// Conflate value emissions using equality
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
if (!slot.takePending()) { // try fast-path without suspending first
slot.awaitPending() // only suspend for new values when needed
}
}
} finally {
freeSlot(slot)
}
}
Основная задача — отправить начальное значение по умолчанию и ждать новые значения:
- Создаём или переиспользуем слот нового подключения.
- Проверяем состояние на null или на смену состояния. Эмиттим новое значение.
- Проверяем, есть ли слоты готовые для обновления (PENDING состояние) и если нет — подвешиваем слот в ожидании новых значений.
В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.
Спасибо.
gabin8
вопрос — как запустить StateFlow, чтоб поведение было аналогично LiveData c привязкой к lifecycle?
Например, чтоб результат приходил только в промежутке между onStart/onStop, а если приходит в бекграунде, то постился автоматически при переходе в фореграунд?
agent10 Автор
Если кратко, то из коробки никак:)
Сам по себе StateFlow никак не связан с Андроид и не знает о жизненных циклах приложения.
Единственное, что может работать — если collect запускать на скоупе либо lifecycleScope либо на скоупе ViewModel. Тогда "отмена" подписки будет происходить сама на onDestroy/onCleared.
А чтобы сделать аналогично LiveData — только самим ручками, либо возможно Гугл сделает что-то с этим сам.
anegin
Так LiveData сам по себе тоже ничего не знает о сменах состояния lifecycle, пока ему явно не передашь LifecycleOwner, подписавшись на него вызовом метода observe().
Похожая (но немного перевернутая) ситуация и со StateFlow — сам StateFlow ничего не знает о lifecycle, но подписавшись на него вызовом collect() в рамках «нужного» coroutine-контекста мы обеспечим его правильную работу с lifecycle
agent10 Автор
Не совсем так.
StateFlow будет "знать" только о create/destroy. Загляните, например, внутрь реализации lifecycleScope.
LiveData же умеет не оповещать если в фоне..StateFlow такое сам не сможет, скойуп не закэнселится в этом случае.
anegin
Корутины тоже не стоят на месте) Вот так скоуп будет работать только в started-состоянии. Есть еще whenCreated{} и whenResumed{}
И да, StateFlow без coroutine-контекста не знает об изменениях состояния, так же как и LiveData без LifecycleOwner
agent10 Автор
Есть такое, но тут главное не путаться с "будет работать только в started-состоянии"
Лучше сказать, что стартанёт в started(created/resumed) состоянии, но cancel будет только в destroy.
anegin
Не только стартанет. С whenStarted корутина переходит в suspended-состояние, когда lifecycle уходит в onStop, и (State)Flow.collect() перестает получать изменения. Т.е. получаем поведение аналогичное LiveData
agent10 Автор
Вы правы, давно не заглядывал в исходник Lifecycle.
Только там хитрее. Там используется специальный PausingDispatcher через который проходят все события и они либо выполняются, либо если состояние lifecycle иное, то просто стопается вся очередь и копится, а потом отдаётся.
Думал, там ещё тоже самое что и с ViewModel. Гляну и её, может тоже наконец-то что-то изменилось и в ней..
gabin8
Спасибо. Т.е получается что данный код более гибкий:
Т.к. мы можем наблюдать значения только на скоупе onResume-onPause, или onCreate-onDestroy. А LiveData такого не умеет, так как по умолчанию она умеет слушать только в onStart-onStop.
P.S. ещё заметил интересную особенность StateFlow — когда постишь одно и то же значение несколько раз, то collect будет вызван только 1 раз
anegin
да, теперь не сработает костыль с liveData.value = liveData.value, чтобы дернуть подписчиков)
romansl
Если вам надо так делать, то скорее всего в коде что-то не так. Скорее всего нужен не стейт, а отдельная команда для обновления чего-то.
anegin
Я так и написал — костыль) С другой стороны, если вместо стейта использовать команду, то при пересоздании UI мы не получим предыдущий стейт
agent10 Автор
Да, там сравнивается через equality.