С целью увеличить счетчик статей про "редкий" язык Erlang еще на одну, расскажем как на коленке собрать Erlang кластер и запустить на нем распараллеленное вычисление.


Для понимания происходящего, читателю возможно понадобится знание основ Erlang (без OTP). Которые, кстати, можно получить не отходя от кассы не уходя из уютного Хабра, прямо тут:



Но, не будем умничать раньше времени и сначала сделаем...


Лирическое отступление


Как мы сейчас понимаем, по мере развития IT-индустрии совершенно разные языки программирования находят свое применение. Не стал исключением и язык Erlang, изначально разрабатывавшийся как язык для отказоустойчивых систем, но завоевавший свою нишу в нашем мире распределенных многопроцессорных систем благодаря:


  • удобной модели многопоточного программирования (concurrency);
  • удачному расширению этой модели на распределенные системы, состоящие из узлов на разных устройствах (distributed computing);
  • эффективной реализации этой модели, позволяющей малыми усилиями разрабатывать highly available системы;
  • со встроенными механизмами “горячей замены кода” (hot swapping);
  • с большим количеством средств разработки.

И при этом оставаясь:


  • функциональным языком программирования общего назначения,
  • для разработки отказоустойчивых систем (fault-tolerant),
  • со своей методологией, позволяющей эффективно разрабатывать сложные системы.

Перечислим компании и проекты, использующих Erlang в production:


  • WhatsApp — мессенджер,
  • RabbitMQ — брокер сообщений,
  • Bet365 — букмекерская онлайн контора, у которой 20+ млн клиентов,
  • Riak — распределенная NoSQL база,
  • Couchbase — распределенная NoSQL база,
  • Ejabberd — XMPP-сервер,
  • Goldman Sachs — крупный американский банк.

Не будем скрывать от читателя, что, и на наш взгляд, у языка есть слабые места, о которые хорошо описаны у других авторов, например: Erlang в Wargaming


Ну а теперь давайте по-программируем, и напишем функцию, которая делает...


Параллельное вычисление значений функции (с ограничением количества одновременно работающих процессов и таймаутом на вычисление)


Функция map

Сигнатуру этой функции сделаем похожей на lists:map:


map(Fn, Items, WorkersN, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}, где:


  • Fn — функция, значения которой нужно вычислить,
  • Items — список значений аргументов для этой функции,
  • WorkersN — максимальное количество одноврменно работающих процессов,
  • Timeout — таймаут для вычисления значения функции, который мы готовы ждать,
  • FnResult — вычисленное значение фунции Fn,
  • Reason — причина завершения (exit reason) воркер-процесса.

Реализация функции:


-module(dmap_pmap).
...
map(Fn, Items, WorkersN, Timeout) ->
  Total = length(Items),
  Context = #{fn => Fn, 
              items => Items, 
              results => #{}, 
              counter => 1, 
              total => Total, 
              workers => 0, 
              workers_max => min(WorkersN, Total), 
              pids => #{}, 
              timeout => Timeout},
  Self = self(),
  spawn(fun() ->
    Self ! map_loop(Context)
  end),
  Result = receive
             Any -> Any
           end,
  case get_error(Result) of
    undefined ->
      {ok, Result};
    {'EXIT', Reason} ->
      throw({'EXIT', Reason});
    {error, timeout} ->
      {error, timeout}
  end.

Вот основные моменты в коде:


  • инициализируем значения контекста Context, который будем прокидывать дальше в цикле (map_loop);
  • создаем процесс-супервайзер, который будет запускать процессы-воркеры и ждать от них результата;
  • запускаем функцию-цикл map_loop для выполнение этого супервайзера;
  • ждем результат от супервайзера и возвращаем результат функции;
  • приватная функция get_error — возращает ошибку в вычислениях или undefined, если ошибок не было.

Функция map_loop

Сигнатура этой функции: map_loop(Context) -> [FnResult]


Реализация:


map_loop(#{counter := Counter, 
           total := Total, 
           workers := Workers, 
           workers_max := WorkersMax, 
           fn := Fn, 
           items := Items, 
           pids := PIDs} = Context) when Workers < WorkersMax, Counter =< Total ->
  Self = self(),
  Index = Counter,
  WorkerIndex = Workers + 1,
  PID = spawn(fun() ->
      WorkerPID = self(),
      io:fwrite("{Index, PID, {W, WMax}}: ~p~n", 
                [{Index, WorkerPID, {Workers + 1, WorkersMax}}]),
      Item = lists:nth(Index, Items),
      Self ! {Index, WorkerPID, catch Fn(Item, WorkerIndex)}
    end),
  Context2 = Context#{counter => Counter + 1, 
                      workers => Workers + 1, 
                      pids => PIDs#{PID => Index}},
  map_loop(Context2);
