Всем привет.

Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.

Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.

Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.

Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:

  • Более простая и garbage-free внутренняя реализация.
  • Необходимость наличия элемента по-умолчанию. Null также возможен.
  • Разделение на read-only и read-write интерфейсы.
  • Сравнение элементов через equality вместо сравнения ссылок.

Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Получаем данные:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:



Заглянем теперь внутрь и посмотрим, как это реализовано.

К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.

И так, основным классом реализации является класс StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state — атомик ссылка для хранения нашего состояния.

sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots

Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:

private val _state = atomic<Any?>(null)

Плюс методы для смены состояний слота.

Каждый слот может быть в одном из состояний:

null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.

Рассмотрим, что происходит при установке нового значения:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.

Можно выделить несколько шагов:

  1. Установка нового значения.
  2. Установка маркера sequence — в нечётное значение обозначающее, что мы уже в процессе апдейта.
  3. makePending() — установка всех состояний слотов(т.е. всех подключений) в PENDING — скоро отправим новое значение.
  4. Цикл проверки sequence == curSequence, что все задачи выполнены и установка sequence чётным числом.

Что же происходит в методе collect:

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

Основная задача — отправить начальное значение по умолчанию и ждать новые значения:

  1. Создаём или переиспользуем слот нового подключения.
  2. Проверяем состояние на null или на смену состояния. Эмиттим новое значение.
  3. Проверяем, есть ли слоты готовые для обновления (PENDING состояние) и если нет — подвешиваем слот в ожидании новых значений.

В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.

Спасибо.