Когда кто-то занимается машинным обучением, его задачи часто представляют так: обработать данные, провести несколько десятков экспериментов с разными архитектурами моделей и выбрать ту, которая даёт самые высокие метрики. Действительно, все эти шаги — неотъемлемая часть работы ML-специалиста, однако она максимально вариативна и может быть реализована с помощью бесконечного количества инструментов: от самописных скриптов до готовых решений.

Кто мы такие?

Меня зовут Смирнова Мария, я инженер по машинному обучению в ML-команде проектов для бизнеса и покупателей ВКонтакте. Расскажу, как мы оптимизировали обучение моделей, внедрив инструмент Kubeflow Pipelines в нашу практику. Я хочу показать, чем Kubeflow Pipelines может быть полезен ML-специалисту и как в нём без потери существующей кодовой базы можно запускать готовые пайплайны.

В организационной структуре VK наше бизнес-подразделение именуют как «Средний и Малый бизнес». Мы разрабатываем функциональность, которая помогает продавцам и покупателям найти друг друга: продавцу — разместить объявление о продаже своего товара так, чтобы как можно быстрее найти «своего» клиента, а клиенту — предоставить площадку, на которой он сможет ознакомиться с товарами, как минимум один из которых точно захочет купить.

Большая часть наших задач связана с NLP. Мы экспериментируем с различными методами обработки данных и десятками архитектур: от классических моделей до продвинутых нейронных сетей. Вот лишь некоторые примеры наших задач:

  • Классификация постов: может ли пост о продаже товара на стене быть переформатирован в карточку товара?

  • Извлечение именованных сущностей: нужно достать из текста объявлений основные характеристики товара, например, название и цену.

  • Кластеризация по названиям и фотографиям товаров.

При процесс решения абсолютно любой задачи выглядит следующим образом:

  1. Этап проектирования: с заказчиком (это менеджер продукта, сотрудник другого подразделения или член нашей команды) мы обговариваем цель, как мы будем измерять успешность решения, договариваемся о сроках. Со своей стороны продумываем этапы выполнения задачи, то есть попросту её декомпозируем.

  2. Этап разработки: сбор данных, их обработка, проведение ML-экспериментов, выбор лучшей модели, её тестирование.

  3. Этап внедрения: этим тоже занимаемся мы, то есть наша работа не заканчивается на самой модели — дальнейшее её затаскивание в продукт происходит также с помощью ML-специалистов.

ML-разработка — это больше, чем Jupyter Notebook, который считается ключевым инструментом в нашей работе. Бесспорно, ноутбуки очень полезны: в них удобно проводить эксперименты, обрабатывать данные, проверять выводы написанных скриптов и строить графики. Однако, после проведения экспериментов нужно сделать следующее:

  1. Реализовать инференс: это то, как наша модель будет вести себя в проде. Своего рода процесс, который принимает на вход данные, а возвращает предсказание модели.

  2. Убедиться в том, что использованный нами код для лучшей версии модели не пропадёт, как и время на выполнение нашей задачи. Здесь мы приходим к тому, что такое пайплайн. Пайплайн – это набор операций машинного обучения.

На этой ноте процесс запуска ячеек в ноутбуке превращается в полноценную разработку функциональности, которая в дальнейшем будет использована в эксплуатации.

Как мы реализовали работу с пайплайнами

В ходе работы мы пришли к выводу, что процесс обучения ML-модели (пайплайн) практически повторяет собой тот процесс, в рамках которого мы решали задачи,за исключением первого и последнего этапов.

У нас есть репозиторий, в котором мы храним наши пайплайны: на одну продовскую задачу одна директория. Эта директория, как минимум, включает в себя два скрипта: для обработки сырых данных и для обучения модели.

Скрипт для обработки данных принимает на вход путь к сырому набору данных и формирует из него выборки, включающие в себя набор признаков и целевых переменных, для обучения и тестирования модели. Скрипт обучения использует эти выборки для того, чтобы в результате получить готовую ML-модель.

Обучение моделей мы проводим с помощью конфигурационного файла — каждый скрипт предобработки и обучения принимает на вход набор аргументов: для обработки данных это может быть название файла, который мы хотим использовать, параметры для train_test_split, настройки самописных функций для обработки текста; для обучения модели — различные параметры объектов Dataset и Dataloader, нейронной сети и коллбэков (обратных вызовов).

