Глубокие модели становятся всё больше и всё реже помещаются на один компьютер. Решением этой проблемы может быть распределённое обучение, когда используются ресурсы нескольких вычислительных узлов.

Представляю вниманию хабрассобщества перевод поста из блога LambdaLabs, компании, специализирующейся на инфраструктуре для глубокого обучения. Он посвящён распределённому обучению моделей PyTorch на нескольких вычислительных узлах.

PyTorch задуман как фреймворк которым легко пользоваться и который достаточно производителен даже в условиях больших задач. Действительно, он стал самым популярным фреймворком для глубокого обучения среди исследователей, оставив далеко позади остальных. Однако, за исключением длинных официальных руководств и записей в блогах, не всегда понятно, что нужно сделать, чтобы заставить PyTorch учить модель используя несколько вычислительных узлов.

Поэтому цель этого руководства в том, чтобы кратко показать, как в PyTorch написать и запустить распределенные задачи с использованием параллелизма данных. Мы исходим из предположения, что читатель имеет опыт использования PyTorch и обучения нейронных сетей, имеет представление о параллелизме данных и хочет быстро заполнить пробел между запуском задач на одной машине и масштабированием их на нескольких. Исходя из этого, руководство концентрируется на:

  • общих идеях о том, как под капотом работает параллелизм данных в PyTorch;

  • нескольких примерах шаблонного кода для обучения моделей в такой конфигурации;

  • том, как запустить эти примеры с помощью torch.distributed.launch, torchrun иmpirun .

Под капотом у распределённого PyTorch

Идея, лежащая в основе распределённого PyTorch, просто — нужно создать несколько процессов, которые выполняют некоторую простую задачу несколько раз и параллельно. Суть здесь та же самая, как у MPI, то есть, что‑то вроде команды mpirun -n 3 echo hello world, которая трижды повторяет задачу по выводу сообщения «hello world»:

ubuntu@ubuntu-desktop:~$ mpirun -n 3 echo hello world
hello world
hello world
hello world

Но есть две особенности, которые отличают обучение модели в PyTorch от вышеприведённого примера с mpirun.

  • Каждому процессу нужно предоставить доступ к аппаратному ускорителю (например, GPU), чтобы повысить эффективность вычислений при прямом и обратном проходе на каждом шаге обучения.

  • Обеспечить коммуникацию между процессами. Это нужно, чтобы в конце каждого шага обучения можно было аккумулировать значения градиентов и затем синхронно обновить веса модели для всех процессов.

Для решения этих задач PyTorch создаёт группу процессов, которые «привязаны к устройствам» (torch.distributed.init_process_group), объединяет их с помощью быстрой системы коммуникаций (например, nccl), и настраивает ваш конвейер данных и реализацию модели так, чтобы они могли работать поверх нескольких процессов (чаще всего с помощью torch.utils.data.distributed.DistributedSampler и torch.nn.parallel.DistributedDataParallel).

Ещё потребуется несколько переменных окружения, чтобы связать это всё воедино: WORLD_SIZE, WORLD_RANK и LOCAL_RANK . Их можно рассматривать как несколько странные имена для таких понятий как «общее количество GPU в кластере», «глобальный для кластера идентификатор GPU» и «локальный для узла идентификатор GPU». Как вы возможно уже догадались, они служат для идентификации процессов и обеспечения коммуникации между ними на протяжении жизни нашей задачи по обучению.

Пишем распределённое приложение в PyTorch

Теперь посмотрим на ряд примеров приложений PyTorch, распределённых на несколько узлов. Начнём с простого примера передачи сообщений и рассмотрим, как PyTorch Distributed Data Parallel использует переменные окружения чтобы создать процессы на нескольких узлах. После этого обсудим, как перенести полученный опыт на более сложный пример, обучение ResNet.

В целях демонстрации, этот пост использует два экземпляра 2xA600 в Lambda Cloud. Но те же самые подходы можно использовать и для on‑prem кластеров с более чем двумя серверами, например, для кластера Lambda Echelon.

Эти два узла 2xA600 (с адресами 104.171.200.62 и 104.171.200.182) позволяют запустить 4 рабочих процесса (по 2 на каждый узел).

Передача сообщений

Начнём с простого примера, который пересылает тензор между двумя процессами. Пример основан на официальном руководстве по PyTorch, но мы его несколько упростили (чтобы нам не пришлось беспокоится о реализации многопроцессности в PyTorch) и модифицировали, чтобы он работал с несколькими серверами, а не в пределах одного (ведь в этом‑то вся суть, так ведь?).

