Что это и зачем
К сожалению, простое последовательное комбинирование 2х и более асинхронных операций (загрузить конфиг, а потом картинку по адресу из конфига как пример) — это все, на что способны обычные Future-ы в качестве монад. Они не позволяют ни сохранять состояние, ни делать циклы из асинхронных операций, ни выдавать несколько (или бесконечно много) значений. Вот этими недостатками мы сейчас и займемся.
Давайте для определенности представим некий виджет. Он ждет конфиг, который обновляется с определенной периодичностью, загружает по адресу из конфига значение (например, температуру), и рисует на экране текущее значение, минимум, максимум, среднее и так далее. И все это делает в цикле, да еще и асинхронно.
Применив знания из этой статьи, мы сможем этот процесс описать примерно так:
// Про 'FState' - далее, пока же просто примем, что это - такая необычная Future
def getNextConfig: FState[Config]
def getTemperature(from: String): FState[Int]
case class State(temperature: Int, sumTemp: Long, count: Int) {
def isGood = ...
}
// Как видим, получается единый асинхронный алгоритм с состоянием,
// которое извне этого алгоритма не видно
val handle =
while_ ( _.isGood)
{ for (
config <- getNextConfig();
if (config.isDefined); // пустой конфиг - прекращаем выполнение
nextValue <- getTemperature(config().source); // грузим значение температуры
state <- gets[State]; // тут мы берем текущее состояние
newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
_ <- puts(newState); // .. и меняем его
_ <- runInUiThread { drawOnScreen(newState) }
) yield() }
Или вот так:
val configs: AsyncStream[Config] = ... // получаем откуда-то stream конфигов
def getTemperature(from: String): FState[Int]
case class State(temperature: Int, sumTemp: Long, count: Int)
// Получается то же самое, только вместо зависимости 'getNextConfig'
// мы, по сути, передаем сами данные - stream из конфигов
val handle =
foreach(configs) {
config => for (
nextValue <- getTemperature(config().source); // грузим значение температуры
state <- gets[State]; // тут мы берем текущее состояние
newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);
_ <- puts(newState); // .. и меняем его
_ <- runInUiThread { drawOnScreen(newState) }
) yield()
}
Всех, кто заинтересовался, прошу под кат.
Stateful asynchronous computations
Это такая Future, которая позволяет сохранять и менять состояние псевдоалгоритма внутри for-конструкции, те самые gets[State] и puts[State]. И то, что придает этому самому псевдоалгоритму некую корутинообразность.
Давайте рассмотрим вот такую вот интересную сущность:
// S - тип внешнего состояния, A - тип результата
case class FState[S, +A](func: S => Future[(A, S)]) {
def apply(s: S) = func(s)
}
Как видим, это простая обертка над функцией, принимающей текущее состояние и возвращающей Future на результат в паре с новым состоянием. Вид этой сущности получен простым комбинированием монад Future и State (а то, что мы сейчас делаем, называется монадным трансформером).
Давайте научим эту сущность быть монадой. В принципе, нам достаточно определить у этой сущности операции unit и flatMap (а еще map, которая выражается через первые две), но мы пойдем сразу тернистым путем scalaz, и получим бонусом целую алгебру операций, определенную в терминах вот этих двух.
class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
type F[X] = FState[S, X]
override def point[A](a: => A): F[A]
override def bind[A, B](m: F[A])(f: A => F[B]): F[B]
}
class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {
type F[X] = FState[S, X]
override def point[A](a: => A): F[A] = FState((s: S) => Future((a, s)))
override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =
FState((s: S) => m(s) flatMap { pair => f(pair._1)(pair._2) })
}
Например, мы только что бесплатно получили вот такую совершенно великолепную операцию:
// Цикл с монадным условием и монадным телом - красота!
def whileM_[A](p: F[Boolean], body: => F[A]): F[Unit]
А как же менять состояние внутри псевдоалгоритма? Мы можем заметить, что в комбинаторе bind оно не фигурирует. Пишем:
def gets[S](): FState[S, S]
def puts[S](news: S): FState[S, S]
// Как видим, чтобы нам получить текущее состояние, его достаточно
// просто "поднять" в значение.
def gets[S](): FState[S, S] = FState((s: S) => Future((s, s)))
// А изменить состояние еще проще
def puts[S](news: S): FState[S, S] = FState((_: S) => Future((news, news)))
Вот, в принципе, и все! Теперь мы можем написать что-то вроде:
implicit val m = new FStateMonad[Int] // состояние - обычный Int
// И эта жуть просто считает до 10...
val algo = for(
_ <- m.whileM_(gets[Int] map (_ < 10), for(
i <- gets[Int];
_ <- puts(i + 1)
) yield(()));
v1 <- gets[Int]
) yield (v1)
// algo(0)() should be ((10, 10))
Но мы можем написать себе столько синтаксического сахара, сколько захотим, и закончить примерно на этом:
implicit val m = new FStateMonad[Int] // состояние - обычный Int
val algo = for(
// почти обычный for, с условием, модификатором состояния и телом
_ <- m.forM_ ( _ < 10, _ + 1) {
// тут любой FState
};
v1 <- gets[Int]
) yield (v1)
// algo(0)() should be ((10, 10))
Асинхронные stream-ы
Это как раз такая Future-а, которая способна возвращать больше одного значения асинхронно. Во втором примере в начале статьи мы просто по этому stream-у итерируемся stateful-псевдоалгоритмом. Однако со stream-ом можно делать еще несколько разных интересностей, но давайте по порядку. Начнем с построения stream-а.
Что такое stream? Упрощенно — список с хвостом, вычисляемым лениво. Вследствие этого может быть бесконечным. Наш асинхронный AsyncStream будет выглядеть примерно так:
class Pair[A, B](fp: A, sp: => B) {
val first = fp
lazy val second = sp
}
object Pair {
def apply[A, B](first: A, second: => B) = new Pair[A, B](first, second)
}
// Конец stream-а обозначим как Future(null), это обязательно должна быть
// какая-то конечная Future-а, возвращающая какое-то пустое значение.
// Мы не будем возиться тут с Option-ом, а возьмем просто null.
case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]])
Итак, вроде бы все просто: асинхронно возвращаем значение и хвост, а ленивость хвоста должна уберечь нас от stackoverflow при пользовании бесконечными stream-ами.
Но stream-ы обладают одной очень притягательной особенностью: их можно сворачивать! Пишем:
// Почти стандартный foldLeft, только возвращает Future[B]
// - другого мы ничего вернуть не сможем
def foldLeft[B](start: B)(f: (B, A) => B): Future[B]
def foldLeft[B](start: B)(f: (B, A) => B): Future[B] = {
// Стандартно, определим функцию с аккумулятором, которой
// потом передадим data
def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
d flatMap (pair => {
if (pair eq null) acc
else impl(pair.second.data, acc map (b => f(b, pair.first)))
})
impl(data, Future(start))
}
Можно еще вот так свернуть:
def flatten : Future[List[A]]
def flatten : Future[List[A]] =
foldLeft[List[A]](Nil)((list, el) => el :: list) map (_.reverse)
Еще парочка полезных функций, в основном для работы с бесконечными stream-ами:
def takeWhile(p: A => Boolean): AsyncStream[A]
def take(n: Int): AsyncStream[A]
def takeWhile(p: A => Boolean): AsyncStream[A] =
new AsyncStream[A](data map (pair => {
if (pair eq null) null
else if (!p(pair.first)) null
else Pair(pair.first, pair.second.takeWhile(p))
}))
def take(n: Int): AsyncStream[A] =
if (n <= 0) nil
else new AsyncStream[A](data map (pair => {
if (pair eq null) null
else Pair(pair.first, pair.second.take(n - 1))
}))
Сворачивать-то мы научились, а вот строить такой stream пока что неудобно. Исправим это и напишем универсальный генератор stream-а:
// 'gen' и является функцией-генератором, она должна возвратить
// Future(null), если больше значений сгенерировать не получается
def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A]
def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] =
new AsyncStream[A](
gen(start) match {
case _: NoFuture => Future(null)
case future => future map (pair => { // Future[Pair[A, AsyncStream]]
if (pair eq null) null
else Pair(pair._1, genAsyncStream(pair._2)(gen))
})})
Кстати, с помощью foldLeft + genAsyncStream stream-ы можно копировать.
Stream-ы можно соединять:
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A]
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =
new AsyncStream[A](s1.data flatMap (pair => {
if (pair eq null) s2.data
else Future(Pair(pair.first, concat(pair.second, s2)))
}))
А еще асинхронный stream — это тоже монада:
class AsyncStreamMonad extends Monad[AsyncStream] {
override def point[A](a: => A): AsyncStream[A]
override def bind[A, B](
ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B]
}
class AsyncStreamMonad extends Monad[AsyncStream] {
override def point[A](a: => A): AsyncStream[A] = unit(a)
override def bind[A, B](
ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
new AsyncStream[B](ma.data flatMap (pair => {
if (pair eq null) Future(null)
else f(pair.first).data map (
pair2 => Pair(pair2.first, concat(pair2.second, bind(pair.second)(f))))
}))
}
В основном, построение асинхронного stream-а на этом можно закончить.
FState + AsyncStream = ?
На самом деле, они прекрасно уживаются. Давайте посмотрим на функцию-генератор в genAsyncStream, ничего не напоминает? Да это же FState!
А теперь давайте научимся итерироваться по stream-у:
def foreach[A, S]
(stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit]
// Сделаем это foldLeft-ом
def foreach[A, S]
(stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] =
FState(s => {
stream.foldLeft(Future(s))(
(futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2))).flatten.map( ((), _) )
})
То есть мы совершенно спокойно можем написать генератор, который yield-ит значения в асинхронный stream, потом этот stream куда-то передать, и по нему уже итерироваться другим алгоритмом — довольно удобно, имхо, когда надо передать данные из одного модуля программы в другой и не хочется внедрять зависимости.
Итог
Как итог, мы получили пару сущностей, расширяющие возможности обычных Future-ов. Надеюсь, было интересно.
Спасибо за внимание)
Код здесь.
Комментарии (12)
mityukov
11.04.2016 16:27>Future — это сущность, описывающая результат некоторых вычислений, который мы получим не сразу, но в будущем.
По-моему, такая штука называется Promise. Хотя, возможно этот шаблон принято называть по-разному в разных ЯП.iboltaev
11.04.2016 16:36+2да нет, это Future. Очень грубо разница между Future и Promise: в Promise пишем, из Future читаем. Словосочетание «получим результат» больше на чтение намекает.
grossws
11.04.2016 18:56Местами выглядит как переизобретение reactive streams и akka streams, да)
PHmaster
12.04.2016 02:14Или даже reactivex.io (там есть не только RxScala/RxJava, но и JS, и .NET, и Swift).
Saffron
12.04.2016 10:45Совершенно не похоже. Указанные *-streams, например, имеют контроль back-pressure, и построены не на функциональном, а на реактивном принципе. Для полноты сравнения ещё можно посмотреть императивный подход: github.com/scala/async
iboltaev
12.04.2016 11:13насчет back-pressure, буферизации и прочих нужных вещей не заморачивался, тут чисто построение показано и минимальный api. Предполагается, что тот, кому это нужно, реализует все это в функции-генераторе gen:
def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A]
iboltaev
12.04.2016 11:08на reactive streams с первого взгляда вроде не очень похоже, akka streams — вроде да)
Только тут не только стримы, еще и stateful async computation до кучи, ну и в связке это все показаноgrossws
12.04.2016 12:03В akka-streams по аналогии есть materealized values, которые позволяют делать хранить дополнительный state и обмениваться им.
mafia8
12.04.2016 12:46В начале был пример про виджет с температурой, потом пошёл код. Этот код относится к этому примеру или к другой ситуации? К какой? И какую задачу выполняют эти сущности (Future)? Что было бы без этих сущностей?
iboltaev
12.04.2016 13:23+21. Да, относится. Код показывает, как с помощью простой Future построить эти 2 сущности, с помощью которых был написан пример про виджет.
2. Future выполняют задачу отложенного предоставления результата вычислений. FState позволяет писать асинхронные псевдоалгоритмы с состоянием. AsyncStream позволяет формировать асинхронные последовательности значений.
3. Без Future будет плохо) Без вот этих конкретных FState и AsyncStream мы как-то живем и сейчас
ImLiar
А почему не использовать StateT из scalaz или cats?
iboltaev
можно и их. Пока так.