Привет, Хабр!

Horovod — это фреймворк для распределенного глубокого обучения, изначально разработанный в Uber. Он позволяет масштабировать обучение моделей на сотни и тысячи GPU, сокращая время тренировки с недель до часов. Horovod поддерживает такие фреймворки, как TensorFlow, Keras, PyTorch и Apache MXNet, и легко интегрируется с существующими кодовыми базами, требуя минимум изменений.

В статье как раз и пойдет речь о том, как масштабировать модельки с помощью Horovod и Kubernetes.

Установим и настроим все необходимое

Docker необходим для контейнеризации приложений и их переносимости.

  1. Установка Docker на Ubuntu:

    sudo apt-get update
    sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common
    curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
    sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
    sudo apt-get update
    sudo apt-get install -y docker-ce
    sudo usermod -aG docker ${USER}

Kubernetes используется для оркестрации контейнеров.

  1. Установка Minikube (локал версия Kubernetes) на Ubuntu:

    curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
    chmod +x minikube
    sudo mv minikube /usr/local/bin/
    minikube start --driver=docker
  2. Установка kubectl:

    curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
    chmod +x kubectl
    sudo mv kubectl /usr/local/bin/

А теперь установим сам Horovod:

  1. Установка Horovod с поддержкой GPU:

    FROM tensorflow/tensorflow:2.16.1-gpu
    
    RUN apt-get update && apt-get install -y \
        libgl1-mesa-glx \
        libglib2.0-0
    
    RUN pip install --upgrade pip
    RUN pip install horovod
    
    COPY . /app
    WORKDIR /app
    
    CMD ["python", "train.py"]

Для развертывания распределенного обучения с Horovod на Kubernetes нужно создать и настроить соответствующие манифесты Kubernetes.

  1. Создадим Docker-образ:

    sudo apt-get update
    sudo apt-get install -y libopenmpi-dev openmpi-bin
  2. Построение и пуш Docker-образа:

    sudo apt-get install -y build-essential devscripts debhelper fakeroot
    sudo apt-get install -y libnccl2 libnccl-dev
  3. Создание манифеста Kubernetes для MPIJob:

    export HOROVOD_GPU_ALLREDUCE=NCCL
    export HOROVOD_GPU_BROADCAST=NCCL
  4. Запуск MPIJob на Kubernetes:

    import horovod.tensorflow as hvd
    import tensorflow as tf
    
    hvd.init()
    
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    
    # код построения модели и обучения

Для адекватной работы Horovod необходимо плюсом настроить MPI и NCCL.

  1. Установка MPI:

    sudo apt-get update
    sudo apt-get install -y libopenmpi-dev openmpi-bin
  2. Установка NCCL:

    sudo apt-get install -y build-essential devscripts debhelper fakeroot
    sudo apt-get install -y libnccl2 libnccl-dev
  3. Конфигурация NCCL:
    Убеждаемся, что среда готова к использованию NCCL:

    export HOROVOD_GPU_ALLREDUCE=NCCL
    export HOROVOD_GPU_BROADCAST=NCCL

Основные возможности horovod

Horovod использует концепции, основанные на MPI. Основные шаги инициализации:

  • hvd.init(): вызов инициализирует Horovod и подготавливает его к работе. Внутри hvd.init() Horovod вызывает MPI_Init для инициализации среды MPI.

  • hvd.size(): возвращает количество процессов (или воркеров), участвующих в распределенном обучении.

  • hvd.rank(): возвращает текущий ранг процесса. Это уникальный идентификатор для каждого воркера.

  • hvd.local_rank(): возвращает ранг процесса на текущей машине.

Horovod использует несколько коллективных операций для синхронизации данных между процессами:

  • hvd.broadcast(): синхронизирует начальное состояние переменных между всеми процессами, передавая данные из одного процесса (обычно с рангом 0) всем остальным.

  • hvd.allreduce(): выполняет операцию сокращения (например, суммирование или нахождение среднего) и распространяет результат обратно всем процессам. Это ключевая операция для усреднения градиентов при распределенном обучении.

  • hvd.allgather(): собирает данные от всех процессов и распространяет объединенный результат каждому процессу. Используется для сбора информации, такой как метрики или веса модели.

