Привет, Хабр!

Сегодня поговорим с вами на тему распределённых вычислений на Elixir. Мы больше не можем позволить себе писать приложения, которые работают только на одной машине. Пришло время создавать системы, которые могут выдержать любую нагрузку и работать бесперебойно даже в случае выхода из строя отдельных компонентов.

Elixir — это язык программирования, который вырос на основе мощной виртуальной машины BEAM, используемой в Erlang. Это сразу даёт нам ряд фич: хорошую масштабируемость, встроенную поддержку конкурентности и возможность строить распределённые системы.

В этой статье мы рассмотри основные инструменты для реализации распределённых вычислений Elxir.

Основы

Начнём с главного героя — виртуальной машины BEAM. BEAM — это сердце как Erlang, так и Elixir. В отличие от большинства виртуальных машин, BEAM изначально разрабатывалась с учётом потребностей телекоммуникационных систем, где требования к отказоустойчивости и низкой задержке важны.

Основные фичи BEAM:

  1. Лёгкие процессы: BEAM управляет миллионами лёгких процессов с минимальной накладной. Эти процессы изолированы.

  2. Многопоточность и параллелизм: BEAM автоматически распределяет процессы по доступным ядрам процессора.

  3. Модель акторов: В Elixir и Erlang процессы не разделяют память и общаются между собой исключительно через сообщения.

  4. Горячая замена кода: BEAM поддерживает горячее обновление кода.

Чтобы понять, насколько лёгким является процесс на BEAM, посмотрим на простой пример:

spawn(fn -> IO.puts("Hello from a process!") end)

Этот код создаёт новый процесс, который выводит сообщение в консоль. Создание и завершение процесса практически не требует ресурсов.

Теперь рассмотрим три основных компонента, на которых строятся распределённые системы в Elixir: Node, Process и Message Passing.

Узлы

Node — это независимая экземпляра виртуальной машины BEAM. В контексте распределённых систем узел представляет собой отдельную ноду, которая может работать на одном сервере или на множестве различных серверов.

Для подключения одного узла к другому используется команда:

Node.connect(:"node2@localhost")

Можно апускать несколько узлов на одном или разных хостах, и каждый узел будет взаимодействовать с другими через сообщения. Все узлы в сети должны использовать один и тот же cookie для аутентификации.

Процессы

Процессы в Elixir — это фундаментальные строительные блоки, на которых базируются распределённые системы. Каждый процесс изолирован и общается с другими процессами только через сообщения.

Пример простого процесса, который может принимать сообщения:

defmodule SimpleProcess do
  def start do
    spawn(fn -> loop() end)
  end

  defp loop do
    receive do
      {:msg, from, text} ->
        IO.puts("Received message: #{text}")
        send(from, {:ok, "Message processed"})
        loop()
    end
  end
end

# Запуск процесса
pid = SimpleProcess.start()

# Отправка сообщения
send(pid, {:msg, self(), "Hello, process!"})

# Получение ответа
receive do
  {:ok, reply} -> IO.puts("Reply: #{reply}")
end

Процесс может получать и обрабатывать сообщения, а также отправлять ответы.

Передача сообщений

Передача сообщений в Elixir осуществляется с помощью операторов send и receive. Эта модель взаимодействия называется моделью акторов и позволяет создавать системы, в которых процессы не блокируют друг друга.

Как уже упоминалось ранее, процессы в Elixir изолированы друг от друга и не имеют общей памяти, что снижает вероятность ошибок, связанных с многопоточностью.

Теперь рассмотрим как настроить и запустить распределённый кластер на Elixir.

Запуск нескольких нод — это первый шаг к созданию распределённого кластера.

Так можно запустить несколько нод на локальном ПК:

# Запуск первой ноды
iex --sname node1 --cookie my_secret_cookie

# Запуск второй ноды в новом терминале
iex --sname node2 --cookie my_secret_cookie

После того как ноды запущены, их можно подключить друг к другу:

# Подключение node1 к node2
Node.connect(:"node2@localhost")

# Проверка подключения
Node.list() # должно вывести [:node2@localhost]

После того как ноды подключены, можно организовать взаимодействие между процессами на разных нодах. Для этого достаточно использовать те же команды send и receive, но с указанием PID процесса на другой ноде.

Пример отправки сообщения на другой ноде:

# На node1
pid = Node.spawn(:"node2@localhost", fn ->
  receive do
    {:msg, text} -> IO.puts("Message from node1: #{text}")
  end
end)

# На node2
send(pid, {:msg, "Hello from node1!"})

Процесс создаётся на node2, но отправка сообщения осуществляется с node1.

Важно не только организовать взаимодействие между процессами, но и обеспечить их надёжность. Это достигается с помощью супервизоров — компонентов OTP, которые автоматически перезапускают процессы в случае их сбоя.

Пример простого супервизора, управляющего процессами на разных нодах:

