Вы не задумываетесь над процессами, а фокусируетесь на решении задач в сжатые сроки, только когда вы аналитик в стартапе или маленькой команде. Но после первых успехов хочется оглянуться назад и наладить процессы, почистить библиотеку артефактов и подтянуть качество. Особенно когда команда стремительно растет. Непрозрачная структура тяжело поддается управлению и не позволяет быстро обучать сотрудников.

Меня зовут Елдос, я Big Data аналитик, и сегодня я расскажу о том, как команда Fintech из службы Big Data аналитики и машинного обучения Beeline Казахстан создала среду для совместной работы, связала используемые инструменты одним ключом, обеспечила централизованное хранение кода в Git и подсчетов в HDFS, и тем самым обеспечила воспроизводимость экспериментов.

Зачем нам это 

Предположим, в команде из пяти аналитиков каждый пишет код, делает анализ и строит модели обособленно. Остальные члены команды не имеют доступа к этой работе, поэтому не проверяют качество результата и не воспроизводят эксперимент. Если вы захотите оптимизировать витрину или модель вашего коллеги, то потратите больше времени на ознакомление и передачу кода, чем на саму задачу. Это особенно сложно, если нужная модель не попала в продуктив или сотрудник в отпуске или уволился.

Что такое воспроизводимость экспериментов?

В разработке программного обеспечения и анализе больших данных эксперимент проводится, чтобы подтвердить или опровергнуть гипотезы. В него входит исходный код, Jupyter Notebooks (Jupyter-блокноты или ноутбуки), таблицы, выгрузки и другие артефакты. 

Воспроизводимость эксперимента – это его способность воспроизводить одинаковые или близкие результаты при повторном проведении эксперимента в аналогичных условиях. 

Чтобы обеспечить воспроизводимость экспериментов/гипотезы, наши команды Big Data аналитики и машинного обучения следуют трем правилам:

  • код, анализ, данные и артефакты эксперимента хранятся централизованно;

  • код/Jupyter-блокноты последовательны и воспроизводимы;

  • завершенные эксперименты не изменяются. Для переиспользования данных обращаемся к ним из другого эксперимента. 

К централизации экспериментов нас привела необходимость в коллаборативной среде, где команда работает совместно и ежедневно обменивается знаниями и результатами. 

Для этого при проверке гипотезы создается задача в Jira, которая агрегирует проделанную работу и связывает все компоненты: исходный код, анализ которого находится в Git-репозитории, а входные данные и результаты вычислений – в распределенной файловой системе HDFS.

Централизация исходного кода

Централизация данных в GitLab
Централизация данных в GitLab

Каждая команда аналитиков разных проектов работает в своем репозитории Git со структурой каталогов, которая соответствует основным направлениям и их подпроектам.

Стабильный протестированный анализ/код эксперимента хранится в основной master-ветке. 

Пример использование VCS для экспериментов
Пример использование VCS для экспериментов

Поэтому всегда в начале работы над экспериментом делаем копирование изменений Pull Request из master-ветки в локальный master, и из него уже создаем новую ветку эксперимента. Название ветки эксперимента соответствует номеру задачи в Jira и написано в стиле kebab-case, что автоматически связывает ветку и задачу в Jira. Затем в папке соответствующего проекта создаем каталог эксперимента с названием ветки. 

Составное название ветки эксперимента на примере задачи fin-295
Составное название ветки эксперимента на примере задачи fin-295
Пример структуры папок в Git-репозитории
Пример структуры папок в Git-репозитории

После завершения проверки мини-гипотез заливаем изменения с помощью commit в локальную ветку эксперимента, а в конце рабочего дня отправляем все коммиты в удаленный репозиторий командой “push”.

После того как задача завершена и протестирована локально, запрашиваем Merge Request, в котором настроены правила для проверки качества кода (code review) другими участниками команды. Изменения попадают в master-ветку только после проверки и одобрения кода двумя проверяющими. 

Merge Request должны быть понятными и развернутыми, важно подсветить следующее:

  1. Какая была цель? 

  2. Какая гипотеза? 

  3. Что было протестировано / сделано? 

  4. Какие результаты и метрики качества? 

  5. Какие сложности были и какие есть возможные доработки? 

  6. На что стоит обратить внимание?

Работа с Jupyter Notebook

Ноутбуки эксперимента вместе с историей изменений также хранятся в Git-репозитории и доступны в html-формате для просмотра и изучения аналитики проекта.