map_loop(#{workers := Workers, timeout := Timeout, pids := _PIDs} = Context) 
  when Workers > 0 ->
  receive
    {Index, PID, {'EXIT', _Reason} = Result} when is_integer(Index) ->%% error case
      io:fwrite("got error: ~p~n", [{Index, PID, Result}]),
      Context2 = set_worker_result(PID, {Index, Result}, Context),
      Context3 = kill_workers(Context2, error),
      create_result(Context3);

    {Index, PID, Result} when is_integer(Index) -> %% ok case
      io:fwrite("got result: ~p~n", [{Index, PID, Result}]),
      Context2 = set_worker_result(PID, {Index, Result}, Context),
      map_loop(Context2)

  after Timeout -> %% timeout case
      io:fwrite("timeout: ~p~n", [#{context => Context}]),
      Context3 = kill_workers(Context, {error, timeout}),
      create_result(Context3)
  end;
map_loop(#{workers := Workers, pids := PIDs} = Context) 
  when Workers == 0, PIDs == #{} ->
  create_result(Context).

Пройдемся по реализации:


  • первая реализация функции map_loop последовательно запускает не больше WorkersMax воркеров. Воркер запускается через spawn и внутри анонимной функции описана его логика: вычислить значение функции Fn и отправить результат супервайзеру;
  • вторая реализация функции (которая, как мы напомним, вызывается если не выполнились условия для вызова первой реализации): ждет результат от воркеров. При получении результата она принимает решение, что делать дальше: или продолжать цикл или завершить работу;
  • в третью реализацию функции мы приходим, когда больше нет активных воркеров (и это означает, что все вычисления выполнены) и она просто возвращает результат вычислений.

Пробежимся по приватным функциям, которые мы тут использовали:


  • set_worker_result — сохраняет результат вычисления в Context супервайзера;
  • kill_workers — убивает все процессы с воркерами (для случая прерывания работы);
  • create_result — выдает результат вычислений, полученных от воркеров.

Полный листинг функции можно посмотреть на GitHub: тут


Тестирование

Теперь чуть потестируем нашу функцию через Erlang REPL.


1) Запустим вычисление для 2-х воркеров, так чтобы результат от второго воркера пришел раньше, чем от первого:


>catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), 1; (2, _) -> timer:sleep(1000), 2 end, [1, 2], 2, 5000).


В последней строчке — результат вычислений.


{Index, PID, {W, WMax}}: {1,<0.1010.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.1011.0>,{2,2}}
got result: {2,<0.1011.0>,2}
got result: {1,<0.1010.0>,1}
{ok,[1,2]}

2) Запустим вычисление для 2-х воркеров, так, чтобы в первом воркере случился креш:


>catch dmap_pmap:map(fun(1, _) -> timer:sleep(100), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 5000).


{Index, PID, {W, WMax}}: {1,<0.2149.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.2150.0>,{2,2}}
got error: {1,<0.2149.0>,{'EXIT',terrible_error}}
kill: <0.2150.0>
{'EXIT',terrible_error}

3) Запустим вычисление для 2-х воркеров, так, чтобы время вычисления функции превысило разрешенный таймаут:


> catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 1000).


{Index, PID, {W, WMax}}: {1,<0.3184.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.3185.0>,{2,2}}
got result: {2,<0.3185.0>,2}
timeout: #{context =>
               #{counter => 3,fn => #Fun<erl_eval.12.99386804>,
                 items => [1,2],
                 pids => #{<0.3184.0> => 1},
                 results => #{2 => 2},
                 timeout => 1000,total => 2,workers => 1,workers_max => 2}}
kill: <0.3184.0>
{error,timeout}

Ну и наконец...


Вычисления на кластере


Результаты тестирования выглядят разумно, но при чем же тут кластер, может спросить внимательный читатель.
На самом деле, оказывается, что у нас уже есть почти все, что нужно, чтобы запустить вычисления на кластере, где под кластером мы понимаем набор связанных Erlang нод.


В отдельном модуле dmap_dmap заведем еще одну функцию, со следующей сигнатурой:


map({M, F}, Items, WorkersNodes, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}, где:


  • {M, F} — модуль и имя функции, к которую нужно применить аргументы (аля F:M(Item)),
  • Items — список значений аргументов для этой функции,
  • WorkersNodes — список имен нод, на которых нужно запустить вычисление,
  • Timeout — таймаут для вычисления значения функции, который мы готовы ждать.

