В этом посте исследована механика async await на языке Elixir. Принятая в Elixir модель конкурентности отлично подходит в качестве платформы для реализации такой механики. Тем не менее, не расценивайте этот пост как руководство для разработки реальных приложений на Elixir.

Код к этому посту выложен на GitHub

Введение


Многим из нас впервые довелось столкнуться с async await, когда мы изучали JavaScript. Поскольку в JavaScript предлагается одна из наиболее популярных реализаций этой техники, именно JavaScript-подход обычно закрепляется в памяти как базовый вариант async await. Поэтому мы невольно увязываем фундаментальные принципы асинхронного программирования с конкретной реализацией и конкретными решениями, принятыми в JavaScript.

Одно из наиболее распространённых заблуждений, связанных с async await таково, будто для этой парадигмы требуется неблокирующая однопоточная среда выполнения. Среда выполнения в JavaScript является неблокирующей, прежде всего, именно потому, что она однопоточная. Однако, модели асинхронного программирования вполне реализуемы как в однопоточных, так и в многопоточных средах выполнения, причём, как в блокирующем, так и в неблокирующем виде.

От Sync к Async


Путь от синхронному к асинхронному программированию – это, в сущности, переход от последовательного выполнения к конкурентному. В каждый момент времени мы будем выполнять не одну задачу, а несколько.

image

Попробуем рассуждать о синхронном и асинхронном программировании в терминах пар событий, складывающихся в процессе выполнения. Исходим из того, что в трассировке программы прослеживаются следующие интересующие нас события:

  • При синхронном программировании
oinvoke function и return value.
  • При асинхронном программировании
o invoke function и return promise,
o await promise и return value.

Синхронное выполнение


Синхронное выполнение — это ход в один конец, при котором между вызывающей и вызываемой стороной происходит взаимодействие вида «вызвать функцию и вернуть значение». Вызывающая сторона приостанавливает выполнение до тех пор, пока вызываемая сторона не завершит выполнение своей задачи и не вернёт значение (Рис. 1., слева).

Асинхронное выполнение и промисы


Асинхронное выполнение – это путь туда и обратно, два взаимодействия. Первая операция заключается в том, что мы вызываем и возвращаем промис, а вторая — в том, что мы ожидаем и возвращаем значение от взаимодействия между вызывающей и вызываемой стороной.

Промис может быть интерпретирован либо как представление вызываемой стороны, либо как представление будущего значения (футуры). Промис может находиться либо в подвешенном состоянии (это значит, что вызываемая сторона пока занята работой и не вернула значение), либо как выполненный — в последнем случае вызываемая сторона уже завершила работу и вернула значение.

Если промис оказался в подвешенном состоянии на этапе await, то все вызыватели прекращают выполнение и дожидаются, пока вызываемая сторона не завершит работу и не вернёт значение (рис. 1, в центре). Если промис завершается на этапе await, то вызывающая сторона продолжает работу с возвращённым значением (Рис. 1., справа).

Цикл событий


image


Среду выполнения async await принято называть «цикл событий» (Event Loop). Цикл событий — это планировщик, позволяющий асинхронно выполняемой программе зарегистрировать заинтересованность в конкретном событии. В рамках данной статьи нас интересует лишь завершение промиса. Когда регистрация произошла, выполнение приостанавливается. Когда промис завершён, возобновляется выполнение цикла событий.

Оставшаяся часть этой статьи рассказывает, как реализовать возможности async await и цикл событий на языке Elixir. Почему именно на Elixir? Во-первых, не составляет труда спроецировать async await на Elixir, получается отличная иллюстративная реализация. Даже важнее, что такое отображение async await на Elixir развеивает распространённый миф, будто механизм async await по природе своей неблокирующий. На самом деле можно отображать асинхронные операции на процессы Elixir и целенаправленно блокировать процесс, ожидая промиса.

Краткое введение в Elixir


Elixir — это функциональный язык программирования с динамической типизацией, выполняемый на виртуальной машине Erlang (BEAM).

Ключевая абстракция в языке Elixir называется «процесс». Это мельчайшая независимо выполняемая единица, обладающая уникальным идентификатором. Чтобы узнать идентификатор процесса, выполняемого в настоящий момент, нужно выполнить self(). Весь код выполняется в контексте этого процесса. Процессы в Elixir работают конкурентно, но каждый процесс выполняет свои инструкции последовательно.

# Создаём новый процесс – воспользовавшись идентификатором процесса, можно отправлять этому процессу сообщения 
pid = spawn(fn ->
  # Этот блок кода будет выполняться в отдельном процессе
end)

Процессы обмениваются информацией и координируют действия, обмениваясь сообщениями. Отправка сообщения — это неблокирующая операция, благодаря этому выполнение процесса может продолжаться.

