Процессы в Elixir
(ну и в Erlang
конечно же) идентифицируются с помощью уникального идентификатора процесса — pid.
Мы используем их, чтобы взаимодействовать с процессами. Сообщения посылаются как бы в pid
, а виртуальная машина сама заботится о доставке этих сообщений в правильный процесс.
Иногда, впрочем, чрезмерное доверие к pid
может приводить к значительным проблемам.
К примеру, мы можем хранить pid
уже мёртвого процесса, или мы можем использовать Supervisor
, который абстрагирует создание процессов от нас, поэтому мы даже не знаем, какой у них pid
(пер: а ещё Supervisor
можете перезапустить упавший процесс с другим pid
, и мы об этом не узнаем никак).
Давайте создадим простое приложение и посмотрим: с какими проблемами мы можем столкнуться и как мы эти проблемы будем решать.
Начинаем вообще без реестра
Для первого примера — создадим простой чат. Начнём с создания mix
проекта:
$ mix new chat
Создадим абсолютно стандартный GenServer
, который будем использовать на протяжении всех примеров в этом статье:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link do
GenServer.start_link(__MODULE__, [])
end
def add_message(pid, message) do
GenServer.cast(pid, {:add_message, message})
end
def get_messages(pid) do
GenServer.call(pid, :get_messages)
end
# SERVER
def init(messages) do
{:ok, messages}
end
def handle_cast({:add_message, new_message}, messages) do
{:noreply, [new_message | messages]}
end
def handle_call(:get_messages, _from, messages) do
{:reply, messages, messages}
end
end
Если такой код кажется вам незнакомым или не понятным — почитайте начало работы с Elixir
, в котором есть отличные параграфы об OTP.
Запустим iex
сессию с mix
окружением и попробуем поработать с нашим сервером:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Server.add_message(pid, "foo")
:ok
iex> Chat.Server.add_message(pid, "bar")
:ok
iex> Chat.Server.get_messages(pid)
["bar", "foo"]
Код этого этапа — вот в этом коммите
На этом этапе вроде как всё так хорошо, что просто замечательно. Мы получаем pid
процесса, затем для каждого сообщения, которое мы хотим послать (add_message/2
и get_messages/1
) мы передаём этот pid
— и всё работает настолько предсказуемо, что даже скучно.
Впрочем, веселуха начинается тогда, когда мы попробуем добавить Supervisor
...
Очень приятно: я — Supervisor
!
Итак, по какой-то причине наш процесс Chat.Server
умирает. Мы остаёмся одни в пустой и холодной iex
сессии, и у нас нету другого выбора, кроме как запустить новый процесс, получить его pid
и писать сообщения уже на этот новый pid
. Так давайте же создадим Supervisor
— и нам не придётся беспокоится о таких мелочах!
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
supervise(children, strategy: :one_for_one)
end
end
Ну, создать Supervisor
очень просто. Но у нас теперь проблема, если модель поведения нашего сервера не изменится. Ведь мы не запускаем процесс Chat.Server
сами, Supervisor
делает это за нас. И поэтому мы не имеем никакого доступа к pid
процесса!
Эта не баг, а фича такого OTP
паттерна, как Supervisor
. Мы не можем получить доступ к pid
его дочерних процессов, потому что он может неожиданно (но, естественно, только в случае необходимости) перезапустить процесс, а фактически убить его и создать новый с новым pid
.
Регистрируем имена процессов
Чтобы получить доступ к нашему процессу Chat.Server
нам нужно придумать способ указывать на процесс, другой — не pid
. нам нужен такой указатель, чтобы он сохранялся даже при рестарте процесса через Supervisor
(пер: то есть даже тогда, когда меняется pid
).
И такой указатель называется имя
!
Для начала, изменим Chat.Server
:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
def start_link do
# We now start the GenServer with a `name` option.
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
# And our function doesn't need to receive the pid anymore,
# as we can reference the process with its unique name.
def add_message(message) do
GenServer.cast(:chat_room, {:add_message, message})
end
def get_messages do
GenServer.call(:chat_room, :get_messages)
end
# ...
end
Изменения — вот в этом коммите
Сейчас всё должно работать так же, но только лучше — ведь мы не должны передавать везде этот pid
:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.94.0>}
iex> Chat.Server.add_message("foo")
:ok
iex> Chat.Server.add_message("bar")
:ok
iex> Chat.Server.get_messages
["bar", "foo"]
Даже если процесс перезапустится — всё равно мы сможем обратиться к нему тем же способом:
iex> Process.whereis(:chat_room)
#PID<0.111.0>
iex> Process.whereis(:chat_room) |> Process.exit(:kill)
true
iex> Process.whereis(:chat_room)
#PID<0.114.0>
iex> Chat.Server.add_message "foo"
:ok
iex> Chat.Server.get_messages
["foo"]
Ну, для наших текущих задач вроде бы проблемы решены, но давайте попробуем сделать что-нибудь посложнее (и более приближённое к реальным задачам).
Динамическое создание процессов
Представьте, что нам необходимо поддерживать несколько чат-комнат. Клиент может создать новую комнату с именем, и он ожидает, что сможет посылать сообщения в ту комнату, которую он хочет. Тогда интерфейс должен быть приблизительно такой:
iex> Chat.Supervisor.start_room("first room")
iex> Chat.Supervisor.start_room("second room")
iex> Chat.Server.add_message("first room", "foo")
iex> Chat.Server.add_message("second room", "bar")
iex> Chat.Server.get_messages("first room")
["foo"]
iex> Chat.Server.get_messages("second room")
["bar"]
Начнём пожалуй сверху, и изменим Supervisor
чтобы всё это поддерживать:
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
# We are now registering our supervisor process with a name
# so we can reference it in the `start_room/1` function
Supervisor.start_link(__MODULE__, [], name: :chat_supervisor)
end
def start_room(name) do
# And we use `start_child/2` to start a new Chat.Server process
Supervisor.start_child(:chat_supervisor, [name])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
# We also changed the `strategty` to `simple_one_for_one`.
# With this strategy, we define just a "template" for a child,
# no process is started during the Supervisor initialization,
# just when we call `start_child/2`
supervise(children, strategy: :simple_one_for_one)
end
end
И давайте заставим наш Chat.Server
принимать имена в start_link
функции:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# Just accept a `name` parameter here for now
def start_link(name) do
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
#...
end
Изменения — вот в этом коммите
А вот и проблема! У нас же может быть несколько процессов Chat.Server
, и они не могут все быть с именем :chat_room
. Беда...
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_room "foo"
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room "bar"
{:error, {:already_started, #PID<0.109.0>}}
Честно говоря, VM
очень красноречива. Мы пытаемся создать второй процесс, но процесс с таким именем уже существует, о чём нам так ехидно напоминает среда. Надо придумать какой то другой способ, но какой?..
К сожалению, тип аргумента name
определён достаточно чётко. Мы не можем использовать что-то типа {:chat_room, "room name"}
. Давайте обратимся к документации:
Поддерживаемые значения:
atom
— в этом случаеGenServer
регистрируется локально с данным именемatom
с помощьюProcess.register/2
.
{:global, term}
— в этом случаеGenServer
регистрируется глобально с данным именемterm
с помощью функций в модуле:global
.
{:via, module, term}
— в этом случаеGenServer
регистрируется с помощью определённого вmodule
механизма и имени `term.
The supported values are:
anatom
— theGenServer
is registered locally with the given name usingProcess.register/2
.
{:global, term}
— theGenServer
is registered globally with the given term using the functions in the:global
module.
{:via, module, term}
— theGenServer
is registered with the given mechanism and name.
Первую опцию — atom
, мы уже использовали, и точно знаем, что в нашем хитром случае она не подходит.
Вторая опция используется для регистрации процесса глобально в кластере нод. Она использует локальную ETS
таблицу. Кроме того она будет требовать постоянной синхронизации внутри нод в кластере, в связи с чем работа программы будет замедляться. Так что используйте её только когда это действительно нужно.
Третья, и последняя, опция использует в качестве параметра кортеж с :via
, и это как раз то, что нам надо для решения нашей проблемы! Вот что говорит по этому поводу документация:
Опция:via
принимает в качестве параметра модуль, который имеет следующий интерфейс:register_name/2
,unregister_name/1
,whereis_name/1
иsend/2
.
The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.
Вообще ничего не понятно? Мне тоже! Так что посмотрим этот метод в деле.
Используем кортеж :via
Итак, кортеж :via
— это способ сказать Elixir
, что мы собираемся использовать отдельный модуль для регистрации наших процессов. Этот модуль должен делать следующие вещи:
- Регистрировать имя, которым может быть любой
term
, с помощью функцииregister_name/2
; - Удалять имена из регистра, с помощью функции
unregister_name/1
; - Находить
pid
по имени, с помощьюwhereis_name/1
; - Посылать сообщения определённому процессу с помощью
send/2
.
Чтобы это всё работало, вышеперечисленные функции должны передавать ответ в определённом формате, определённом в OTP
— так же как и handle_call/3
и handle_cast/2
подчиняются определённым правилам.
Попробуем определить модуль, которые всё это знает:
# ./lib/chat/registry.ex
defmodule Chat.Registry do
use GenServer
# API
def start_link do
# We register our registry (yeah, I know) with a simple name,
# just so we can reference it in the other functions.
GenServer.start_link(__MODULE__, nil, name: :registry)
end
def whereis_name(room_name) do
GenServer.call(:registry, {:whereis_name, room_name})
end
def register_name(room_name, pid) do
GenServer.call(:registry, {:register_name, room_name, pid})
end
def unregister_name(room_name) do
GenServer.cast(:registry, {:unregister_name, room_name})
end
def send(room_name, message) do
# If we try to send a message to a process
# that is not registered, we return a tuple in the format
# {:badarg, {process_name, error_message}}.
# Otherwise, we just forward the message to the pid of this
# room.
case whereis_name(room_name) do
:undefined ->
{:badarg, {room_name, message}}
pid ->
Kernel.send(pid, message)
pid
end
end
# SERVER
def init(_) do
# We will use a simple Map to store our processes in
# the format %{"room name" => pid}
{:ok, Map.new}
end
def handle_call({:whereis_name, room_name}, _from, state) do
{:reply, Map.get(state, room_name, :undefined), state}
end
def handle_call({:register_name, room_name, pid}, _from, state) do
# Registering a name is just a matter of putting it in our Map.
# Our response tuple include a `:no` or `:yes` indicating if
# the process was included or if it was already present.
case Map.get(state, room_name) do
nil ->
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_cast({:unregister_name, room_name}, state) do
# And unregistering is as simple as deleting an entry
# from our Map
{:noreply, Map.delete(state, room_name)}
end
end
Опять же: в наших руках выбрать, каким образом наш реестр будет внутри работать. Здесь мы используем простую Map
для связи имени и pid
. Этот код абсолютно прост и прямолинеен, особенно если вы хорошо знаете как работает GenServer
. Незнакомыми могут казаться только возвращаемые функциями значения.
Пришло время попробовать наш реестр в iex
сессии:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link("room1")
{:ok, #PID<0.107.0>}
iex> Chat.Registry.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Registry.whereis_name("room1")
:undefined
iex> Chat.Registry.register_name("room1", pid)
:yes
iex> Chat.Registry.register_name("room1", pid)
:no
iex> Chat.Registry.whereis_name("room1")
#PID<0.107.0>
iex> Chat.Registry.unregister_name("room1")
:ok
iex> Chat.Registry.whereis_name("room1")
:undefined
5 секунд — полёт отличный! Реестр работает как надо: и регистрирует, и удаляет регистрацию. Попробуем его использовать в наших чатах.
Наша проблема была в том, что у нас были нескольких запущенных серверов Chat.Server
, инициализированных через Supervisor
. Чтобы отправить сообщение в определённую комнату, мы хотели бы вызывать Chat.Server.add_message(“room1”, “my message”)
, поэтому мы должны были бы регистрировать имена серверов как {:chat_room, “room1”}
и {:chat_room, “room2”}
. Вот как это делается через кортеж :via
:
# ./lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link(name) do
# Instead of passing an atom to the `name` option, we send
# a tuple. Here we extract this tuple to a private method
# called `via_tuple` that can be reused in every function
GenServer.start_link(__MODULE__, [], name: via_tuple(name))
end
def add_message(room_name, message) do
# And the `GenServer` callbacks will accept this tuple the
# same way it accepts a pid or an atom.
GenServer.cast(via_tuple(room_name), {:add_message, message})
end
def get_messages(room_name) do
GenServer.call(via_tuple(room_name), :get_messages)
end
defp via_tuple(room_name) do
# And the tuple always follow the same format:
# {:via, module_name, term}
{:via, Chat.Registry, {:chat_room, room_name}}
end
# SERVER (no changes required here)
# ...
end
Изменения — вот в этом коммите
Вот что здесь происходит: каждый раз, когда мы посылаем сообщение в Chat.Server
, передавая имя комнаты, он сам будет находить pid
нужного процесса с помощью того модуля, который мы ему передали в кортеже :via
(в данном случае это Chat.Registry
).
Это решает нашу проблему: теперь мы можем использовать любое количество Chat.Server
процессов (ну, пока не закончится фантазия на имена), и нам никогда не надо знать их pid
. Совсем.
Впрочем, есть ещё одна проблема в таком решении. Догадались?
Именно! Наш реестр не знает о процессах, которые упали, и должны быть перезапущены через Supervisor
. А это значит, что когда такое произойдёт, реестр не даст пересоздать запись с таким же именем, и будет хранить pid
мёртвого процесса.
По идее, решение этой проблемы — не слишком сложное. Мы заставим наш реестр осуществлять мониторинг всех процессов, pid
которых он хранит. Как только такой "наблюдаемый" процесс упадёт — мы его просто удалим из нашего реестра.
# in lib/chat/registry.ex
defmodule Chat.Registry do
# ...
def handle_call({:register_name, room_name, pid}, _from, state) do
case Map.get(state, room_name) do
nil ->
# When a new process is registered, we start monitoring it.
Process.monitor(pid)
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_info({:DOWN, _, :process, pid, _}, state) do
# When a monitored process dies, we will receive a
# `:DOWN` message that we can use to remove the
# dead pid from our registry.
{:noreply, remove_pid(state, pid)}
end
def remove_pid(state, pid_to_remove) do
# And here we just filter out the dead pid
remove = fn {_key, pid} -> pid != pid_to_remove end
Enum.filter(state, remove) |> Enum.into(%{})
end
end
Изменения — вот в этом коммите
Убедимся в том, что всё работает:
$ iex -S mix
iex> Chat.Registry.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.111.0>}
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
Ну, теперь уже совершенно не важно, сколько раз Supervisor
перезапустит процесс Chat.Server
: как только мы посылаем сообщение в комнату — оно будет доставлено по верному pid
.
Упрощаем с помощью gproc
В принципе с нашим чатом мы на этом и закончим, но хочется рассказать ещё об одной фиче, которая упростит для нас регистрацию с помощью кортежа :via
. Это — gproc
, — библиотека Erlang
.
И научим наш Chat.Server
использовать gproc
вместо нашего Chat.Registry
, а потом мы вообще избавимся от Chat.Registry
.
Начнём пожалуй с зависимостей. Для этого добавим gproc
в mix.exs
:
# ./mix.exs
defmodule Chat.Mixfile do
# ...
def application do
[applications: [:logger, :gproc]]
end
defp deps do
[{:gproc, "0.3.1"}]
end
end
Затем подтянем зависимости с помощью:
$ mix deps.get
Теперь мы можем поменять нашу регистрацию с помощью кортежа :via
— пусть использует gproc
, а не Chat.Registry
:
# ./lib/chat/server.ex
defmodule Chat.Server do
# ...
# The only thing we need to change is the `via_tuple/1` function,
# to make it use `gproc` instead of `Chat.Registry`
defp via_tuple(room_name) do
{:via, :gproc, {:n, :l, {:chat_room, room_name}}}
end
# ...
end
gproc
использует ключи-кортежи, состоящие из трёх значений: {type, scope, key}
.
В нашем случае мы используем:
:n
— это значитимя
, то есть не может быть больше одного процесса, зарегистрированного под таким ключом;:l
— это значитlocal
, то есть процесс регистрируется только на нашей ноде;{:chat_room, room_name}
— это сам ключ в виде кортежа.
Дополнительную информацию по возможным настройкам gproc
искать тут.
После таких изменений вообще выкинем наш Chat.Registry
, и проверим что всё продолжает работать в iex
сессии:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.190.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.192.0>}
iex> Chat.Supervisor.start_room("room2")
{:ok, #PID<0.194.0>}
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.add_message("room2", "second message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
iex> Chat.Server.get_messages("room2")
["second message"]
iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
Изменения — вот в этом коммите
Куда плыть дальше, капитан?
Мы с вами разобрались в куче сложных вопросов. Основные выводы:
- Будьте осторожны, работая с
pid
напрямую: они поменяются как только процесс перезапустится. - Если вам нужно получить ссылку только на один процесс ( как у нас было с единственной комнатой в чате), регистрация процесса с именем в виде атома — достаточная мера;
- Если вам нужно создавать процессы динамически (множество чат-комнат), вы можете использовать кортеж
:via
для предоставления своего собственного реестра; - Подобные реестры уже существуют ( к примеру
gproc
), и если вы их используете — не придётся строить свой велосипед;
Конечно, это ещё не всё. Если вам нужна глобальная регистрация на всех нодах в кластере, другие средства тоже могут быть хороши. У Erlang
есть глобальные модули для глобальных регистраций, pg2
для групп процессов, да и тот же gprc
может вам помочь.
Если эта статья вас заинтересовала — почитайте Sasa Juric. Elixir in Action
.
А вот и репка с сыром)
Комментарии (10)
UA3MQJ
29.07.2016 16:59И за эту статью тоже спасибо! Буквально вчера по всем этим проблемам проходил, запускал sup с именованием процесса.
Почему-то был уверен, что в Erlang не нужно знать Pid для вызова call, а сейчас пересмотрел код и понял, что был не прав:
% erlang -define(SERVER, ?MODULE). ... gen_server:call(?SERVER, {select, Key}). % elixir GenServer.call(__MODULE__, {:select, key})
Вообще тяжело, конечно, после Erlang на Elixir. Дважды атом указывал без двоеточия. И после Erlang называть переменные с маленькой буквы — тоже не привычно. А вот с параметрами местами пока вообще непонятки, типа [type: :set]Virviil
30.07.2016 08:12Все непонятные параметры — это синтаксический сахар к понятным параметрам.
Все очень хорошо описано на сайте elixir в разделе basic types
Strain
Я должен ещё добавить что динамические атомы в Erlang / Elixir это очень плохо — ведёт к memory leak, не используйте динамически созданные атомы для регистрации процессов, используйте gproc. Да, и лучше вообще не создавайте атомы динамически нигде и никогда если на 100% не уверены в том что делаете.
thousandsofthem
Да, все так. Но в статье они не используются или я что-то пропустил?
Имя вида
:chat_room
оно ведь просто в коде написаноStrain
В статье всё ok, прост это частые грабли начинающих OTP разработчиков)