Наша команда использует следующие рекомендации при работе с Jupyter-блокнотами: 

  • Блокноты эксперимента пронумерованы, и, если запускать их один за другим, результат выполнения будет соответствовать исходному запуску полного эксперимента. 

  • Такое же требование применяется к ячейкам кода ноутбука. Если запустить все ячейки последовательно, результат будет тем же, что и при запуске всего блокнота. 

  • Помимо воспроизводимости результатов, код сопровождается описательными ячейками Markdown, чтобы коллеги быстрее ориентировались в блокноте. 

Применяя эти правила как часть совместной работы, мы обеспечиваем воспроизводимость экспериментов.

Централизация входных данных и результатов

Централизация данных в HDFS
Централизация данных в HDFS

Для хранения и управления витринами данных создаем отдельную папку в командном каталоге системы хранения данных HDFS. Название папки содержит номер задачи (Jira). 

Пример задачи fin-222 и списка входящих в нее витрин данных
Пример задачи fin-222 и списка входящих в нее витрин данных

Для простоты управления мы связали все компоненты эксперимента (код, данные, задача) единым ключом (номера задачи в Jira). В роли коннектора между HDFS и Python-кодом выступает самописный класс Task, который создан для облегчения работы с данными.

Как связать все компоненты эксперимента единым ключом

Все, что нужно для такой связи, – инициализировать класс Task с основным атрибутом (номером задачи в Jira) и создать экземпляр этого класса. Все дальнейшие действия производятся через созданный экземпляр – с необходимыми для комфортной работы методами:

  • ls

  • write

  • read

  • get_path

Метод “ls” универсальный. С ним можно работать и со стандартным HDFS dfs -ls, или, к примеру, вывести список разделов той или иной директории/таблицы.

Примеры использования метода “ls”
Примеры использования метода “ls”

Метод “write” записывает просчеты в виде датамартов и имеет следующие аргументы:

  • df – объект датафрейм (Spark DataFrame);

  • name – название сущности/датамарта;

  • Partition By – поле, по которому ведется партиционирование набора данных;

  • repartition – перераспределение данных;

  • mode – ‘overwrite’ или ‘append’;

  • CSV – по умолчанию ‘False’, но можно включить запись CSV-файлов.

Использование метода “write” на примере просчета витрины c названием suspended_inactive_abons
Использование метода “write” на примере просчета витрины c названием suspended_inactive_abons

Метод “read” предназначен для считывания сущности/датамарта в папке эксперимента и имеет следующие аргументы:

  • instance – название датамарта в Task или неполный путь до референса в формате 'fin-000/name'.

  • date – дата чтения. Если даты нет, датамарт считается целиком (все партиции).

  • fmt – маска даты. Принимает формат Python-масок. Если его нет, то берутся 5 рандомных партиций из источника и автоматически идет подбор формата дат.

  • key – ключ партиционирования. Если его нет, то берутся 5 рандомных партиций из источника и автоматически идет подбор ключа партиционирования.

  • dates – даты чтения диапазона. Если какие-то даты из диапазона не были найдены, то прочитается только то, что есть. И отобразится warning о том, что какие-то партиции не загрузились. Список незагрузившихся дат будет в warning.

  • CSV – индикатор считывания CSV.

Ниже приведены примеры использования метода “read”:

>>>fin_000 = Task("fin-000", spark)
Чтение всего пути
>>>fin_000.read("geo_features")
#'/fin_tech/tasks/fin_000/geo_features'
 
Чтение определенного формата во всем пути
>>>fin_000.read("geo_features", "*", "*.parq")
#'/fin_tech/tasks/fin_000/geo_features/*/*.parq/'
 
Чтение определённой патриции
>>>fin_000.read("geo_features", TIME_KEY='2020-01-01')
#'/fin_tech/tasks/fin_000/geo_features/TIME_KEY=2020-01-01'
 
Чтение определённой даты с заданным типом даты
```
fin_000.read(
   "geo_features",
   date = datetime(2020, 1, 1),
   fmt = "%Y-%m"
)
```
#'/fin_tech/tasks/fin_000/geo_features/2020-01'
 
Чтение определённой даты с заданным типом даты и партиции
>>>fin_000.read("geo_features", date=datetime(2020, 1, 1), fmt="%Y%m", key="MONTH_KEY")
#'/fin_tech/tasks/fin_000/geo_features/MONTH_KEY=202001'
 
Чтение списка дат
>>>fin_000.read("geo_features", dates=pd.date_range("2020-01-01", "2020-09-01", freq="MS"))
#'/fin_tech/tasks/fin_000/geo_features/{01-2020,02-2020,03-2020,04-2020,05-2020,06-2020,07-2020,08-2020,09-2020}'
 
