Последнее время меня часто спрашивают о паттерне "fire-and-forget": как его применить в Cats Effect и какие потенциальные проблемы могут возникнуть. Поэтому я решил написать небольшой пост. Надеюсь, вам понравится!

Подробнее о Cats Effect и конкурентности читайте в моей книге Essential Effects.

Что значит «выстрелить и забыть»?

Чтобы понять принцип «выстрелить и забыть» (fire-and-forget), давайте сначала посмотрим на простой синхронный вызов метода или функции. При синхронной операции мы вызываем функцию и ожидаем, когда она вернет значение. Вычисления продолжаем только после получения результата.

def doSomething(i: Int): String = {
  println(s"[${Thread.currentThread.getName}] doSomething($i)")
  s"$i is a very nice number"
}

def doSomethingElse(): Int = {
  println(s"[${Thread.currentThread.getName}] doSomethingElse()")
  12
}

val doBoth = {
  val result = doSomething(12)
  println(s"[${Thread.currentThread.getName}] doSomething(12) produced: $result")
  doSomethingElse
}

Обратите внимание: все println выполняются в одном потоке.

Теперь давайте разбираться с «выстрелить и забыть». Что значит «выстрелить»? Выстрелить — это запустить вычисление без ожидания результата, т.е. асинхронно. (Название «выстрелить и забыть» пришло от военных — запуск ракеты.)

Какая должна быть сигнатура типа для такой асинхронной операции? Она требует входные данные, и сразу же возвращает управление в точку вызова. Но здесь мы не получаем «фактический» результат вычисления — только сообщаем: «Вычисления запущены, вот вам некое значение, которое позволит получить конечный результат, когда он будет вычислен». Смоделируем это в виде трейта с методом fire, который возвращает «конечный результат» EventualResult (в виде функции):

trait Asynchronous {
  type EventualResult[A] = () => A

  def fire[A](run: => A): EventualResult[A]
}

Этот интерфейс можно реализовать, используя тип Future:

object Asynchronous {
  import scala.concurrent._
  import scala.concurrent.duration._

  val global = 
    new Asynchronous {
      implicit val ec = scala.concurrent.ExecutionContext.global

      def fire[A](run: => A): EventualResult[A] = {
        val res = Promise[A].completeWith(Future(run)) // start the computation
        () => Await.result(res.future, Duration.Inf)   // wait on demand
      }
    }
}

Затем преобразуем наш синхронный код для использования интерфейса Asynchronous:

val doBothAsync = {
  val await = Asynchronous.global.fire(doSomething(12))
  val result = await()
  println(s"[${Thread.currentThread.getName}] doSomething(12) produced: $result")
  doSomethingElse
}

Обратите внимание, что println в doSomething и doSomethingElse выполняются в разных потоках.

Теперь  нам нужно "забыть" (forget). То есть после того, как мы начали вычисление (fire), мы ничего не делаем для обработки конечного результата.  Мы "забываем" (forget) его:

trait Asynchronous {
 type EventualResult[A] = () => A
 def fire[A](run: => A): EventualResult[A]
 def fireAndForget[A](run: => A): Unit =
   fire(run) // игнорируем результат
}
object Asynchronous {
 import scala.concurrent.
 import scala.concurrent.duration.
 val global =
   new Asynchronous {
     implicit val ec = scala.concurrent.ExecutionContext.global
     def fire[A](run: => A): EventualResult[A] = {
       val res = PromiseA.completeWith(Future(run)) // старт вычисления
       () => Await.result(res.future, Duration.Inf) // ожидание
     }
   }
}

Метод fireAndForget вызывает метод fire, запускающий асинхронное вычисление, но игнорирующий возвращаемый EventualResult. То есть мы вообще не ждем никакого результата.

Давайте обновим наш пример:

val doBothFireAndForget = {
  Asynchronous.global.fireAndForget(doSomething(12))
  doSomethingElse
}

Обратите внимание, что здесь такое же асинхронное выполнение, как и в примере с "только fire", но мы не выводим никаких промежуточных результатов, потому что решили их не ждать.

Используем Cats Effect

Для реализации паттерна "выстрелить и забыть" в Cats Effect необходимо выполнить следующее:

  1. Определить синхронное вычисление как эффект.

  2. Запустить выполнение эффекта асинхронно ("выстрелить").

  3. Игнорировать возвращаемое значение ("забыть").

Давайте сделаем это:

import cats.effect._
import cats.implicits._

class Something extends IOApp {

  def run(args: List[String]): IO[ExitCode] =
    doBoth.as(ExitCode.Success)