defmodule ClusterSupervisor do
  use Supervisor

  def start_link(_) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    children = [
      {Task, fn -> Node.spawn(:"node1@localhost", MyWorker, :start_link, []) end},
      {Task, fn -> Node.spawn(:"node2@localhost", MyWorker, :start_link, []) end}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

# Запуск супервизора
ClusterSupervisor.start_link(nil)

Спервизор управляет двумя процессами, запущенными на разных нодах. Если какой-либо из этих процессов завершится с ошибкой, супервизор автоматически его перезапустит.

Полезные инструменты

Distributed task

Task — это базовая абстракция в Elixir для управления асинхронными задачами, которая упрощает запуск процессов и обработку результатов. В контексте распределённых систем Task позволяет запускать задачи на разных нодах, делая код более параллельным и, соответственно, более быстрым.

Встроенный модуль Task можно использовать для создания распределённых задач, выполняемых на удалённых нодах.

Допустим, есть задача, которая занимает много времени и ресурсоёмка для одного компьютера. Можно распределить эту задачу между несколькими узлами в кластере, используя Task:

defmodule DistributedTaskExample do
  def start(nodes) do
    nodes
    |> Enum.map(&Task.async(fn -> perform_task(&1) end))
    |> Enum.map(&Task.await/1)
  end

  defp perform_task(node) do
    Node.spawn(node, fn ->
      # симуляция длительной операции
      Process.sleep(1000)
      IO.puts("Task completed on node: #{Node.self()}")
    end)
  end
end

# Использование:
nodes = [:"node1@localhost", :"node2@localhost"]
DistributedTaskExample.start(nodes)

Создаем и запускаем задачи на нескольких нодах, используя Task.async/1 и Task.await/1. Каждый узел выполняет свою задачу, а результаты синхронно собираются на ноде, откуда был инициирован запуск.

Task хорош для относительно простых задач, которые можно легко параллелить, не требуя сложного управления состоянием или обработки ошибок.

GenServer

GenServer — это компонент в экосистеме OTP на Elixir, представляющий собой серверный процесс с возможностью обработки запросов от других процессов. Он имеет набор абстракций для построения устойчивых серверов, работающих как на одной ноде, так и в распределённой системе.

GenServer – отличный выбор, когда нужно поддерживать состояние между вызовами, обрабатывать сообщения, асинхронные запросы или управлять сложными вычислениями.

Для того чтобы начать использовать GenServer, создадим простой пример распределённого сервера, который будет поддерживать общее состояние между всеми подключёнными нодами:

defmodule DistributedCounter do
  use GenServer

  # API
  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment() do
    GenServer.call(__MODULE__, :increment)
  end

  def get() do
    GenServer.call(__MODULE__, :get)
  end

  # Callbacks
  def init(initial_value) do
    {:ok, initial_value}
  end

  def handle_call(:increment, _from, state) do
    {:reply, state + 1, state + 1}
  end

  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end

# запуск на одной ноде
{:ok, _pid} = DistributedCounter.start_link(0)

# увеличение счётчика с другой ноды
Node.connect(:"node2@localhost")
:rpc.call(:"node2@localhost", DistributedCounter, :increment, [])

# получение значения счётчика
DistributedCounter.get() # должно вернуть 1

DistributedCounter является распределённым счётчиком, который увеличивает своё значение при вызове метода increment/0. :rpc.call/4позволяет вызывать функции на удалённых нодах.

Swarm

Swarm — это библиотека для динамического распределения процессов в кластере Elixir. Она автоматически управляет распределением процессов между узлами, обеспечивая равномерную нагрузку и высокую отказоустойчивость. Swarm хорошо интегрируется с GenServer и другими компонентами OTP.

Предположим, нужно создать сервис, который обрабатывает множество задач, и хочется, чтобы эти задачи распределялись по всем доступным узлам. Используя Swarm, можно легко добиться этого:

defmodule MyWorker do
  use GenServer
  use Swarm, strategy: :gossip

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: :my_worker)
  end

  def handle_call(:do_work, _from, state) do
    # Симуляция работы
    Process.sleep(1000)
    {:reply, :ok, state}
  end
end

# Запуск работы
Swarm.register(:my_worker, MyWorker, [])
GenServer.call(:my_worker, :do_work)

Swarm автоматически распределит процесс MyWorker на доступные узлы.

Horde

Horde — это библиотека для создания распределённых супервизоров и реестров.

Пример использования Horde для создания распределённого супервизора, который управляет набором процессов:

defmodule MyApp.DistributedSupervisor do
  use Horde.DynamicSupervisor

  def start_link(_) do
    Horde.DynamicSupervisor.start_link(
      name: __MODULE__,
      strategy: :one_for_one,
      members: :auto
    )
  end

  def start_worker(worker_module, args) do
    Horde.DynamicSupervisor.start_child(__MODULE__, {worker_module, args})
  end
end

defmodule MyWorker do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: :my_worker)
  end

  def handle_call(:do_work, _from, state) do
    Process.sleep(1000)
    {:reply, :ok, state}
  end
end

# Запуск супервизора
{:ok, _pid} = MyApp.DistributedSupervisor.start_link(nil)

# Добавление worker в распределённый супервизор
MyApp.DistributedSupervisor.start_worker(MyWorker, [])

Horde управляет супервизором, который распределяет процессы по узлам. Если один из узлов выходит из строя, Horde автоматически переместит процессы на другие узлы.


В заключение приглашаем всех системных аналитиков и тех, кто интересуется этой областью, на открытый урок 20 августа — на нем рассмотрим отличия пользовательских сценариев (Use Cases) и пользовательских историй (User Story). Записаться на урок можно бесплатно на странице онлайн-курса.

Комментарии (1)


  1. napolskih
    17.08.2024 17:40

    Спасибо за статью по Elixir и OTP!

    Нужно нести людям такие прекрасные технологии.