Чтение списка дат с определённым типом дат
>>>fin_000.read("geo_features", dates=pd.date_range("2020-01-01", "2020-10-01", freq="MS"), fmt="%m-%Y"))
#'/fin_tech/tasks/fin_000/geo_features/{01-2020,02-2020,03-2020,04-2020,05-2020,06-2020,07-2020,08-2020,09-2020,10-2020}'

Как переиспользовать данные

В ситуациях, когда проверяем несколько гипотез на одних и тех же данных, удобнее переиспользовать витрины данных или модели завершенных экспериментов без их пересчета. Мы создаем референс на результаты закрытых задач, чтобы не забивать HDFS дубликатами.

Предположим, fin-000 – это старый эксперимент. В его HDFS-каталоге хранится несколько таблиц, включая suspended_inactive_abons, которая содержит 4 партиции. В новом эксперименте fin-999 мы переиспользуем партиции таблицы, но при этом для завершения эксперимента не хватает разделов partition_5 и partition_6.

Постановка задачи
Постановка задачи

По нашим правилам, нельзя дописывать новые партиции в закрытую задачу fin-000, чтобы не нарушить ее воспроизводимость. Поэтому в папке задачи fin-999 создается каталог fin-000 с пустой подпапкой suspended_inactive_abons, а затем в рамках нового эксперимента запускается код fin-000 для создания новых разделов partition_5 и partition_6. 

Пример переиспользования результатов fin-000 в задаче fin-999
Пример переиспользования результатов fin-000 в задаче fin-999

Для примера мы допустили ошибку и снова просчитали уже существующий partition_4. При считывании в рамках коннектора fin-999 используется метод “read” со ссылкой на старую задачу fin-000. 

Пример дублирования партиций
Пример дублирования партиций

Ридер считывает все партиции из старой и новой задач, а затем объединяет (union) результаты считывания. Дубликат не принимается во внимание при считывании. Необходимо вручную удалить раздел из нового эксперимента.

Так мы автоматизировали коннектор между завершенными и новыми экспериментами, а также создание необходимых каталогов и подкаталогов для переиспользования данных.

Ближайшие планы

Первая задача, которая стоит перед командой, это разработка пайплайнов,  которые будут работать с пайплайнами DVC, что позволит:

  • автоматизировать и упорядочить процессы предварительной обработки данных;

  • сохранить время и ментальное здоровье аналитика;

  • обеспечить совпадение просчетов в продуктивной и рабочей средах за счет сериализации в среде разработки и десериализации в продуктивной среде;

  • сократить появление ошибок при переносе модели и исключить дополнительную поддержку кода в продуктивной среде.

Следующая задача – реализация автоматических smoke-тестов в последнем стейдже пайплайна в CI/CD. Проверка будет состоять в сравнении скоров модели, полученных при прогоне модели в локальных и продуктовых средах на одной и той же валидационной выборке. Близкие к идентичным результаты подтверждают, что модель, прошедшая дополнительную обработку при установке в продуктив, идентична тестовой модели. Примером обработок служит замена нулов, замена функций агрегаций, конвертация из PySpark датафрейма в Pandas и наоборот, в процессе которой модель может «поехать» из-за применения специфических методов и функций каждого инструмента. К примеру, типы данных PySpark могут не совпадать с типами данных Pandas или LGBM.

Воспроизводимость пайплайна, который состоит из исходного кода, обеспечивает инструмент DVC. Он отлично справляется с версионированием пайплайнов и данных, позволяет запускать код в правильном порядке, логировать изменения в данных/коде. И, благодаря механизму хэширования и логирования мета-данных, он дает нам полную уверенность в том, что эксперимент полностью воспроизводим. При повторном прогоне пайплайна без изменений, DVC сигнализирует, что никаких правок в коде и в данных не было и что надобности запускать пайплайн снова нет. Остается добавить этот тест-кейс в CI/CD. 

В наши цели также входит автоматическая проверка блокнотов на воспроизводимость в CI/CD. Мы уже рассказали о правиле последовательности блокнотов при последовательном запуске ячеек одного блокнота и запуске нескольких блокнотов. Но мы пойдем дальше и сделаем надстройку над CI/CD, которая будет несколько раз последовательно запускать ноутбуки в некотором обособленном окружении, а затем сравнивать созданные логи, CSV-файлы и другие артефакты. Совпадение артефактов будет свидетельствовать о воспроизводимости блокнотов.

Будет отлично, если в комментариях вы расскажете, как выстроен workflow в вашей компании. А еще круче – поделитесь своими лучшими практиками того, как вы достигаете полной воспроизводимости экспериментов.

Комментарии (0)