Мы сформировали универсальную функциональность, с помощью которой запускаем все наши пайплайны. Она принимает на вход нужный конфигурационный файл с параметрами и функции для указанного нами пайплайна. Схематично это показано на изображении:

Доступ к ресурсам у нас реализован через платформу Kubeflow, поэтому мы с помощью манифеста через терминал запускали обучение всегда как один сервис в одном Pod.

Вся эта история прекрасно работала до тех пор, пока мы не столкнулись со следующими ограничениями:

  1. Мы не можем из терминала обращаться к какому-либо этапу пайплайна. На самом деле можем, но не так, как хотим. Ситуация: если у нас очень долго обучается модель, но её качество нас устраивает и мы хотим сохранить текущую контрольную точку, то просто так остановить этот процесс мы не можем. Так как весь пайплайн у нас запускается в одном Pod’е, то, соответственно, работать с этим процессом мы можем как с одной сущностью. В текущей ситуации мы вольны либо остановить весь пайплайн и настроить коллбэк модели под нашу возникшую внезапно потребность, либо дождаться, пока модель обучится до конца.

  2. При возникновении ошибки внутри какого-либо этапа пайплайна всё нужно запускать заново. Ситуация аналогична первому пункту. Что-то сломалось в конце пайплайна — исправляем и запускаем с нуля.

  3. Сложно сохранять все параметры пайплайна. Самостоятельно придумываем, как будем сохранять все параметры для всех запущенных экспериментов.

  4. Неудобно запускать новые эксперименты: параметры приходится настраивать через терминал или прямо в коде.

Как мы используем Kubeflow Pipelines

Kubeflow Pipelines — это один из компонентов платформы Kubeflow, который позволяет строить ML-пайплайны в виде графов. С помощью этого сервиса можно декомпозировать один большой ML-пайплайн на универсальные блоки, чтобы использовать их повторно.

Для разработки пайплайна Kubeflow Pipelines используется DSL (domain-specific language), логика которого реализована в библиотеке kfp. В документации Kubeflow Pipelines говорится о двух версиях DSL, однако на момент реализации нашего решения при работе с разными версиями этой библиотекой мы выбрали версию kfp 1.8.11.

С помощью kfp можно работать с пайплайном как с графом, узлами которого являются этапы пайплайна. Каждый этап можно повторно использовать, а последовательность этих этапов настраивать так, как нам хочется, с помощью условий — объектов kfp.dsl.Condition.

Структура проекта

Для работы с Kubeflow Pipelines был разработан отдельный репозиторий. Основная составляющая пайплайна, компонент, представляет собой один независимый его шаг. Например: предварительная обработка, обучение, тестирование. В нашем случае компонентами являются Python-функции, в аргументы которых мы будем передавать нужные для каждого шага параметры. Так как у нас уже были готовые скрипты, которые декомпозировали наши пайплайны, то мы их попросту вынесли в этот проект.
Структура итогового репозитория приведена ниже:

Всю питоновскую функциональность, которая у нас была, мы перенесли в директорию core/components и разбили на универсальные компоненты, base, и относящиеся к конкретным пайплайнам — в примере это task_1 и task_2. Все конфигурационные файлы пайплайнов отдельно добавляем в core/configs.

Также мы добавили словарь всех компонентов core/components.yaml:

При разработке нового пайплайна требуется добавить необходимые его этапы в core/components и зафиксировать в этом словаре. Дальше процесс автоматизируется.

Автоматизированная сборка пайплайна

Для формирования и запуска пайплайна каждый компонент преобразуется в контейнер, объект kfp.dsl.ContainerOp, у которого обязательно должен быть задан базовый образ. Он должен содержать все библиотеки и другие зависимости, которые используются в функции этого компонента. Базовые образы всех компонентов могут быть одинаковыми или различаться между собой. В нашем случае для всех компонентов базовый образ один и содержит Python-библиотеки, которые используются в пайплайне.

Существует несколько способов преобразования функции в kfp.dsl.ContainerOp, мы выбрали метод kfp.components.func_to_container_op. Важное замечание: при работе с kfp все импорты библиотек и объявленные константы должны находиться внутри питоновской функции. Это, пожалуй, единственное изменение которое потребовалось нам сделать для внедрения Kubeflow Pipelines. Пример:

Процесс сборки пайплайна:

  1. Преобразуем все нужные нам функции в kfp.dsl.ContainerOp.

  2. Пишем логику пайплайна.

