Привет, Хабр! Это тимлид DS группы ранжирования и поиска Дана Злочевская и тимлид группы разработки Михаил Нестеров из Lamoda Tech. 

Как и у любой крупной e-commerce платформы, данные — наш главный актив. Они помогают бизнесу принимать обоснованные решения, а пользователям — получать персонализированный, качественный опыт во всех продуктах Lamoda.

Поэтому в продакшене ежедневно работают десятки ML-пайплайнов, а в Airflow запускаются сотни DAG-воркфлоу. Данные готовят и используют более 100 специалистов из самых разных команд: аналитики, дата-сайентисты, ML-инженеры, маркетологи — у каждой свои задачи и логика работы с ними. 

Однако с ростом команд, задач и инфраструктуры мы начали сталкиваться с рядом системных проблем:

  • Разрозненные подходы к подготовке данных. Каждая команда собирала данные «под себя», по своим правилам и в своем формате, что приводило к дублированию информации и нерациональному использованию вычислительных ресурсов.

  • Дублирование логики. Одни и те же преобразования выполнялись в разных пайплайнах с минимальными отличиями — это не только неэффективно, но и увеличивает риск ошибок.

  • Сложности с переиспользованием. Найти нужные данные, понять, как они были получены, и интегрировать их свой пайплайн — становилось нетривиальной задачей.

  • Рост time-to-market. На каждый новый ML-продукт или эксперимент у команд уходило всё больше времени просто на «разогрев»: сбор данных, выравнивание форматов, отладка пайплайна.

Тогда мы поняли, что пора систематизировать наш подход к хранению и работе с датасетами, и реализовали собственный фреймворк на основе Apache Spark — Feature Storage, который сейчас является стандартом в компании. А позже мы выделили отдельное решение для специфичных кликстрим-данных — Action Storage.

В этой статье мы хотим поделиться нашим опытом построения этих инструментов и рассказать, как со временем эволюционировал наш подход к хранению данных в Lamoda Tech. Надеемся, он будет вам полезен и подарит парочку интересных идей.

С какими данными мы работаем

В наших хранилищах более 3,5 Пб данных — это действительно много, и чтобы обеспечить их обновление, ежедневно запускаются сотни ETL-процессов.

Основной источник информации для аналитики и машинного обучения — это кликстрим. Он представляет собой топик в Kafka, который содержит необходимую информацию о действиях клиентов на сайте и в мобильном приложении — какие страницы и товары они просматривают, куда переходят, что добавляют в корзину или в избранное. Эти данные — основа персонализации и поведенческого анализа.

Кликстрим мы загружаем в HDFS с помощью Spark Streaming, после чего — без изменения смысла, но с преобразованием сложной вложенной JSON-структуры — сохраняем в Hive-таблицу.

Второй важный источник — базы данных бэкенда. Там хранится самая актуальная информация по текущему стоку и его доступности, данные по заказам, возвратам, а также по пользователям: какая у них скидка лояльности на данный момент, с какого устройства они обычно сидят, где находятся.

На основе этих данных мы решаем сразу несколько задач:

  • Рассчитываем метрики для дашбордов, которые помогают принимать операционные и продуктовые решения. После расчёта метрики доставляются в Power BI и Grafana.

  • Готовим датасеты для обучения и инференса ML-моделей, чаще всего на основе CatBoost и PyTorch.

  • Обновляем фичи для ML-моделей с онлайн-инференсом, чтобы они работали на самых свежих данных. Обновлённые фичи попадают в наши key-value хранилища — Aerospike и Redis.

Все эти процессы запускаются регулярно — один или несколько раз в день. Для оркестрации мы используем Airflow, а основная обработка идёт на PySpark.

Казалось бы, всё просто: берём данные, трансформируем — и передаем дальше в модель или визуализацию. Но по мере роста объёма данных, количества пользователей и требований к качеству — поддержка этой инфраструктуры становится всё интереснее и сложнее.