# Отправить сообщение процессу с идентификатором pid (неблокирующая операция)
send(pid, {:Hello, "World"})

Напротив, получение сообщения — это блокирующая операция, и выполнение процесса будет приостановлено, пока не поступит подходящее сообщение.

# Получить сообщение (блокирующая операция)
receive do
  {:Hello, name} -> 
    IO.puts("Hello, #{name}!")
end

Пожалуй, наиболее популярная абстракция в Elixir — это GenServer. GenServer — это процесс, такой же, как и любой другой процесс в Elixir. GenServer абстрагирует весь шаблонный код, который нужен, чтобы построить сервер, работающий с сохранением состояния.

defmodule Counter do
  use GenServer

  # Клиентский API

  # Запускает GenServer
  def start_link() do
    GenServer.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # Синхронный вызов, при помощи которого мы получаем текущее значение счётчика 
  def value do
    GenServer.call(__MODULE__, :value)
  end

  # Асинхронный вызов для инкремента счётчика
  def increment do
    GenServer.cast(__MODULE__, :increment)
  end

  # Обратные вызовы сервера

  # Инициализируем GenServer с исходным значением
  def init(value) do
    {:ok, value}
  end
  
  # Обрабатываем синхронные вызовы
  def handle_call(:value, _from, state) do
    {:reply, state, state}
  end

  # Обрабатываем асинхронные сообщения
  def handle_cast(:increment, state) do
    {:noreply, state + 1}
  end
end

Async Await в Elixir


При помощи модуля Async Await разработчик может выразить конкурентную структуру вычислений, тогда как цикл событий реализует конкурентную структуру вычислений.

Отобразим асинхронное выполнение функции на процесс Elixir, выполняющий эту функцию. Воспользовавшись идентификатором процесса (pid), удобно ссылаться как на выполнение данного процесса, так и на промис, анонсирующий такое выполнение.

image

Мы стремимся выполнить нечто подобное:

# outer относится к pid внешнего процесса Elixir 
outer = Async.invoke(fn ->
 
  # inner относится к pid внутреннего процесса Elixir 
  inner = Async.invoke(fn ->

    42
  
  end)

  # Воспользуемся pid, и с его помощью будем ожидать внутреннего промиса 
  v = Async.await(inner)

  2 * v

end)

# Воспользуемся pid, и с его помощью будем ожидать внешнего промиса
IO.puts(Async.await(outer))

Библиотека


Начнём с простого компонента — библиотеки. Напомню, у нас есть всего два взаимодействия, при первом мы вызываем функцию и возвращаем промис, а во втором ожидаем промис и возвращаем значение. При операции Invoke (вызов) работа вызывающей стороны не приостанавливается, но при await может быть приостановлена, если промис пока не разрешился.

defmodule Async do
  def invoke(func, args \\ []) do
    # вызвать функцию, вернуть промис
    # ни в коем случае не заблокирует вызывателя
    GenServer.call(EventLoop, {:invoke, func, args})
  end

  def await(p) do
    # ожидать промис, вернуть значение
    # может заблокировать вызывателя
    GenServer.call(EventLoop, {:await, p})
  end
end

В терминах Elixir:

  • GenServer.call(EventLoop, {:invoke, func, args}) — это блокирующий вызов. Но, как видим, этот метод всегда возвращается сразу же, поэтому, он ни в коем случае не может приостановить работу вызывающей стороны.
  • GenServer.call(EventLoop, {:await, p}) — это блокирующий вызов. Как мы увидим ниже, функция не во всех случаях возвращается сразу же; следовательно, вызов может приостановить работу вызывающей стороны.

Цикл событий


Перейдём к более сложному компоненту — циклу событий.

Состояние


В цикле событий отслеживаются сущности двух видов: промисы и ожидатели.