Реализация:


-module(dmap_dmap).
...
map({M, F}, Items, WorkersNodes, Timeout) ->
  Fn = fun(Item, WorkerIndex) ->
      Node = lists:nth(WorkerIndex, WorkersNodes),
      rpc:call(Node, M, F, [Item])
    end,
  dmap_pmap:map(Fn, Items, length(WorkersNodes), Timeout).

Логика этой функции очень проста: мы используем функцию dmap_pmap:map из прошлого раздела, в которую подставляем анонимную функцию, которая, в свою очередь, просто выполняет вычисление на нужной ноде.


Для теста в отдельном модуле заведем функцию, которая возвращает имя своей ноды:


-module(dmap_test).

test(X) ->
  {ok, {node(), X}}.

Тестирование


Для тестирования нам нужно в двух терминалах запустить по ноде, например, вот так (из рабочего каталога проекта):


make run NODE_NAME=n1@127.0.0.1


make run NODE_NAME=n2@127.0.0.1


Запустим вычисление на первой ноде:


(n1@127.0.0.1)1> dmap_dmap:map({dmap_test, test}, [1, 2], ['n1@127.0.0.1', 'n2@127.0.0.1'], 5000).


И получим результат:


{Index, PID, {W, WMax}}: {1,<0.1400.0>,{1,2}}
{Index, PID, {W, WMax}}: {2,<0.1401.0>,{2,2}}
got result: {1,<0.1400.0>,{ok,{'n1@127.0.0.1',1}}}
got result: {2,<0.1401.0>,{ok,{'n2@127.0.0.1',2}}}
{ok,[{ok,{'n1@127.0.0.1',1}},{ok,{'n2@127.0.0.1',2}}]}

Как можно заменить, результаты прилетели от двух нод, как мы и заказывали.


Вместо заключения


Наш простой пример показывает, что в своей сфере применения Erlang позволяет относительно просто решать полезные задачи (которые не так просто решить с помощью других языков программирования).


Из-за короткого формата статьи могут быть вопросы относительно кода и сборки библиотеки, которые остались за кадром.


Какие-то детали можно посмотреть в GitHub: тут.


Остальные детали мы обещаем осветить в следующих статьях.