Эволюция подхода к работе с данными для батч-моделей
Эволюция подхода к работе с данными для батч-моделей

Больше данных — больше проблем 

Изначально мы не слишком задумывались о стандартизации: данные сохранялись «как получится» — в произвольных форматах и местах. Но со временем объёмы выросли, и пользоваться такими данными стало сложно.

Чтобы переиспользовать ранее подготовленные датасеты, нужно было разбираться в чужом коде или просто дублировать логику в своих пайплайнах. Это занимало много времени и вело к накоплению технического долга.

Работа с DAG в Airflow тоже превратилась в головоломку. Каждый создавал свои пайплайны со своей структурой. Чтобы понять, что лежит в финальном датасете, нужно было прочитать и вникнуть в весь DAG — а внутри часто скрывалась сложная, многоступенчатая логика, которую не так просто было «раскурить» в короткий срок. Вдобавок данные сохранялись в разных местах и форматах, поэтому систематизировать уже готовые наборы было почти невозможно.

Всё это тормозило разработку ML-продуктов: time-to-market увеличивался, команды тратили всё больше времени не на новые фичи, а на понимание уже сделанного.

Тогда мы сформулировали, какие требования должна выполнять новая архитектура хранения и обработки данных:

  • Прозрачность логики. Каждый этап обработки должен быть выделен явно, а не скрыт внутри сложного пайплайна. Модели должны выполняться отдельно от расчёта фичей — изоляция помогает контролировать качество и упростить отладку.

  • Единый формат хранения. Мы хотели разработать универсальный формат, подходящий для разных типов фичей, но при этом гибкий и понятный. Добавление, удаление и версионирование фич должно быть прозрачным и одинаково реализованным для всех.

  • Воспроизводимость. В любой момент должна быть возможность восстановить значения фичей за нужную дату или период. Это критично для отладки моделей и анализа результатов в ретроспективе.

Feature Storage: фреймворк для работы с данными

Так появился Feature Storage — это наш новый подход к тому, как мы «варим» данные для моделей и дашбордов. Фреймворк состоит из двух ключевых компонентов:

  1. Единый формат хранения датасетов и удобный интерфейс работы с ними.

  2. Унифицированная логика построения пайплайнов.

Единый формат хранения датасетов

Мы разработали универсальный способ хранения обработанных датасетов, который назвали Feature Set. Каждый Feature Set  — это формализованный набор данных, к которому обязательно указываются:

  • Ключ — набор колонок, гарантирующий уникальность строк,

  • Версия — позволяет хранить разные варианты одного Feature Set.

Чтобы упростить работу с Feature Set в HDFS, мы написали собственный пакет на python с использованием PySpark, который и дал имя всему фреймворку — Feature Storage.

Теперь, чтобы сохранить все наборы данных, достаточно вызвать одну функцию, передав в неё DataFrame и указав нужные параметры: ключи, версию, и при необходимости — дополнительные атрибуты (например, настройки компрессии). Всё работает прозрачно, унифицировано и одинаково во всех проектах.

Теперь посмотрим, как это работает на практике.

Допустим, нам нужно рассчитать промежуточные данные — контентные фичи по товарам. Нас интересует всё, что можно сказать о товаре на момент обработки: бренд, категория, вендор, новизна, наличие скидки и так далее.

Для этого мы пишем DAG, который формирует датафрейм, где для каждого товара (SKU) в каждой стране указываются его атрибуты.

Чтобы сохранить этот датафрейм как Feature Set, нам нужно указать несколько параметров:

  • Название Feature Set.

  • Ключ. Это набор колонок, по которым гарантируется уникальность строк. В него обязательно должен входить timestamp — дата расчёта. В нашем случае это sku_id, country, timestamp. Логично: у одного и того же товара в одном регионе не может быть двух разных наборов атрибутов в один момент времени.

  • Версия. Например, v2, потому что раньше у нас уже была версия v1, где колонка gender вычислялась по-другому. Теперь логика изменилась, и мы выкатываем новую версию, не затрагивая старую.

  • Партиционирование. Feature Storage явно фиксирует, как датафрейм будет партицироваться в HDFS — это всегда timestamp и версия. Благодаря этому можно независимо работать с несколькими версиями одного и того же Feature Set.