import os
import argparse

import torch
import torch.distributed as dist

# Environment variables set by torch.distributed.launch
LOCAL_RANK = int(os.environ['LOCAL_RANK'])
WORLD_SIZE = int(os.environ['WORLD_SIZE'])
WORLD_RANK = int(os.environ['RANK'])

def run(backend):
    tensor = torch.zeros(1)
    
    # Need to put tensor on a GPU device for nccl backend
    if backend == 'nccl':
        device = torch.device("cuda:{}".format(LOCAL_RANK))
        tensor = tensor.to(device)

    if WORLD_RANK == 0:
        for rank_recv in range(1, WORLD_SIZE):
            dist.send(tensor=tensor, dst=rank_recv)
            print('worker_{} sent data to Rank {}\n'.format(0, rank_recv))
    else:
        dist.recv(tensor=tensor, src=0)
        print('worker_{} has received data from rank {}\n'.format(WORLD_RANK, 0))

def init_processes(backend):
    dist.init_process_group(backend, rank=WORLD_RANK, world_size=WORLD_SIZE)
    run(backend)

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--backend", type=str, default="nccl", choices=['nccl', 'gloo'])
    args = parser.parse_args()

    init_processes(backend=args.backend)

Код запускает 4 процесса и worker_0 создаёт тензор на GPU 0 узла 104.171.200.62, а затем рассылает его оставшимся трём процессам. Как только процесс получает тензор, он печатает сообщение:

# output from node 104.171.200.62
worker_0 sent data to Rank 1
worker_0 sent data to Rank 2
worker_0 sent data to Rank 3
worker_1 has received data from rank 0

# output from node 104.171.200.182
worker_2 has received data from rank 0
worker_3 has received data from rank 0

Давайте посмотрим подробнее, что происходит в этом скрипте.

  • Создаем группу процессов вызовом dist.init_process_group(backend, rank=WORLD_RANK, world_size=WORLD_SIZE). Каждый процесс будет обозначен как worker_$WORLD_RANK , всего в группе будет $WORLD_SIZE процессов.

  • Как только процесс создан, запускаем задачу вызовом функции run.

  • Обратите внимание, что WORLD_RANK и WORLD_SIZE используются в вызове dist.init_process_group для создания процесса и что LOCAL_RANK используется в функции run чтобы ассоциировать с процессом конкретный GPU. Такой подход позволяет убедиться, что процессы можно глобально идентифицировать, независимо от того, на каком сервере они выполняются, а также то что им корректно выделяются GPU.

  • Параметр backend указывает, какую библиотеку (nccl, gloo и т. п.) использовать для коммуникации. PyTorch обеспечивается обучение и создаёт распределённые процессы, но вынужден полагаться на эти библиотеки для организации взаимодействия между ними.

  • Отметим также, что параметр local_rank в этом примере никак не используется. Мы его оставим, чтобы наш код был совместим с torch.distributed.launch, который ожидает, что скрипт принимает параметр local_rank.

Вам может быть интересно, где мы задаём значения для этих переменных окружения. Это происходит снаружи скрипта, и мы это увидим позже, в разделе посвящённом запуску приложения. Прежде чем идти дальше, давайте проясним, значения этих переменных в рассмотренном примере.

  • LOCAL_RANK определяет идентификатор процесса в пределах одного узла. В этом примере каждый узел имеет только два GPU, так что LOCAL_RANK может быть только 0 или 1. Благодаря этому мы можем использовать эту переменную чтобы указать, какой GPU должен использовать процесс, используя выражение device = torch.device("cuda:{}".format(LOCAL_RANK)).

  • WORLD_SIZE определяет общее число процессов. У нас два узла и по два процесса на каждом, так что WORLD_SIZE=4. Параметр используется вызовом dist.init_process_group для создания группы процессов. В нашем примере мы также используем это значение чтобы worker_0 мог разослать тензор остальным процессам.

  • RANK (который мы для читаемости переименовали в WORLD_RANK) определяет идентификатор процесса в пределах кластера (всех узлов вместе). Так как WORLD_SIZE у нас 4, то RANK (или WORLD_RANK) может быть 0, 1, 2 или 3.

Обучаем ResNet