%State{
  promises: %{#PID<0.269.0> => :pending, #PID<0.270.0> => {:completed, 42}},
  awaiters: %{
    #PID<0.269.0> => [
      # При помощи этой структуры данных можно отложить отклик на запрос 
      # см. GenServer.reply
      {#PID<0.152.0>,
       [:alias | #Reference<0.0.19459.4203495588.2524250117.118052>]}
    ],
    #PID<0.270.0> => []
  }
}

Промисы

promises ассоциирует идентификатор промиса со статусом асинхронного выполнения той функции, которая представлена в промисе. Промис может находиться в одном из двух состояний:

  • :pending, означает, что выполнение до сих пор продолжается, или
  • {:completed, result}, указывает, что выполнение завершилось с получением result.

Ожидатели

awaiters ассоциирует идентификатор промиса со списком идентификаторов выполнения, ожидающих, пока промис разрешится. Каждый pid в списке соответствует процессу, который выполнил операцию await над промисом и в настоящий момент приостановлен, дожидается, пока будет выполнен промис. Когда промис разрешится, ожидающие процессы об этом уведомляются, и их выполнение может продолжаться, но при этом уже с учётом результата промиса.

Поведение


При отслеживании актуального состояния каждой асинхронно выполняемой операции через промисы, а также зависимостей между операциями выполнения через ожидатели нам на помощь приходит цикл событий. Он помогает оркестровать конкурентное выполнение кода. Для этого нам потребуются всего три метода:

defmodule EventLoop do
  use GenServer

  alias State

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  def init(state) do
    {:ok, state}
  end

  def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # ...
  end

  def handle_call({:await, promise}, {caller, _} = from, state) do
    # ...
  end

  def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    # ...
  end

end

Вызов

Метод invoke (вызов) порождает новый процесс Elixir и использует при этом идентификатор процесса, присвоенного callee как идентификатор промиса. Процесс выполняет функцию при помощи apply(func, args) а затем вызывает метод возврата, относящийся к циклу событий. С помощью этого метода возвращается результат функции, при помощи которого выполняется промис.

def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # Здесь мы используем id процесса, одновременно являющийся id промиса 
    callee =
      spawn(fn ->
        GenServer.call(EventLoop, {:return, self(), apply(func, args)})
      end)

    new_state =
      state
      |> State.add_promise(callee)

    {:reply, callee, new_state}
  end

Ожидание

Это вся суть цикла событий. При вызове await следует различать два случая:

  • Если промис разрешился, то мы сразу же отвечаем вызывающей стороне (не приостанавливая её работу) и возвращаем результат.
  • Если промис приостановлен, то немедленного ответа вызывающей стороне не следует (работа вызывающей стороны приостанавливается), и вызывающая сторона регистрируется как единица, ожидающая промис.

def handle_call({:await, promise}, {caller, _} = from, state) do
  # Центральный if-оператор
  case State.get_promise(state, promise) do
    # Промис приостановлен, отклик откладывается до его завершения 
    :pending ->
      new_state =
        state
        |> State.add_awaiter(promise, from)

      {:noreply, new_state}

    # Промис завершён, сразу же отвечаем
    {:completed, result} ->
      {:reply, result, state}
  end
end

Возврат

Когда процесс завершается, мы перебираем список ожидателей и откликаемся на запрос (возобновляя работу вызывающей стороны), а далее возвращаем результат. Кроме того, мы обновляем состояние промиса: из ожидающего он превращается в завершённый.

def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    Enum.each(State.get_awaiter(state, callee), fn {cid, _} = caller ->
      GenServer.reply(caller, result)
    end)

    new_state =
      state
      |> State.set_promise(callee, result)

    {:reply, nil, new_state}
  end

Запуск приложения


Теперь всё готово к выполнению приложения. На случай, если вам не удастся сразу воспроизвести мои результаты, оставлю вам код в виде Elixir Livebook, он лежит на GitHub.

IO.inspect(self())

outer = Async.invoke(fn ->
  
  IO.inspect(self())

  inner = Async.invoke(fn ->

    IO.inspect(self())

    42
  
  end)

  v = Async.await(inner)

  2 * v

end)

IO.puts(Async.await(outer))

В результате работы приложения получим примерно такой вывод:

#PID<0.152.0>
#PID<0.269.0>
#PID<0.270.0>
84

Кроме того, посмотрите диаграмму сущностей (entity diagram) и диаграмму последовательностей (sequence diagram) на которых проиллюстрирована вся структура и все варианты поведения при выполнении функций и промисов.

Диаграмма сущностей

image


Диаграмма последовательностей

image


Обзор


В этой статье мы исследовали базовую механику async await, но этим данная тема не исчерпывается. Например, существуют механизмы комбинирования промисов, такие как Promise.all (дождаться, пока будут выполнены все промисы из списка) или Promise.one (дождаться выполнения хотя бы одного промиса из списка). Ещё одна интересная тема — связывание промисов, когда функция возвращает не значение, а промис. Эти темы изучите самостоятельно.

Заключение


Async await — это модель программирования, в которой конкурентность становится первостепенной сущностью. При помощи async await разработчику удобно выразить конкурентную структуру вычислений, в то время как выполнять эти вычисления будет цикл событий.

P.S.
Обращаем ваше внимание на то, что у нас на сайте проходит распродажа.

Комментарии (1)


  1. devunion
    24.03.2024 06:54

    Назовите статью правильно: "Механика Async Await в Elixir". Или можете сократить до "Механика". Так даже лучше получится.