1. Преобразование в kfp.dsl.ContainerOp

Схема процесса:

В components.yaml мы храним все модули, которые хотим использовать, например:

base:
  upload_data: kfp_pipelines.core.components.base. upload_data
  download_data: kfp_pipelines.core.components.base. download_data
task_1:
  preprocess_data: kfp_pipelines.core.components.task_1.preprocess_data
...

В основе функции для импорта компонентов лежит importlib.importmodule:

def import_from_config(obj_name: str, config: Dict[str, str]):
   obj_name = obj_name.lower().strip()

   if obj_name not in config.keys():
       raise ValueError(
           f"Provided object must be one of: {' '.join(list(config.keys()))}, "
           f"but {obj_name} was given."
       )

   module_name, cls_name = config[obj_name].rsplit(".", 1)

   retrieved_obj = getattr(import_module(module_name), cls_name)
   return retrieved_obj

Для каждого пайплайна мы последовательно указываем название нужных нам компонент и для их импорта используем скрипт выше.

Преобразуем в объект kfp.dsl.ContainerOp с помощью метода kfp.components.func_to_container_op:

with open(Path(__file__).parent / "components.yaml", "rt") as config:
   _CONFIG = yaml.safe_load(config.read())


def init_ops(steps: list, project: str = "task_1"):
   _possible_steps = {**_CONFIG[project], **_CONFIG["base"]}
   ops = {}
   for step in steps:
       ops[step] = func_to_container_op(
           import_from_config(step, _possible_steps),
           base_image={ПУТЬ К БАЗОВОМУ ОБРАЗУ},
       )
   return ops

На выходе из этой функции мы получаем словарь компонентов пайплайна, к которым будем обращаться по заданным нами ключам.

2. Строим пайплайн

Упрощённая схема пайплайна:

Мы до сих пор оперируем нашими старыми методами, но всю остальную логику будет реализовывать функциональность kfp. Рассмотрим псевдокод, реализующий эту схему для task_1:

from kfp.dsl import Condition, pipeline

from kfp_pipelines.core import init_ops, run_pipeline

# Pipeline metadata
_PROJECT_NAME = "task_1"
_RUN_NAME = f"{_PROJECT_NAME}_run"

# Pipeline steps
_LOAD_DATA = "download"
_FILE_EXISTENCE = "check_file_existence"
_PREPROCESS_DATA_STEP = "preprocess_data"
_TRAIN_STEP = "train"
_UPLOAD_DATA = "upload"


def main() -> None:
    # Initiating `ContainerOp` objects
    ops = init_ops(
        _PROJECT_NAME,
        [
            _LOAD_DATA,
            _FILE_EXISTENCE,
            _PREPROCESS_DATA_STEP,
            _TRAIN_STEP,
            _UPLOAD_DATA,
        ],
    )

    # All the arguments of the whole pipeline should be specified
    @pipeline(
        name=_PROJECT_NAME,
        description=f"`{_PROJECT_NAME}` data preprocessing and model training",
    )
    def task_1_pipeline(
            data_param,
            training_param,
    ) -> None:
        _load_data = ops[_LOAD_DATA](
            data_param=data_param
        )

        _data_existence = ops[_FILE_EXISTENCE](
            paths=[
                _load_data.outputs["train_data_path"],
                _load_data.outputs["val_data_path"],
            ]
        )
        # if training and validation data do not exist, run preprocessing, else skip
        # this step
        with Condition(
                _data_existence.output == 0,
                _PREPROCESS_DATA_STEP,
        ):
            _preprocess_data = ops[_PREPROCESS_DATA_STEP](
                data_param=data_param
            ).after(_load_data)

        _train = ops[_TRAIN_STEP](
            train_data_path=_load_data.outputs["train_data_path"],
            val_data_path=_load_data.outputs["val_data_path"],
            training_param=training_param
        ).after(_preprocess_data)

        _upload = ops[_UPLOAD_DATA](
            output_path=_train.outputs["output_path"],
            save_param=_train.outputs["to_save"],
        ).after(_train)

    run_pipeline(
        project_name=_PROJECT_NAME,
        run_name=_RUN_NAME,
        func=task_1_pipeline,
    )


if __name__ == "__main__":
    main()