Теперь мы знаем основы написания приложений PyTorch, распределённых по нескольким узлам. Теперь обратимся к популярному коду для обучения ResNet от Lei Mao. Мы не будем копировать его в этот пост целиком, вместо этого мы сравним его подходы с тем, что мы уже видели выше в примере с передачей сообщений и обратим внимание на дополнительные примочки, используемые для обучения глубоких нейронных сетей.

Распространённые подходы для распределённых приложений в PyTorch.

  • torch.distributed.init_process_group(backend="nccl") скрипт для ResNet использует ту же саму функцию для создания процессов. Однако здесь не указаны ни rank ни world_size. Действительно, согласно документации PyTorch они не обязательны, значения будут получены из переменных окружения WORLD_SIZE и RANK. Так что никакой разницы.

  • device = torch.device("cuda:{}".format(local_rank)) : локальный идентификатор процесса используется чтобы выбрать GPU для процесса. В скрипте ResNet и примере с передачей сообщений это работает одинаково.

Но в сравнении с нашим примером передачи сообщений есть несколько дополнительных шагов, чтобы обучение работало.

  • ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank): скрипт ResNet использует распространённую в мире PyTorch практику создаёт обёртку над ResNet, чтобы она могла быть использована в распределённой среде.

  • train_sampler = DistributedSampler(dataset=train_set): ещё одна распространённая практика заключается в том, чтобы создать обертку над датасетом, чтобы его можно было использовать в распределённой среде.

На этом всё! Всё, что нужно сделать, это создать группу процессов (с помощью WORLD_SIZE и WORLD_RANK) назначить процессам GPU (используя LOCAL_RANK) и обернуть модель и датасет в DistributedDataParallel и DistributedSampler для работы в распределённой среде.

Запускаем распределённое приложение PyTorch на нескольких узлах

К этому моменту мы уже обсудили как написать распределённое приложение PyTorch так, чтобы оно работало на нескольких узлах. О чём мы пока не говорили, так это о том, как его запустить. Но теперь время запуска пришла, также, наконец, пока раскрыть тайну, как устанавливаются значения переменных окружения WORLD_SIZE, WORLD_RANK и LOCAL_RANK.

Есть разные способы «запустить» распределённые приложения на нескольких серверах, реализованные разными разработчиками, вроде ветеранов мира HPC (Open MPI), ведущими фреймворками глубокого обучения (PyTorch) или open‑source сообществом (Horovod).

В оставшейся части этого поста мы обсудим использование distributed.launch иtorchrun из PyTorch и mpirun из Open MPI. Мы выбрали эти методы из‑за их популярности взаимозаменяемости и потому, что они из коробки поддерживаются PyTorch. Мы не будем рассматривать Horovod потому что он требует установки дополнительных компонентов и внесения изменений в наш код, при том что мы можем достичь тех же результатов, используя mpirunбез дополнительной работы.

torch.distributed.launch

Чтобы использовать torch.distributed.launch для запуска нашей задачи по обучению ResNet, нам нужно зайти на наши серверы по ssh и выполнить команды.

# On 104.171.200.62 (the master node)
python3 -m torch.distributed.launch \
--nproc_per_node=2 --nnodes=2 --node_rank=0 \
--master_addr=104.171.200.62 --master_port=1234 \
main.py \
--backend=nccl --use_syn --batch_size=8192 --arch=resnet152

# On 104.171.200.182 (the worker node)
python3 -m torch.distributed.launch \
--nproc_per_node=2 --nnodes=2 --node_rank=1 \
--master_addr=104.171.200.62 --master_port=1234 \
main.py \
--backend=nccl --use_syn --batch_size=8192 --arch=resnet152

# Output from node 104.171.200.62
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...

# output from node 104.171.200.182
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
  • nproc_per_node задаёт количество процессов на каждый узел. Значение параметра должно быть равно количеству GPU на каждом узле, то есть, в нашем случае, 2.

  • nnodes задаёт общее количество узлов. В нашем случае тоже 2.

  • master_addr и master_port задают IP и порт мастер‑сервера. Нам нужно указать эти параметры чтобы PyTorch не ругался.

Эти параметры одинаковы для обоих наших узлов, но node_rank задает ранг узла. Мы должны использовать разные значения в разных командах. На мастер‑сервере используем 0, а на втором сервер — 1. Если у мастер‑сервера ранг не 0, то всё зависнет.