Комментарии (14)


  1. Evengining
    28.04.2018 17:58

    Отличная стать, хорошо что еще есть отличные авторские работы по отличным языкам!


    1. mspain
      29.04.2018 04:59

      На мой взгляд работа станет неплохой если результат хотя бы минимально сравнят с альтернативными решениями. А то получился сферический эрланг в вакууме. У меня например есть опасение, что кластер на эрланге будет работать медленнее, чем однопоточный вариант на си.


      1. mspain
        29.04.2018 05:33

        А программа-минимум, бенчмарки относительно самого себя. Сейчас доступно море технологий дающих линейное масштабирование, является ли эрланг одной из них? С 1000 воркеров скорость в 1000 раз больше? Если поставить 50 нод, вырастет ли в 50 раз производительность?


      1. ai212983
        29.04.2018 10:44

        в статье предполагается, что читатель в курсе, что такое кластер, распределенное вычисление, и зачем они нужны. если у вас есть задача, которая быстрее решается в один поток на си — делайте в один поток на си.


      1. ady1981 Автор
        29.04.2018 10:47

        Понимаю поинт, однако:
        1) Если в Erlang не делать вычислений, о которых известно, что они работают в Erlang медленно, то скорость работы Erlang приложения будет не хуже, чем аналогичного приложения на Java.
        2) В Erlang есть проработанный механизм, как подключать код на C. :)
        3) Мы не бьемся за проценты от скорости работы приложения. Для нас важнее скорость разработки, простота поддержки и мультиплатформенность.


        1. mspain
          29.04.2018 21:45

          Судя по всему поинт таки не поняли. Например в java есть stream api, там есть примерно тот же самый map и очень похожий parallel (который сам делает всё волшебство). Загвоздка в том, что на многих задачах это приводит к тормозам, даже когда в теории вроде как должно параллелиться. Так и тут — без бенчмарков неубедительно.


          1. JC_IIB
            30.04.2018 11:02

            Например в java есть stream api


            А это… когда оно там появилось, сколько лет назад?


          1. ady1981 Автор
            01.05.2018 17:03

            Я отвечал на вопрос: Erlang cluster VS однопоточный вариант на С :).
            Что касается тестирования Erlang кластера как кластера, то, да, тестирование тут очень важно. Мы тут не привнесли ничего нового: кластер, который мы поднимаем — стандартный Erlang кластер, для которого бенчмарки, по идее, уже должны быть сделаны другими исследователями до нас :). Вопрос действительно важный, я скину ссылки на эти исследования, если найду.
            Спасибо за предложение сравнить Erlang кластер с другими решениями, возможно в будущем мы сделаем такие бенчмарки и опубликуем их (это не тривиальная задача).


    1. ady1981 Автор
      29.04.2018 10:36

      Спасибо.
      Erlang — язык со своими плюсами и минусами.
      Некоторые задачи решаются в Erlang очень красиво, а следовательно — быстро и надежно.


  1. samsonoveu
    29.04.2018 10:47

    Было бы полезно, если бы кто-нибудь написал похожую статью, но используя Elixir.


    1. eoffsock
      29.04.2018 12:04

      Там все значительно проще. Мне кажется, даже статья не понадобится. Смотрите.

      Это parallel map на локальной ноде

      defmodule Parallel do
        def pmap(collection, func) do
          collection
          |> Enum.map(&(Task.async(fn -> func.(&1) end)))
          |> Enum.map(&Task.await/1)
        end
      end


      Чтобы пускать таски на удаленных нодах, нам нужен TaskSupervisor вместо обычного Task
      # On the remote node
      Task.Supervisor.start_link(name: MyApp.DistSupervisor)
      
      # On the client
      Task.Supervisor.async({MyApp.DistSupervisor, :remote@local},
                            MyMod, :my_fun, [arg1, arg2, arg3])


      Естественно, супервизор надо будет в supervision tree добавить или пускать через start_link, и модуль на нодах написать, который будет дергаться через TaskSupervisor. Остальное сделает эликсир сам.

      Ссылки, куда посмотреть можно:
      hexdocs.pm/elixir/Task.Supervisor.html
      elixir-recipes.github.io/concurrency/parallel-map
      elixir-lang.org/getting-started/mix-otp/distributed-tasks-and-configuration.html


      1. samsonoveu
        29.04.2018 18:52

        Спасибо за подробный ответ.


      1. ady1981 Автор
        01.05.2018 17:12

        Ничего не имею против вашего решения, но действительно ли это решение эквивалентно
        нашему?
        1) Есть ли ограничение по количеству одновременно запущенных процессов-воркеров?
        2) Есть ли остановка в случае креша в любом из воркеров?
        3) Есть ли остановка в случае таймаута при ожидании ответа от воркера?
        4) Можете привести примеры тестов, которые у нас приведены для `dmap_pmap:map`?

        Elixir компилирует в Erlang VM, поэтому на вскидку не совсем понятно, зачем писать отдельное решение для dmap на Elixir, если можно просто использовать готовое решение на Erlang, которое просто синтегрировать со своим Elixir кодом :).


        1. eoffsock
          01.05.2018 20:36

          Это не эквивалентное решение. Это решение тех же задач. Да и вообще, я не собираюсь меряться кодом. Человек спросил, как на Elixir сделать аналогичную штуку, я ответил. В общих чертах, но по документации легко будет разобраться в тонкостях.

          1. Сколько запустите, столько и будет. Ноды назначаются явно. Разумеется, в случае реального боевого проекта это можно легко автоматизировать.
          Однако если я захочу ограничить количество параллельных процессов, я не буду писать велосипед, а использую poolboy.

          Да и вообще, poolboy прекрасно решает большинство проблем по распределению задач между процессами.

          2. Креш Task уронит вызывающую функцию. Строго говоря, несмотря на парадигму let it crash, я не считаю это хорошей идеей. Но механизм есть.

          3. await(task, timeout \\ 5000)
          Но опять же, я не считаю остановку в случае вылета по таймауту хорошей идеей. У меня на проекте есть такой вариант:

          ret = case Task.yield(task, 60000) do
                {:ok, result} ->
                  result
                nil ->
                  Task.shutdown(task)
                  :timeout
              end

          По крайней мере я хочу получить результаты, которые есть. С проблемами разберемся позже. Но у меня такой род приложений, что частичная потеря данных простительна, а вот лишние задержки — не очень хорошо.

          4. Нет, тестов нет. При случае сделаю, пока задачи запускать таски на удаленных нодах не было. pmap используем, но на локальных нодах.

          У нас проекты на Elixir, и я не вижу смысла писать на Erlang и потом интегрировать код с Elixir, если могу сразу все сделать на Elixir. Даже учитывая то, что мне все равно часто приходится возиться с кодом на Erlang.