Последнее время меня часто спрашивают о паттерне "fire-and-forget": как его применить в Cats Effect и какие потенциальные проблемы могут возникнуть. Поэтому я решил написать небольшой пост. Надеюсь, вам понравится!
Подробнее о Cats Effect и конкурентности читайте в моей книге Essential Effects.
Что значит «выстрелить и забыть»?
Чтобы понять принцип «выстрелить и забыть» (fire-and-forget), давайте сначала посмотрим на простой синхронный вызов метода или функции. При синхронной операции мы вызываем функцию и ожидаем, когда она вернет значение. Вычисления продолжаем только после получения результата.
def doSomething(i: Int): String = {
println(s"[${Thread.currentThread.getName}] doSomething($i)")
s"$i is a very nice number"
}
def doSomethingElse(): Int = {
println(s"[${Thread.currentThread.getName}] doSomethingElse()")
12
}
val doBoth = {
val result = doSomething(12)
println(s"[${Thread.currentThread.getName}] doSomething(12) produced: $result")
doSomethingElse
}
Обратите внимание: все println
выполняются в одном потоке.
Теперь давайте разбираться с «выстрелить и забыть». Что значит «выстрелить»? Выстрелить — это запустить вычисление без ожидания результата, т.е. асинхронно. (Название «выстрелить и забыть» пришло от военных — запуск ракеты.)
Какая должна быть сигнатура типа для такой асинхронной операции? Она требует входные данные, и сразу же возвращает управление в точку вызова. Но здесь мы не получаем «фактический» результат вычисления — только сообщаем: «Вычисления запущены, вот вам некое значение, которое позволит получить конечный результат, когда он будет вычислен». Смоделируем это в виде трейта с методом fire, который возвращает «конечный результат» EventualResult
(в виде функции):
trait Asynchronous {
type EventualResult[A] = () => A
def fire[A](run: => A): EventualResult[A]
}
Этот интерфейс можно реализовать, используя тип Future
:
object Asynchronous {
import scala.concurrent._
import scala.concurrent.duration._
val global =
new Asynchronous {
implicit val ec = scala.concurrent.ExecutionContext.global
def fire[A](run: => A): EventualResult[A] = {
val res = Promise[A].completeWith(Future(run)) // start the computation
() => Await.result(res.future, Duration.Inf) // wait on demand
}
}
}
Затем преобразуем наш синхронный код для использования интерфейса Asynchronous
:
val doBothAsync = {
val await = Asynchronous.global.fire(doSomething(12))
val result = await()
println(s"[${Thread.currentThread.getName}] doSomething(12) produced: $result")
doSomethingElse
}
Обратите внимание, что println
в doSomething
и doSomethingElse
выполняются в разных потоках.
Теперь нам нужно "забыть" (forget). То есть после того, как мы начали вычисление (fire), мы ничего не делаем для обработки конечного результата. Мы "забываем" (forget) его:
trait Asynchronous {
type EventualResult[A] = () => A
def fire[A](run: => A): EventualResult[A]
def fireAndForget[A](run: => A): Unit =
fire(run) // игнорируем результат
}
object Asynchronous {
import scala.concurrent.
import scala.concurrent.duration.
val global =
new Asynchronous {
implicit val ec = scala.concurrent.ExecutionContext.global
def fire[A](run: => A): EventualResult[A] = {
val res = PromiseA.completeWith(Future(run)) // старт вычисления
() => Await.result(res.future, Duration.Inf) // ожидание
}
}
}
Метод fireAndForget
вызывает метод fire
, запускающий асинхронное вычисление, но игнорирующий возвращаемый EventualResult
. То есть мы вообще не ждем никакого результата.
Давайте обновим наш пример:
val doBothFireAndForget = {
Asynchronous.global.fireAndForget(doSomething(12))
doSomethingElse
}
Обратите внимание, что здесь такое же асинхронное выполнение, как и в примере с "только fire", но мы не выводим никаких промежуточных результатов, потому что решили их не ждать.
Используем Cats Effect
Для реализации паттерна "выстрелить и забыть" в Cats Effect необходимо выполнить следующее:
Определить синхронное вычисление как эффект.
Запустить выполнение эффекта асинхронно ("выстрелить").
Игнорировать возвращаемое значение ("забыть").
Давайте сделаем это:
import cats.effect._
import cats.implicits._
class Something extends IOApp {
def run(args: List[String]): IO[ExitCode] =
doBoth.as(ExitCode.Success)
def doSomething(i: Int): IO[String] = // <1>
for {
_ <- IO(println(s"[${Thread.currentThread.getName}] doSomething($i)"))
} yield s"$i is a very nice number"
def doSomethingElse(): IO[Int] =
for {
_ <- IO(println(s"[${Thread.currentThread.getName}] doSomethingElse()"))
} yield 12
val doBoth: IO[Int] =
for {
_ <- doSomething(12).start // <2> <3>
i <- doSomethingElse
} yield i
}
new Something().main(Array.empty)
Мы изменили тип возвращаемого значения со
String
наIO[String]
, чтобы определить (синхронный) эффект.И запустили эффект IO асинхронно с помощью start. Он возвращает тип
IO[Fiber[IO, String]]
(Cats Effect 3 — IO[FiberIO[String]]
), который позволяет управлять асинхронным выполнением и получением конечного результата.Мы также игнорируем
Fiber
, созданныйstart
, "анонимизируя" его имя как .
Безопасно ли «забыть»?
Когда мы "стреляли и забывали" с помощью Cats Effect, мы намеренно не отслеживали асинхронную операцию. Но безопасно ли это?
Чтобы ответить на этот вопрос, давайте вспомним, какие операции есть у файбера: можно дождаться его завершения (с помощью join
) или отменить (с помощью cancel
). Поскольку мы ожидаем асинхронного выполнения, то join
нам не нужен, и мы поговорим о cancel
. Потребуется ли нам когда-нибудь отменять вычисление, которое мы "запустили и забыли"?
Несомненно, нам это пригодится! Задача может быть очень долгоживущей. Например, цикл обработки событий. Здесь вы можете подумать: "О, этот эффект выполняется вечно, его не нужно отменять", поэтому просто "выстрелим и забудем". Но отмена может потребоваться! Асинхронный эффект существует в некотором контексте и важно убедиться, что он отменяется при отмене его "родительского" контекста:
val businessLogic =
for {
_ <- longLivedEffect.start
_ <- doSomeOtherStuff
} yield ()
В этом примере после завершения businessLogic
, возможно, стоит отменить и longLivedEffect
, но подход "выстрелить и забыть" исключает такую возможность. Нам нужно как-то связать время жизни этого долгоживущего эффекта с окружающей бизнес-логикой. Один из вариантов — использовать guarantee
:
val businessLogic =
for {
fiber <- longLivedEffect.start
_ <- doSomeOtherStuff.guarantee(fiber.cancel) // <1>
} yield ()
Отмена файбера при успешном завершении дополнительной бизнес-логики, а также при ошибке или отмене.
Но использование guarantee не масштабируется на несколько эффектов. Например, мы хотим, чтобы файбер отменялся, если отменяется какой-либо эффект внутри businessLogic
. В такой ситуации мы можем использовать Resource. Resource — структура, которая управляет аллокацией и деаллокацией ресурсов:
val longLived: Resource[IO, IO[Unit]] =
longLivedEffect.background // <1>
val businessLogic =
longLived.use { join => // <2>
doSomeOtherStuff
}
Вместо
start
мы используем методbackground
, возвращающийResource
, который управляет асинхронным файбером. Управляемое состояние само по себе является действиемIO[Unit]
, которое позволяет вам при необходимости выполнить join к нижележащему файберу.Используя
Resource
черезuse
, мы получаем доступ к управляемому состоянию, где можем выполнять другие эффекты. Он определяет статическую область, в которой доступно состояние, при этом за пределами этой области гарантируется корректное получение и освобождение состояния. В случае ресурса, возвращаемогоbackground
, эффект получения состояния — это "запуск файбера", а эффект освобождения — "отмена файбера".
Резюме
Паттерн «выстрелить и забыть» (fire-and-forget) состоит из трех частей:
Определение синхронного эффекта.
Асинхронный запуск эффекта.
Контроль асинхронного выполнения эффекта: ожидание результата или отмена.
Вы можете смело запускать и забывать короткоживущие асинхронные эффекты! Но помните, что выполнение всегда происходит в каком-то контексте, и у вас есть такие методы, как background
, для контроля асинхронного жизненного цикла.
Материал подготовлен в рамках курса «Scala-разработчик».
Всех желающих приглашаем на demo-занятие «Scala и парсер-комбинаторы». На занятии мы:
— Познакомимся с парсер-комбинаторами на Scala;
— Будем парсить описание REST API написанное с помощью markdown.
>> РЕГИСТРАЦИЯ