Не трудно догадаться, что torch.distributed.launch создаст параметры окружения WORLD_SIZE, WORLD_RANK и LOCAL_RANK для всех процессов используя значения параметров nproc_per_nod, nnodes иnode_rank .

torchrun

Разработанный командой PyTorch, torchrun работает похожим образом, что и torch.distributed.launch , но имеет дополнительные возможности по обработке сбоев процессов или изменению количество работающих процессов на лету. Фактически, torchrun можно запустить теми же командами, которые мы использовали для torch.distributed.launch:

# On 104.171.200.62 (the master node)
torchrun \
--nproc_per_node=2 --nnodes=2 --node_rank=0 \
--master_addr=104.171.200.62 --master_port=1234 \
main.py \
--backend=nccl --use_syn --batch_size=8192 --arch=resnet152

# On 104.171.200.182 (the worker node)
torchrun \
--nproc_per_node=2 --nnodes=2 --node_rank=1 \
--master_addr=104.171.200.62 --master_port=1234 \
main.py \
--backend=nccl --use_syn --batch_size=8192 --arch=resnet152

Обратите внимание, torchrun поддерживается начиная с PyTorch 1.10. Нужно также убедиться, что он добавлен в $PATH или мы получим ошибку, о том, что команда не найдена. Мы проверили torchrun в Lambda Cloud, создав virtual environment и установив последнюю стабильную версию PyTorch 1.12.1:

virtualenv -p /usr/bin/python3.8 venv-torchrun

. venv-torchrun/bin/activate

pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu116

mpirun

Хотя способ с torch.distributed.launch работает из коробки, являясь родным для PyTorch, нам приходится выполнять команды на каждом из узлов, соответствующим образом их адаптируя. Такой подход трудно масштабировать на большие кластеры и при его использовании легко ошибиться. Вместо этого, мы можем добиться того же результата, просто запустив mpirun всего один раз, на мастер‑сервере,

MPI, или Message Passing Interface, это стандартизированный способ передачи сообщений между процессами, принадлежащими распределенному приложению, запущенному на нескольких компьютерах. В случае обучения модели в PyTorch, «сообщение» содержит градиент, вычисленный каждым GPU, а обмен сообщениями позволяет обеспечить сбор этих градиентов со всех GPU и синхронное обновление весов нейронной сети. Стандарт MPI был разработан в середине девяностых и изначально был предназначен для высокопроизводительных кластеров, состоящих из большого числа компьютеров (преимущественно, считающих на CPU). Но благодаря последним достижениями в CUDA и появлению GPU‑кластеров, стали появляться новые библиотеки, такие как NCCL (NVIDIA Collective Communication Library), способные обеспечить быстрое взаимодействие между GPU, в том числе с учётом топологии кластера (используя технологии вроде RMDA поверх RoCE или InfiniBand). На практике, NCCL может быть легко интегрирован в приложения, использующие MPI. В этом случае MPI становится «фронтендом», ответственным за запуск параллельных задач, а NCCL становится «бекендом», обеспечивающим коммуникацию.

Есть две разновидности реализаций MPI — OpenMPI и MPICH. Нам подойдёт любая, но в рамках этого поста мы будем использовать OpenMPI. Прежде чем продолжать, вам нужно установить OpenMPI и NCCL. Но если вы используете Lambda Cloud, то этап установки можно пропустить, так как оба эти фреймворка входят в Lambda stack.

Итак, вот как запустить задачу на обучение с помощью команды mpirun, выполненной на мастер‑сервере.

# On 104.171.200.62 (the master node)
mpirun -np 4 \
-H 104.171.200.62:2,104.171.200.182:2 \
-x MASTER_ADDR=104.171.200.62 \
-x MASTER_PORT=1234 \
-x PATH \
-bind-to none -map-by slot \
-mca pml ob1 -mca btl ^openib \
python3 main.py --backend=nccl --use_syn --batch_size=8192 --arch=resnet152


# Output on 104.171.200.62
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 1, Epoch: 1, Training ...
Local Rank: 0, Epoch: 1, Training ...
Local Rank: 0, Epoch: 1, Training ...
Local Rank: 1, Epoch: 1, Training ...
num_steps_per_gpu: 19, avg_step_time: 1.4266
Local Rank: 1, Epoch: 2, Training ...
num_steps_per_gpu: 19, avg_step_time: 1.4266
Local Rank: 0, Epoch: 2, Training ...
num_steps_per_gpu: 19, avg_step_time: 1.4266
Local Rank: 0, Epoch: 2, Training ...
num_steps_per_gpu: 19, avg_step_time: 1.4266