Новая логика построения пайплайнов

Помимо унифицированного формата хранения, мы полностью пересмотрели подход к построению пайплайнов обработки данных. Теперь у нас есть чёткие правила, которым мы следуем:

  • Каждый Feature Set рассчитывается в отдельном DAG. Это значит, что каждый этап обработки данных стал изолированным. Теперь гораздо проще понять, какой результат получается на каждом шаге — не нужно копаться в огромных монолитных пайплайнах.

  • Версионирование данных. У каждого Feature Set может быть несколько версий, которые можно независимо читать, обновлять и удалять. Это позволяет безопасно внедрять изменения и откатываться при необходимости.

  • Общий принцип партиционирования. Мы всегда партиционируем данные по версии, ключу и времени расчёта. Это делает работу с любым Feature Set понятной и предсказуемой: даже если его написал кто-то другой, мы сразу видим, как он устроен, и как с ним работать эффективно.

Если представить это визуально, наши пайплайны теперь — это цепочка, в которой каждый из DAG создаёт логически обособленный слой данных. Каждый DAG на выходе формирует один Feature Set. Он может использовать один или несколько наборов данных в качестве входных, а разные версии одного Feature Set могут считаться параллельно. Например, пока идёт A/B-тест разных версий фичей, мы поддерживаем обе, а после — оставляем только лучшую.

Этот подход оказался очень удобным. Специалисты перестали «изобретать велосипед» и дублировать фичи — теперь можно взять уже готовый Feature Set и использовать в своём проекте. Так у нас сформировался довольно объёмный каталог готовых популярных наборов данных: фичи по товарам, пользователям, взаимодействиям, атрибуциям событий. Всё это — результат переиспользуемых пайплайнов, с которыми работают аналитики и дата-сайентисты.

Ещё один большой плюс — стандартизированный способ записи данных. Когда объёмы выросли, стало понятно, что некоторые фичи занимают слишком много места. Раньше это означало бы, что нужно разбираться в каждом DAG вручную, лезть в HDFS, проверять, как устроены данные, и настраивать сжатие. А с Feature Storage всё оказалось проще: мы просто обновили версию пакета, где задали параметры компрессии, и получили значительное сокращение дискового пространства — без изменений в логике пайплайнов.

Action Storage: как мы упростили работу с кликстримом

Внедрение Feature Storage действительно сильно ускорило и упростило нашу работу с данными. Но оставался «слон в комнате» — кликстрим.

Напомним: для нас кликстрим — это огромная (больше 50 ТБ!) Hive-таблица с почти сырыми json-атрибутами. Работать с ней не так просто — и вот почему:

  • Нет изолированного доступа к событиям. События кликстрима активно используются как в моделях, так и в аналитике. При этом важно уметь работать с разными типами событий — просмотрами, кликами, добавлениями в корзину и другими — изолированно, поскольку у каждой команды и задачи свои потребности.

  • Разнородные атрибуты. Каждое событие несёт свой набор атрибутов и требует отдельной обработки.

  • Постоянные изменения. Структура событий меняется довольно часто — нам важно поддерживать стабильную работу и плавно переходить на новые версии.

  • Слабый уровень мониторинга и контроля. Необходимо быстро реагировать на изменения в потоке данных и отслеживать возможные проблемы — иначе некорректные данные могут попасть в модели и дашборды, что критично для бизнеса.

Поэтому мы поняли, что хотим сделать новый инструмент для работы с событиями кликстрима. Так появился Action Storage.

Требования к нему были довольно простыми, но критичными:

  1. Возможность изолированно работать с каждым типом события,

  2. Валидация данных,

  3. Переиспользование атрибутов событий,

  4. Единый интерфейс работы со всеми событиями,

  5. Возможность хранить и легко читать события за несколько лет,

  6. Простая и понятная кодовая база, чтобы любой коллега мог легко поддерживать её и добавлять новые события.

