С целью увеличить счетчик статей про "редкий" язык Erlang еще на одну, расскажем как на коленке собрать Erlang кластер и запустить на нем распараллеленное вычисление.
Для понимания происходящего, читателю возможно понадобится знание основ Erlang (без OTP). Которые, кстати, можно получить не отходя от кассы не уходя из уютного Хабра, прямо тут:
- Erlang для самых маленьких. Глава 1. Типы данных, переменные, списки и кортежи
- Глава 2: Модули и функции
- Глава 3: Базовый синтаксис функций
- Глава 4: Система типов
Но, не будем умничать раньше времени и сначала сделаем...
Лирическое отступление
Как мы сейчас понимаем, по мере развития IT-индустрии совершенно разные языки программирования находят свое применение. Не стал исключением и язык Erlang, изначально разрабатывавшийся как язык для отказоустойчивых систем, но завоевавший свою нишу в нашем мире распределенных многопроцессорных систем благодаря:
- удобной модели многопоточного программирования (concurrency);
- удачному расширению этой модели на распределенные системы, состоящие из узлов на разных устройствах (distributed computing);
- эффективной реализации этой модели, позволяющей малыми усилиями разрабатывать highly available системы;
- со встроенными механизмами “горячей замены кода” (hot swapping);
- с большим количеством средств разработки.
И при этом оставаясь:
- функциональным языком программирования общего назначения,
- для разработки отказоустойчивых систем (fault-tolerant),
- со своей методологией, позволяющей эффективно разрабатывать сложные системы.
Перечислим компании и проекты, использующих Erlang в production:
- WhatsApp — мессенджер,
- RabbitMQ — брокер сообщений,
- Bet365 — букмекерская онлайн контора, у которой 20+ млн клиентов,
- Riak — распределенная NoSQL база,
- Couchbase — распределенная NoSQL база,
- Ejabberd — XMPP-сервер,
- Goldman Sachs — крупный американский банк.
Не будем скрывать от читателя, что, и на наш взгляд, у языка есть слабые места, о которые хорошо описаны у других авторов, например: Erlang в Wargaming
Ну а теперь давайте по-программируем, и напишем функцию, которая делает...
Параллельное вычисление значений функции (с ограничением количества одновременно работающих процессов и таймаутом на вычисление)
Функция map
Сигнатуру этой функции сделаем похожей на lists:map
:
map(Fn, Items, WorkersN, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
, где:
Fn
— функция, значения которой нужно вычислить,Items
— список значений аргументов для этой функции,WorkersN
— максимальное количество одноврменно работающих процессов,Timeout
— таймаут для вычисления значения функции, который мы готовы ждать,FnResult
— вычисленное значение фунцииFn
,Reason
— причина завершения (exit reason) воркер-процесса.
Реализация функции:
-module(dmap_pmap).
...
map(Fn, Items, WorkersN, Timeout) ->
Total = length(Items),
Context = #{fn => Fn,
items => Items,
results => #{},
counter => 1,
total => Total,
workers => 0,
workers_max => min(WorkersN, Total),
pids => #{},
timeout => Timeout},
Self = self(),
spawn(fun() ->
Self ! map_loop(Context)
end),
Result = receive
Any -> Any
end,
case get_error(Result) of
undefined ->
{ok, Result};
{'EXIT', Reason} ->
throw({'EXIT', Reason});
{error, timeout} ->
{error, timeout}
end.
Вот основные моменты в коде:
- инициализируем значения контекста
Context
, который будем прокидывать дальше в цикле (map_loop
); - создаем процесс-супервайзер, который будет запускать процессы-воркеры и ждать от них результата;
- запускаем функцию-цикл
map_loop
для выполнение этого супервайзера; - ждем результат от супервайзера и возвращаем результат функции;
- приватная функция
get_error
— возращает ошибку в вычислениях илиundefined
, если ошибок не было.
Функция map_loop
Сигнатура этой функции: map_loop(Context) -> [FnResult]
Реализация:
map_loop(#{counter := Counter,
total := Total,
workers := Workers,
workers_max := WorkersMax,
fn := Fn,
items := Items,
pids := PIDs} = Context) when Workers < WorkersMax, Counter =< Total ->
Self = self(),
Index = Counter,
WorkerIndex = Workers + 1,
PID = spawn(fun() ->
WorkerPID = self(),
io:fwrite("{Index, PID, {W, WMax}}: ~p~n",
[{Index, WorkerPID, {Workers + 1, WorkersMax}}]),
Item = lists:nth(Index, Items),
Self ! {Index, WorkerPID, catch Fn(Item, WorkerIndex)}
end),
Context2 = Context#{counter => Counter + 1,
workers => Workers + 1,
pids => PIDs#{PID => Index}},
map_loop(Context2);
map_loop(#{workers := Workers, timeout := Timeout, pids := _PIDs} = Context)
when Workers > 0 ->
receive
{Index, PID, {'EXIT', _Reason} = Result} when is_integer(Index) ->%% error case
io:fwrite("got error: ~p~n", [{Index, PID, Result}]),
Context2 = set_worker_result(PID, {Index, Result}, Context),
Context3 = kill_workers(Context2, error),
create_result(Context3);
{Index, PID, Result} when is_integer(Index) -> %% ok case
io:fwrite("got result: ~p~n", [{Index, PID, Result}]),
Context2 = set_worker_result(PID, {Index, Result}, Context),
map_loop(Context2)
after Timeout -> %% timeout case
io:fwrite("timeout: ~p~n", [#{context => Context}]),
Context3 = kill_workers(Context, {error, timeout}),
create_result(Context3)
end;
map_loop(#{workers := Workers, pids := PIDs} = Context)
when Workers == 0, PIDs == #{} ->
create_result(Context).
Пройдемся по реализации:
- первая реализация функции
map_loop
последовательно запускает не большеWorkersMax
воркеров. Воркер запускается черезspawn
и внутри анонимной функции описана его логика: вычислить значение функцииFn
и отправить результат супервайзеру; - вторая реализация функции (которая, как мы напомним, вызывается если не выполнились условия для вызова первой реализации): ждет результат от воркеров. При получении результата она принимает решение, что делать дальше: или продолжать цикл или завершить работу;
- в третью реализацию функции мы приходим, когда больше нет активных воркеров (и это означает, что все вычисления выполнены) и она просто возвращает результат вычислений.
Пробежимся по приватным функциям, которые мы тут использовали:
set_worker_result
— сохраняет результат вычисления вContext
супервайзера;kill_workers
— убивает все процессы с воркерами (для случая прерывания работы);create_result
— выдает результат вычислений, полученных от воркеров.
Полный листинг функции можно посмотреть на GitHub: тут
Тестирование
Теперь чуть потестируем нашу функцию через Erlang REPL.
1) Запустим вычисление для 2-х воркеров, так чтобы результат от второго воркера пришел раньше, чем от первого:
>catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), 1; (2, _) -> timer:sleep(1000), 2 end, [1, 2], 2, 5000).
В последней строчке — результат вычислений.
{Index, PID, {W, WMax}}: {1,<0.1010.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.1011.0>,{2,2}}
got result: {2,<0.1011.0>,2}
got result: {1,<0.1010.0>,1}
{ok,[1,2]}
2) Запустим вычисление для 2-х воркеров, так, чтобы в первом воркере случился креш:
>catch dmap_pmap:map(fun(1, _) -> timer:sleep(100), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 5000).
{Index, PID, {W, WMax}}: {1,<0.2149.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.2150.0>,{2,2}}
got error: {1,<0.2149.0>,{'EXIT',terrible_error}}
kill: <0.2150.0>
{'EXIT',terrible_error}
3) Запустим вычисление для 2-х воркеров, так, чтобы время вычисления функции превысило разрешенный таймаут:
> catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 1000).
{Index, PID, {W, WMax}}: {1,<0.3184.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.3185.0>,{2,2}}
got result: {2,<0.3185.0>,2}
timeout: #{context =>
#{counter => 3,fn => #Fun<erl_eval.12.99386804>,
items => [1,2],
pids => #{<0.3184.0> => 1},
results => #{2 => 2},
timeout => 1000,total => 2,workers => 1,workers_max => 2}}
kill: <0.3184.0>
{error,timeout}
Ну и наконец...
Вычисления на кластере
Результаты тестирования выглядят разумно, но при чем же тут кластер, может спросить внимательный читатель.
На самом деле, оказывается, что у нас уже есть почти все, что нужно, чтобы запустить вычисления на кластере, где под кластером мы понимаем набор связанных Erlang нод.
В отдельном модуле dmap_dmap
заведем еще одну функцию, со следующей сигнатурой:
map({M, F}, Items, WorkersNodes, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
, где:
{M, F}
— модуль и имя функции, к которую нужно применить аргументы (аляF:M(Item)
),Items
— список значений аргументов для этой функции,WorkersNodes
— список имен нод, на которых нужно запустить вычисление,Timeout
— таймаут для вычисления значения функции, который мы готовы ждать.
Реализация:
-module(dmap_dmap).
...
map({M, F}, Items, WorkersNodes, Timeout) ->
Fn = fun(Item, WorkerIndex) ->
Node = lists:nth(WorkerIndex, WorkersNodes),
rpc:call(Node, M, F, [Item])
end,
dmap_pmap:map(Fn, Items, length(WorkersNodes), Timeout).
Логика этой функции очень проста: мы используем функцию dmap_pmap:map
из прошлого раздела, в которую подставляем анонимную функцию, которая, в свою очередь, просто выполняет вычисление на нужной ноде.
Для теста в отдельном модуле заведем функцию, которая возвращает имя своей ноды:
-module(dmap_test).
test(X) ->
{ok, {node(), X}}.
Тестирование
Для тестирования нам нужно в двух терминалах запустить по ноде, например, вот так (из рабочего каталога проекта):
make run NODE_NAME=n1@127.0.0.1
make run NODE_NAME=n2@127.0.0.1
Запустим вычисление на первой ноде:
(n1@127.0.0.1)1> dmap_dmap:map({dmap_test, test}, [1, 2], ['n1@127.0.0.1', 'n2@127.0.0.1'], 5000).
И получим результат:
{Index, PID, {W, WMax}}: {1,<0.1400.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.1401.0>,{2,2}}
got result: {1,<0.1400.0>,{ok,{'n1@127.0.0.1',1}}}
got result: {2,<0.1401.0>,{ok,{'n2@127.0.0.1',2}}}
{ok,[{ok,{'n1@127.0.0.1',1}},{ok,{'n2@127.0.0.1',2}}]}
Как можно заменить, результаты прилетели от двух нод, как мы и заказывали.
Вместо заключения
Наш простой пример показывает, что в своей сфере применения Erlang позволяет относительно просто решать полезные задачи (которые не так просто решить с помощью других языков программирования).
Из-за короткого формата статьи могут быть вопросы относительно кода и сборки библиотеки, которые остались за кадром.
Какие-то детали можно посмотреть в GitHub: тут.
Остальные детали мы обещаем осветить в следующих статьях.
Комментарии (14)
samsonoveu
29.04.2018 10:47Было бы полезно, если бы кто-нибудь написал похожую статью, но используя Elixir.
eoffsock
29.04.2018 12:04Там все значительно проще. Мне кажется, даже статья не понадобится. Смотрите.
Это parallel map на локальной ноде
defmodule Parallel do def pmap(collection, func) do collection |> Enum.map(&(Task.async(fn -> func.(&1) end))) |> Enum.map(&Task.await/1) end end
Чтобы пускать таски на удаленных нодах, нам нужен TaskSupervisor вместо обычного Task
# On the remote node Task.Supervisor.start_link(name: MyApp.DistSupervisor) # On the client Task.Supervisor.async({MyApp.DistSupervisor, :remote@local}, MyMod, :my_fun, [arg1, arg2, arg3])
Естественно, супервизор надо будет в supervision tree добавить или пускать через start_link, и модуль на нодах написать, который будет дергаться через TaskSupervisor. Остальное сделает эликсир сам.
Ссылки, куда посмотреть можно:
hexdocs.pm/elixir/Task.Supervisor.html
elixir-recipes.github.io/concurrency/parallel-map
elixir-lang.org/getting-started/mix-otp/distributed-tasks-and-configuration.htmlady1981 Автор
01.05.2018 17:12Ничего не имею против вашего решения, но действительно ли это решение эквивалентно
нашему?
1) Есть ли ограничение по количеству одновременно запущенных процессов-воркеров?
2) Есть ли остановка в случае креша в любом из воркеров?
3) Есть ли остановка в случае таймаута при ожидании ответа от воркера?
4) Можете привести примеры тестов, которые у нас приведены для `dmap_pmap:map`?
Elixir компилирует в Erlang VM, поэтому на вскидку не совсем понятно, зачем писать отдельное решение для dmap на Elixir, если можно просто использовать готовое решение на Erlang, которое просто синтегрировать со своим Elixir кодом :).eoffsock
01.05.2018 20:36Это не эквивалентное решение. Это решение тех же задач. Да и вообще, я не собираюсь меряться кодом. Человек спросил, как на Elixir сделать аналогичную штуку, я ответил. В общих чертах, но по документации легко будет разобраться в тонкостях.
1. Сколько запустите, столько и будет. Ноды назначаются явно. Разумеется, в случае реального боевого проекта это можно легко автоматизировать.
Однако если я захочу ограничить количество параллельных процессов, я не буду писать велосипед, а использую poolboy.
Да и вообще, poolboy прекрасно решает большинство проблем по распределению задач между процессами.
2. Креш Task уронит вызывающую функцию. Строго говоря, несмотря на парадигму let it crash, я не считаю это хорошей идеей. Но механизм есть.
3. await(task, timeout \\ 5000)
Но опять же, я не считаю остановку в случае вылета по таймауту хорошей идеей. У меня на проекте есть такой вариант:
ret = case Task.yield(task, 60000) do {:ok, result} -> result nil -> Task.shutdown(task) :timeout end
По крайней мере я хочу получить результаты, которые есть. С проблемами разберемся позже. Но у меня такой род приложений, что частичная потеря данных простительна, а вот лишние задержки — не очень хорошо.
4. Нет, тестов нет. При случае сделаю, пока задачи запускать таски на удаленных нодах не было. pmap используем, но на локальных нодах.
У нас проекты на Elixir, и я не вижу смысла писать на Erlang и потом интегрировать код с Elixir, если могу сразу все сделать на Elixir. Даже учитывая то, что мне все равно часто приходится возиться с кодом на Erlang.
Evengining
Отличная стать, хорошо что еще есть отличные авторские работы по отличным языкам!
mspain
На мой взгляд работа станет неплохой если результат хотя бы минимально сравнят с альтернативными решениями. А то получился сферический эрланг в вакууме. У меня например есть опасение, что кластер на эрланге будет работать медленнее, чем однопоточный вариант на си.
mspain
А программа-минимум, бенчмарки относительно самого себя. Сейчас доступно море технологий дающих линейное масштабирование, является ли эрланг одной из них? С 1000 воркеров скорость в 1000 раз больше? Если поставить 50 нод, вырастет ли в 50 раз производительность?
ai212983
в статье предполагается, что читатель в курсе, что такое кластер, распределенное вычисление, и зачем они нужны. если у вас есть задача, которая быстрее решается в один поток на си — делайте в один поток на си.
ady1981 Автор
Понимаю поинт, однако:
1) Если в Erlang не делать вычислений, о которых известно, что они работают в Erlang медленно, то скорость работы Erlang приложения будет не хуже, чем аналогичного приложения на Java.
2) В Erlang есть проработанный механизм, как подключать код на C. :)
3) Мы не бьемся за проценты от скорости работы приложения. Для нас важнее скорость разработки, простота поддержки и мультиплатформенность.
mspain
Судя по всему поинт таки не поняли. Например в java есть stream api, там есть примерно тот же самый map и очень похожий parallel (который сам делает всё волшебство). Загвоздка в том, что на многих задачах это приводит к тормозам, даже когда в теории вроде как должно параллелиться. Так и тут — без бенчмарков неубедительно.
JC_IIB
А это… когда оно там появилось, сколько лет назад?
ady1981 Автор
Я отвечал на вопрос: Erlang cluster VS однопоточный вариант на С :).
Что касается тестирования Erlang кластера как кластера, то, да, тестирование тут очень важно. Мы тут не привнесли ничего нового: кластер, который мы поднимаем — стандартный Erlang кластер, для которого бенчмарки, по идее, уже должны быть сделаны другими исследователями до нас :). Вопрос действительно важный, я скину ссылки на эти исследования, если найду.
Спасибо за предложение сравнить Erlang кластер с другими решениями, возможно в будущем мы сделаем такие бенчмарки и опубликуем их (это не тривиальная задача).
ady1981 Автор
Спасибо.
Erlang — язык со своими плюсами и минусами.
Некоторые задачи решаются в Erlang очень красиво, а следовательно — быстро и надежно.