Мы формируем функцию main, в которой будем инициализировать словарь компонентов ops. Далее выстраиваем логику самого пайплайна. Для этого мы пишем функцию и используем для неё декоратор kfp.dsl.pipeline. Важное замечание: функция пайплайна должна в качестве параметров явно содержать всё то, что мы будем использовать, а функции внутри пайплайна должны содержать явно указанные аргументы. DSL kfp не поддерживает *args и **kwargs, поэтому все параметры компонентов в пайплайне нужно задавать явно.

Внутри пайплайна мы можем передавать параметры компонента как из конфига, так и из других компонентов. Чтобы один компонент принимал на вход значения из другого, выход первого должен быть представлен в виде объекта NamedTuple. Пример:

from typing import NamedTuple


def download(
   data_param
) -> NamedTuple(
   "Outputs",
   train_data_path=str,
   val_data_path=str,
):
   from collections import namedtuple
   ...
   train_data_path = "sample_path"
   val_data_path = "sample_path"
  
   output = namedtuple(
       "Outputs",
       [
           "train_data_path",
           "val_data_path",
       ],
   )
   return output(train_data_path, val_data_path)

Важно:

  • В теле функции-компонента должны быть указаны все импорты. Это значит, что импортируемые библиотеки должны быть установлены в образе, указанном при формировании контейнера из этой функции-компонента.

  • Объекты kfp не умеют обрабатывать русский язык: docstring и комментарии нужно писать на английском.

  • DSL kfp не поддерживает тип None, булевы значения (True/False), тип typing.Dict (обычный dict поддерживается), типы любых нестандартных объектов.

Касаемо последнего пункта, если у вас есть функция:

def train(
    sample_obj: torch.utils.data.DataLoader,
    sample_bool: bool = False,
    sample_dict: Dict[str, Any],
    sample_none = None,
    ...

то kfp не сможет её обработать из-за типов. Пример, как можно избежать ошибки:

def train(
    sample_obj,  # не указывать тип
    sample_bool: bool = 0,  # заменить True/False на 1/0
    sample_dict: dict,  # не конкретизировать дикт typing'ом
    sample_none = "",   # вместо None вставлять пустую строку или иначе обрабатывать это значение
    ...

Как это выглядит

Интерфейс экспериментов показан ниже.

У одного эксперимента может быть несколько запусков. Индикатор статуса показывает, завершился ли эксперимент успешно либо упал с ошибкой. На картинке показан эксперимент vko_name_predictor, у которого было три запуска, два из которых успешно завершились. Причины, по которым запуск может провалиться в основном связаны Python-кодом, а именно логикой, реализованной внутри компонент пайплайнов.

Можем нажать на один из завершенных пайплайнов и посмотреть о нем детальную информацию.

Каждый запуск имеет графическое отображение выполнения всех этапов в виде графа: по ходу выполнения этапов пайплайна пользователь может наблюдать, какой из них сейчас выполняется. Успешно завершенные этапы пайплайна обозначены зелеными индикаторами. На картинке наш пайплайн состоит из следующих этапов:

  1. create s3 paths – создаем пути на нужные нам данные;

  2. preprocess data – обрабатываем данные;

  3. train – обучаем модель;

  4. upload inf config – формируем конфигурационный файл инференса и сохраняем его;

  5. run inference – тестируем модель на инференсе.

Стрелки показывают, откуда идут входные данные в этапы пайплайна. Например, все три этапа «preprocess data», «train» и «run inference» получают на вход путь к данным, с которыми предстоит работать, из этапа «create s3 paths». Графическое отображение стрелок можно настроить с помощью кнопки «Simplify Graph», тогда в случае упрощенного графа стрелки будут показывать последовательность этапов, а не потоков данных.

Нажав в интерфейсе на любой из этапов, можно посмотреть какие у него параметры, что он ожидает на вход и должен отдавать на выход. Пример показан ниже.

Для нового запуска эксперимента достаточно нажать «Clone Run» в интерфейсе и ввести в интерфейсе новые параметры, с которыми мы хотим работать. Форма, которая будет представлена, повторяет конфигурационный файл пайплайна. Все значения можно будет настроить через интерфейс.

Итог

Таким образом, с помощью библиотеки kfp и нескольких функций мы можем интегрировать существующий код для запуска ML-пайплайнов в инструмент Kubeflow Pipelines с удобным интерфейсом, в котором пользователь, он же разработчик, может быстро запускать эксперименты по машинному обучению и выбирать лучшее решение.

Комментарии (0)