Посмотрим на то, как устроен Action Storage. По сути, он состоит из двух компонентов:

  • нового слоя данных для событий кликстрима (набор Action-таблиц),

  • фреймворка для удобной работы с этими таблицами.

Архитектурно Action Storage находится между «сырым» кликстримом в HDFS и Feature Storage. При этом он не использует дополнительные источники вроде DWH — это облегчает его работу и даёт максимум контроля над качеством и структурой именно событийных данных.

В результате мы получили набор Action-таблиц, которые далее могут напрямую использоваться в Feature Storage и в других пайплайнах.

В основе Action Storage лежит приложение на PySpark, которое регулярно запускается в Airflow. Фактически это стандартный ETL-процесс, который:

  • читает данные из кликстрима,

  • обрабатывает их (приводя к нужной структуре и формату),

  • записывает в готовые Action-таблицы,

  • проверяет качество данных (валидирует структуру и атрибуты).

Так как для разных задач может требоваться поддержка различных версий событий, и каждая версия должна считаться изолированно, мы внедрили простую схему версионирования как самих Action-таблиц, так и этапов их обработки. Версия указывается прямо в имени объекта (в виде номера версии в суффиксе имени) — это позволяет легко управлять разными версиями и обеспечивать совместную работу старых и новых пайплайнов.

Давайте посмотрим, как это работает на практике — на примере создания события клика в каталоге.

Создание action-таблицы для события клика в каталоге
Создание action-таблицы для события клика в каталоге

Чтение данных

Здесь всё просто: кликстрим у нас уже сохраняется в HDFS. Раз в день мы читаем суточную порцию данных. При необходимости кэшируем прочитанное, чтобы в рамках одного процесса рассчитать несколько разных Action-таблиц. После чтения данных переходим к их обработке.

Обработка

Для каждого события создаем отдельный класс-обработчик. Помимо того, что сам класс события версионирован, мы также вводим версию для каждого обработчика атрибутов. Основной метод класса логически разбит на отдельные подэтапы: 

  1. Предобработка. Мы фильтруем нужные для вычисления конкретного события данные и приводим их к более удобной структуре. Например, для события клика в каталоге нам потребовалось привести вложенные json-структуры в плоский формат.

  2. Вычисление атрибутов. Далее мы формируем атрибуты события в удобном для использования виде. Для этого применяем набор версионируемых обработчиков атрибутов — каждый тип атрибута обрабатывается отдельно. Версионирование обработчиков даёт возможность менять логику вычисления атрибута при создании новой версии события, не ломая старые пайплайны.

  3. Постобработка. На последнем этапе мы выполняем дополнительную фильтрацию и преобразования. В случае кликов в каталоге нам, например, понадобилось добавить дедупликацию. Несмотря на то, что у нас уже есть базовая техническая очистка кликстрима от явно «плохих» или тестовых данных, она не защищает от логических дублей. Поэтому на этапе постобработки мы реализовали полноценную дедупликацию.

А теперь посмотрим на примере, как будет выглядеть код класса нашего события — ClickCatalogV1

Два из четырех атрибутов — платформа (platform) и артикул товара (sku), будут парситься своими функциями-обработчиками PlatformParserV1 и ExplodedSkuParserV1. При этом, если в другом событии потребуется аналогично парсить эти атрибуты, мы можем легко переиспользовать уже написанные обработчики. Сама обработка события происходит в методе parse, которые включает в себя предобработку (метод pre_transform), вычисление атрибутов (метод column_parse) и постобработку (метод post_transform).

Ниже мы показываем пример описания класса-обработчика событий кликов по товарам в каталоге.

Пример описания класса-обработчика событий кликов по товарам в каталоге
Пример описания класса-обработчика событий кликов по товарам в каталоге