  def doSomething(i: Int): IO[String] = // <1>
    for {
      _ <- IO(println(s"[${Thread.currentThread.getName}] doSomething($i)"))
    } yield s"$i is a very nice number"

  def doSomethingElse(): IO[Int] =
    for {
      _ <- IO(println(s"[${Thread.currentThread.getName}] doSomethingElse()"))
    } yield 12

  val doBoth: IO[Int] =
    for {
      _ <- doSomething(12).start // <2> <3>
      i <- doSomethingElse
    } yield i
}

new Something().main(Array.empty)
  1. Мы изменили тип возвращаемого значения со String на IO[String], чтобы определить (синхронный) эффект.

  2. И запустили эффект IO асинхронно с помощью start. Он возвращает тип IO[Fiber[IO, String]] (Cats Effect 3 — IO[FiberIO[String]]), который позволяет управлять асинхронным выполнением и получением конечного результата.

  3. Мы также игнорируем Fiber, созданный start, "анонимизируя" его имя как .

Безопасно ли «забыть»?

Когда мы "стреляли и забывали" с помощью Cats Effect, мы намеренно не отслеживали асинхронную операцию. Но безопасно ли это?

Чтобы ответить на этот вопрос, давайте вспомним, какие операции есть у файбера: можно дождаться его завершения (с помощью join) или отменить (с помощью cancel). Поскольку мы ожидаем асинхронного выполнения, то join нам не нужен, и мы поговорим о cancel. Потребуется ли нам когда-нибудь отменять вычисление, которое мы "запустили и забыли"? 

Несомненно, нам это пригодится! Задача может быть очень долгоживущей. Например, цикл обработки событий. Здесь вы можете подумать: "О, этот эффект выполняется вечно, его не нужно отменять", поэтому просто "выстрелим и забудем". Но отмена может потребоваться! Асинхронный эффект существует в некотором контексте и важно убедиться, что он отменяется при отмене его "родительского" контекста:

val businessLogic =
  for {
    _ <- longLivedEffect.start
    _ <- doSomeOtherStuff
  } yield ()

В этом примере после завершения businessLogic, возможно, стоит отменить и longLivedEffect, но подход "выстрелить и забыть" исключает такую возможность. Нам нужно как-то связать время жизни этого долгоживущего эффекта с окружающей бизнес-логикой. Один из вариантов — использовать guarantee:

val businessLogic =
  for {
    fiber <- longLivedEffect.start
    _ <- doSomeOtherStuff.guarantee(fiber.cancel) // <1>
  } yield ()
  1. Отмена файбера при успешном завершении дополнительной бизнес-логики, а также при ошибке или отмене.

Но использование guarantee не масштабируется на несколько эффектов. Например, мы хотим, чтобы файбер отменялся, если отменяется какой-либо эффект внутри businessLogic. В такой ситуации мы можем использовать Resource. Resource — структура, которая управляет аллокацией и деаллокацией ресурсов:

val longLived: Resource[IO, IO[Unit]] =
  longLivedEffect.background // <1>

val businessLogic =
  longLived.use { join => // <2>
    doSomeOtherStuff
  }
  1. Вместо start мы используем метод background, возвращающий Resource, который управляет асинхронным файбером. Управляемое состояние само по себе является действием IO[Unit], которое позволяет вам при необходимости выполнить join к нижележащему файберу.

  2. Используя Resource через use, мы получаем доступ к управляемому состоянию, где можем выполнять другие эффекты. Он определяет статическую область, в которой доступно состояние, при этом за пределами этой области гарантируется корректное получение и освобождение состояния. В случае ресурса, возвращаемого background, эффект получения состояния — это "запуск файбера", а эффект освобождения — "отмена файбера".

Резюме

Паттерн «выстрелить и забыть» (fire-and-forget) состоит из трех частей:

  1. Определение синхронного эффекта.

  2. Асинхронный запуск эффекта.

  3. Контроль асинхронного выполнения эффекта: ожидание результата или отмена.

Вы можете смело запускать и забывать короткоживущие асинхронные эффекты! Но помните, что выполнение всегда происходит в каком-то контексте, и у вас есть такие методы, как background, для контроля асинхронного жизненного цикла.


Материал подготовлен в рамках курса «Scala-разработчик».

Всех желающих приглашаем на demo-занятие «Scala и парсер-комбинаторы». На занятии мы:
— Познакомимся с парсер-комбинаторами на Scala;
— Будем парсить описание REST API написанное с помощью markdown.
>> РЕГИСТРАЦИЯ

Комментарии (0)