Рассмотрим подробнее, как именно мы вызываем mpirun.

  • np: задаёт количество процессов (то, что раньше было WORLD_SIZE).

  • H: задаёт IP и количество процессов для каждого сервера. Если список слишком велик, то можно использовать hostfile.

  • -bind-to none: указывает, что OpenMPI не должен выделять процессу конкретный CPU (это может повредить производительности).

  • -map-by ob1 -mca btl ^openib: заставляет использовать для коммуникации между процессами TCP, вместо RDMA, с которым у OpenMPI есть ряд проблем, приводящих, обычно к segfault.

Есть ещё пара вещей, которые надо учитывать, используя mpirun с несколькими серверами.

  • Мастер‑сервер должен иметь не требующий пароля доступ ко всем серверам. Для этого вы можете выполнить ssh-keygen на мастер‑сервере, а затем добавить сгенерированный публичный ключ в ~/.ssh/authorized_keys на все остальные серверы (или запустить ssh-copy-id -i master-public-key username@worker-ip если сервер допускает доступ с паролем).

  • Также, как и torch.distributed.launch, mpirun создаёт необходимые переменные окружения, правда, под другими именами:

LOCAL_RANK = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
WORLD_SIZE = int(os.environ['OMPI_COMM_WORLD_SIZE'])
WORLD_RANK = int(os.environ['OMPI_COMM_WORLD_RANK'])

Другими словами, один и тот же скрипт может быть запущен как при помощи torch.distributed.launch так и mpirun , при условии что он понимает, какие переменные окружения использовать.

Производительность в Lambda Cloud

При запуске обучения в распределённом режиме важно учитывать общую эффективность масштабирования. В идеальном случае пропускная способность обучения (то есть количество изображений, обработанных в секунду) должна расти линейно с увеличением количества GPU. На практике чаще всего это не так из‑за того, что узким местом становится взаимодействие GPU.

Сервер в Lambda Cloud, использованный в этом посте, поддерживает до 100Gbps при взаимодействии между серверами. В итоге мы можем достичь удовлетворительного эффекта от роста числа серверов, обучая ResNet152 на CIFAR10.

Пропускная способность процесса обучения, изображений/сек.
Пропускная способность процесса обучения, изображений/сек.

Пропускная способность для графика выше вычислена как отношение глобального размера батча (8192 x 4 = 32768) к среднему времени обработки (1.4266 секунды для 2-х процессов на 2-х серверах). Синий сегмент графика показывает, что пропускная способность увеличивается практически линейно при увеличении числа процессов с 1 до 4-х. Светло‑синий сегмент в правой части показывает разницу между фактической и расчётной пропускной способностью, как если бы эффект от масштабирования был идеален. Обратите внимание, что такой эффект был достигнут при использовании батчей большого размера (8192 изображения на одни GPU), довольно большой модели (ResNet152) и всего двух узлов. Батчи меньшего размера, более простая модель или большее число узлов потребовали бы большей скорости взаимодействия между GPU, чтобы добиться эффективного обучения. Серверы Lambda Cloud Reserved или кластер Lambda Echelon (и то и то предоставляет до 1600 Gbps) могут быть хорошим вариантом для такой задачи.

Заключение

Этот пост даёт краткий обзор того, как «под капотом» работает PyTorch при распределении работы на несколько узлов и того, как написать и как запустить распределённые приложения в PyTorch.

Код всех примеров в репозитории.

Комментарии (3)


  1. Nabusteam
    13.08.2023 06:46

    У wilowrid.com можно заюзать транскрайбер и в нем же конвертнуть на рус, побыстрее можно такого рода статьи паблишить.


  1. Kwent
    13.08.2023 06:46

    Спасибо за статью, а были эксперименты бОльших масштабов, не 2х2, а, у условно, 10х8? Если да, то там сохраняется почти линейное ускорение с тем же коэффициентом или лучше/хуже?


    1. IvaYan Автор
      13.08.2023 06:46

      Это перевод, так что я не могу ответить на этот вопрос. Автор пишет, что цель поста заполнить промежуток между запуском на одной машине и документацией PyTorch где вопрос распределённого обучения обсуждается в широком смысле, так что эксперименты они проводили только на двух машинах.