Horovod тесно интегрируется с NCCL для того, чтобы коммуникации между GPU были особо высокопроизводительными.

  • HOROVOD_GPU_ALLREDUCE :переменная окружения, указывающая на использование NCCL для операций allreduce.

  • HOROVOD_GPU_BROADCAST: переменная окружения, указывающая на использование NCCL для операций broadcast.

Horovod поддерживает асинхронные коммуникации и конвейеры для минимизации времени простоя GPU. Это достигается через использование неблокирующих вызовов MPI и стратегий оверлапа вычислений и передачи данных:

# пример использования асинхронной передачи градиентов
hvd.allreduce_async(tensor, name)

В Horovod также есть механизмы для обнаружения сбоев и повторной инициализации процессов. При сбое одного из процессов Horovod может автоматом перезапустить его, сохранив текущее состояние обучения:

try:
    hvd.init()
except Exception as e:
    hvd.shutdown()
    hvd.init()

Примеры применения

Horovod можно легко интегрировать в код TensorFlow для организации распределенного обучения. Простой пример обучения модельки на нескольких GPU:

import tensorflow as tf
import horovod.tensorflow as hvd

# инициализация Horovod
hvd.init()

# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# построение модели
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0

# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)

Horovod также поддерживает PyTorch:

import torch
import horovod.torch as hvd
from torchvision import datasets, transforms
from torch import nn, optim

# Инициализация Horovod
hvd.init()

# Настройка устройств
torch.cuda.set_device(hvd.local_rank())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# загрузка данных
train_dataset = datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, sampler=train_sampler)

# определение модели
model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(28 * 28, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
).to(device)

# определение оптимизатора
optimizer = optim.Adam(model.parameters(), lr=0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# синхронизация начальных состояний
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# обучение модели
model.train()
for epoch in range(5):
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.CrossEntropyLoss()(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0 and hvd.rank() == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')

Horovod можно использовать с Keras:

import tensorflow as tf
import horovod.tensorflow.keras as hvd

# инициализация Horovod
hvd.init()

# установка видимых устройств GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# построение модели
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

# обертывание оптимизатора
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# загрузка данных
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# трансформация данных
train_images = train_images / 255.0
test_images = test_images / 255.0

# Callback для синхронизации начальных состояний
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

# обучение модели
model.fit(train_images, train_labels, batch_size=128, epochs=5, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)

Horovod также можно использовать с Apache MXNet:

import mxnet as mx
import horovod.mxnet as hvd

# инициализация Horovod
hvd.init()

# настройка устройств
context = mx.gpu(hvd.local_rank()) if mx.context.num_gpus() > 0 else mx.cpu()

# загрузка данных
train_data = mx.io.MNISTIter(
    image="train-images-idx3-ubyte",
    label="train-labels-idx1-ubyte",
    input_shape=(784,),
    batch_size=64,
    shuffle=True,
    flat=True
)

# определение модели
data = mx.sym.var('data')
fc1 = mx.sym.FullyConnected(data=data, num_hidden=128)
act1 = mx.sym.Activation(data=fc1, act_type="relu")
fc2 = mx.sym.FullyConnected(data=act1, num_hidden=10)
softmax = mx.sym.SoftmaxOutput(data=fc2, name='softmax')

# обертывание оптимизатора
optimizer_params = {'learning_rate': 0.01 * hvd.size()}
optimizer = mx.optimizer.create('adam', **optimizer_params)
optimizer = hvd.DistributedOptimizer(optimizer)

# настройка модели
model = mx.mod.Module(symbol=softmax, context=context)
model.fit(train_data, optimizer=optimizer, num_epoch=5)

Больше про фреймворки и инструменты Data Science и Machine Learning эксперты OTUS рассказывают в рамках практических онлайн-курсов. С полным каталогом курсов можно ознакомиться по ссылке.

Комментарии (1)


  1. erley
    30.05.2024 06:12

    Как вовремя статья появилась, да ещё и с примерами использования!
    Только собирался сесть с этим поразбираться...