Elixir и Erlang идеально подходят для создания распределенных приложений, выполняющих параллельно несколько, возможно схожих задач. Поддержка многих конкурентных процессов работающих в изоляции была одним из основных аспектов при разработке виртуальной машины языка Erlang.
Постараемся проверить эту возможность использовать потенциал многоядерного процессора на простом примере. Подсчитаем сколько раз встечается слово "лошадь" в рассказах писателя О. Генри размещенных в текстовых файлах в одной директории. Технически, мы будем считать количество вхождения последавательности символов "лошадь", а не слова, и только в нижнем регистре.
Подcчет вхождений подстроки в файлах
Начнем с функции подсчета количества вхождений подстроки в содержимом текстового файла.
word_count = fn(file, word) ->
{:ok, content} = File.read(file)
length(String.split(content, word)) - 1
end
Читаем содержимое файла и возвращаем количество упоминаний слова. Обработка ошибок опущена, для простоты.
Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.
word_count = fn(file, word) ->
:timer.sleep(1000)
{:ok, content} = File.read(file)
count = length(String.split(content, word)) - 1
IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
count
end
Теперь посчитаем количество подстроки в каждом файле и выведем сумму.
Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts
И заодно замерим время выолнения всей программы.
# sync_word_count.exs
start_time = :os.system_time(:milli_seconds)
word_count = fn(file, word) ->
:timer.sleep(1000)
{:ok, content} = File.read(file)
count = length(String.split(content, word)) - 1
IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
count
end
Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts
end_time = :os.system_time(:milli_seconds)
IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"
Всего у меня 12 файлов и ждать пришлось около 12-ти секунд, секунда за секундой созерцая как на мониторе появляется результат подсчета для каждого файла.
iex sync_word_count.exs
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
32
Finished in 12.053 seconds
Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)
Асинхронное выполнение задач
Для подсчета количества вхождений подстроки асинхронно воспользуемся методом создания процесса spawn
(порождать) и методами send
and receive
для отправки и получения сообщения, соответственно.
Для каждого файла будем создавать отдельный процесс
async_word_count = fn(file, word) ->
caller = self
spawn(fn ->
send(caller, {:result, word_count.(file, word)})
end)
end
self
— это текущий процесс. Создаем переменную caller
с тем же значением, что и self
. Порожденный процесс вызывает функцию word_count/2
и посылает результат обратно родительскому процессу.
Чтобы получить значение обрано, в родительком процессе нужно использовать receive
(столько же раз, сколько процессов будет создано). Создадим для этого метод get_result/0
.
get_result = fn ->
receive do
{:result, result} -> result
end
end
Обновим программу.
# async_word_count.exs
start_time = :os.system_time(:milli_seconds)
word_count = fn(file, word) ->
:timer.sleep(1000)
{:ok, content} = File.read(file)
count = length(String.split(content, word)) - 1
IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
count
end
async_word_count = fn(file, word) ->
caller = self
spawn(fn ->
send(caller, {:result, word_count.(file, word)})
end)
end
get_result = fn ->
receive do
{:result, result} -> result
end
end
Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
|> Enum.map(fn(_) -> get_result.() end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts
end_time = :os.system_time(:milli_seconds)
IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"
Проверим.
iex async_word_count.exs
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
32
Finished in 1.014 seconds
Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)
Вывод
Приключения и лошади раньше были неотделимы друг от друга, сейчас уже, возможно, это не совсем так.
Ссылки
» http://elixir-lang.org/getting-started/processes.html
» http://culttt.com/2016/07/27/understanding-concurrency-parallelism-elixir/
» https://elixirschool.com/lessons/advanced/concurrency/
» Код и текстовые файлы (папка OGENRI)
Комментарии (10)
Virviil
25.09.2016 22:45+2Тема действительно интересная, я сам недавно сталкивался с распараллеливанием вычислений в Elixir, поэтому сделал небольшой рефакторинг и добавил основную основную фичу, значительно облегчающую распараллеливание — Task.
Читать по ссылке, а пока предлагаю взглянуть на код:
Весь длинный код работающего скриптаdefmodule Wordcount do @main_word "лошадь" defp count(text, word) do length(String.split(text, word)) - 1 end defp do_proceed(file_path) do File.read!(file_path) |> count(@main_word) |> (fn (count) -> IO.puts "Found #{count} occurrence(s) of the word in file #{file_path}" end).() end def proceed(:async) do Path.wildcard("./OGENRI/*.txt") |> Enum.map(&Task.async(fn -> do_proceed(&1) end)) |> Enum.map(&Task.await/1) end def proceed(:sync) do Path.wildcard("./OGENRI/*.txt") |> Enum.map(&do_proceed/1) end ### Example: # benchmark # benchmark(:sync) def benchmark(type \\ :async) do start_time = :os.system_time(:milli_seconds) proceed(type) end_time = :os.system_time(:milli_seconds) IO.puts "Finished in #{(end_time - start_time)} miliseconds" end end
erlyvideo
26.09.2016 01:24+11что будет, если взять презерватив и долго учиться его натягивать на кактус?
Такая же искусственная конструкция.
Создание и управление процессами в эрланге — это достаточно нетривиальная штука, потому что надо следить как за ошибками в них, так и за используемыми ресурсами, да и поддерживать целостным дерево процессов что бы можно было остановить всю задачу за корневой.
Просто бездумно форкаться в ожидании успеха по процессу на каждый элемент списка — это годится только для статьи на хабр, но никак не для чего-то, что можно выложить на продакшн.
Path.wildcard("/data/OGENRI/*.txt") |> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
вы здесь создаете уйму неуправляемых процессов на каждый файл. Вы моментально схлопочете emfile, но узнать об этом не получится, потому что вы даже spawn_link не делаете.
К сожалению, вы демонстрируете очень вредный и неряшливый подход на неверных примерах.nwalker
26.09.2016 20:20Это вообще беда elixir-комьюнити. Я смотрю за статьями на хабре, смотрю в elixir subreddit, и это какой-то ад — толковых постов почти нет, везде какой-то треш и полное непонимание того, как надо писать на Erlang.
roman_kashitsyn
26.09.2016 14:26Ваш результат лишь показывает, что спать асинхронно в 12 потоков быстрее, чем спать 12 раз синхронно в одном.
Игрушечный пример на одной машине не показывает оверхеда от передачи данных по сети (который может быть очень значительным), управления задачами (что если нода упадёт?) и т.п.
Кроме того, "классический" MapReduce устроен посложнее, чем у вас написано: редьюсеров обычно больше одного и результаты разделяются между редьюсерами по хэшу ключа и сортируются, это как раз одна из ключевых идей. Такой подход позволяет распараллелить фазу редьюса, избежать перегрузки машины, которая выполняет редьюс (все ключи могут просто не помещаться в память), и терять меньше работы в случае аварийного завершения редьюсера.
Перемножить большие матрицы на небольшом кластере или сделать JOIN независимых датасетов по какому-нибудь общему ключу было бы гораздо более показательно.
a-motion
27.09.2016 12:34Для этих целей Jose? предлагает использовать https://hexdocs.pm/gen_stage/Experimental.Flow.html
Подробнее: http://elixir-lang.org/blog/2016/07/14/announcing-genstage/
На последней конференции он два часа про GenStage говорил.erlyvideo
27.09.2016 18:21интересная может быть штука
a-motion
28.09.2016 10:52Почему «может быть»? Я ее уже в продакшене в хвост и в гриву гоняю. Валим специально оговорился, что
Experimental
там только из-за возможных коллизий с именами/переименованиями, код — production-ready.erlyvideo
28.09.2016 11:03практически всё с чем мы работаем (кроме самой beam.smp и основ OTP) требует допила.
Хосе Валим хороший программист, но он из рельс, а там очень и очень принято делать proof of concept, а потом как срастется.
Как срастется в случае с эрлангом означает OOM и рассыпающиеся штуки под нагрузкой.
Какие у вас цифры на хвосте и гриве? Сколько трафика, сколько событий, как часто срабатывает система защиты от перегрузки внутри пайплайна, какая у вас запланированная реакция системы на перегруз внутренних частей пайплайна?
third112
Спасибо за интересное исследование. Поэтому возник вопрос, т.к. неинтересные публикации вопросов не вызывают:
Этот момент вызвал сомнение. Значительное время тест не работает, а спит. Т.о. тестируем сон, а не только работу.Sirikid
Очевидно считать лошадей и спать секунду это модель тяжелого вычисления.