Важно подчеркнуть: механизм вычисления одних и тех же атрибутов в разных событиях часто совпадает. Атрибуты вроде даты, платформы, артикула товара и другие парсятся одинаково во всех событиях. Одна из ключевых особенностей нашего решения — возможность накапливать парсеры атрибутов разных версий, а затем быстро собирать новые Action-таблицы путем их переиспользования. По сути, мы строим каталог переиспользуемых парсеров — очень похоже на концепцию «маркетплейса атрибутов».

Запись

На этапе записи мы управляем количеством партиций и для экономии дискового пространства сортируем и сжимаем данные. Запись происходит в версионированные таблицы, где версия является частью имени таблицы.

Например, изначально мы выделили 8 ключевых пользовательских событий и для них создали таблицы версии v1. Затем, после рефакторинга кода и оптимизаций, подготовили новые таблицы версии v2. Теперь наши пользователи могут в своем темпе переезжать на новые версии, и рано или поздно мы отключим расчет версий v1, оставив данные для сохранения истории.

Проверка

Контроль качества данных мы реализовали с помощью библиотеки PyDeequ.

Конфигурацию проверок мы описываем прямо в настройках пайплайнов в Airflow. Ранее мы создали генератор DAG на основе HOCON-файлов, поэтому конфигурации проверок также удобно передавать в таком формате.

Пример проверки качества данных:

  • слева — список выполняемых проверок,

  • справа — описание проверяемых данных.

Результаты мы сохраняем в отдельной Hive-таблице с подробной информацией о статусе каждой проверки. На эти данные мы навешиваем оповещения и строим визуализации, чтобы быстро замечать проблемы.

Планы на будущее

Поделимся также нашими планами по развитию Action Storage — возможно, они пригодятся тем, кто пойдет по нашему пути:

  1. Развитие проверки качества данных. Кликстрим — динамическая структура, данные часто меняются, что приводит к регулярным ошибкам. Хотим стремиться к realtime-проверке качества данных.

  2. Перевод оставшихся загрузок на Action-таблицы. Уже перевели часть пайплайнов, что позволило сильно оптимизировать вычисления. Планируем полностью перейти на Action Storage там, где это ускорит работу.

  3. Создание каталога данных. С ростом количества атрибутов и взаимосвязей между объектами становится сложно ориентироваться в данных. Сейчас мы активно внедряем OpenMetadata.

  4. Создание админки событий, которая предоставляет аналитикам удобный инструмент для заведения и контроля событий

Вместо заключения

Путь к созданию Feature Storage и Action Storage начался с накопившихся рабочих сложностей: дублирование логики, трудности с переиспользованием данных и растущий time-to-market для ML-задач. 

Сейчас Feature Storage является стандартом внутри Lamoda Tech. Он позволяет нам быстрее запускать модели, легче делиться фичами между командами и лучше управлять качеством данных. Action Storage, в свою очередь, решает схожие задачи для событийных данных, требующих особого внимания из-за специфики их структуры и объёмов.

Всё это не финальная точка, а скорее фундамент. Мы продолжаем развивать инструменты, улучшать документацию, автоматизировать контроль качества и думать, как сделать работу с данными ещё прозрачнее и удобнее.

С точки зрения процесса мы поняли, что не стоит бояться менять подходы к данным, даже если кажется, что всё уже устоялось и локальные проблемы можно решить по ходу. Инвестировав немного усилий, можно внедрить инструменты, которые в разы ускоряют работу и упрощают разработку. Это был итеративный процесс: мы двигались от конкретных проблем и не пытались решать всё сразу, чтобы не усложнять логику.

Можно возразить, что мы просто сделали велосипеды, которые могли бы заменить opensource-решениями. Однако на практике стало понятно, что создание собственного решения, подходящего именно для нашего стека и инструментов, гораздо проще и эффективнее, чем попытки интегрировать сложные и перегруженные opensource-опции.

Будем рады, если наш опыт окажется полезным. Будем рады ответить на ваши комментарии о работе с ML-данными.

Комментарии (0)