Привет, Хабр! Это тимлид DS группы ранжирования и поиска Дана Злочевская и тимлид группы разработки Михаил Нестеров из Lamoda Tech.
Как и у любой крупной e-commerce платформы, данные — наш главный актив. Они помогают бизнесу принимать обоснованные решения, а пользователям — получать персонализированный, качественный опыт во всех продуктах Lamoda.
Поэтому в продакшене ежедневно работают десятки ML-пайплайнов, а в Airflow запускаются сотни DAG-воркфлоу. Данные готовят и используют более 100 специалистов из самых разных команд: аналитики, дата-сайентисты, ML-инженеры, маркетологи — у каждой свои задачи и логика работы с ними.
Однако с ростом команд, задач и инфраструктуры мы начали сталкиваться с рядом системных проблем:
Разрозненные подходы к подготовке данных. Каждая команда собирала данные «под себя», по своим правилам и в своем формате, что приводило к дублированию информации и нерациональному использованию вычислительных ресурсов.
Дублирование логики. Одни и те же преобразования выполнялись в разных пайплайнах с минимальными отличиями — это не только неэффективно, но и увеличивает риск ошибок.
Сложности с переиспользованием. Найти нужные данные, понять, как они были получены, и интегрировать их свой пайплайн — становилось нетривиальной задачей.
Рост time-to-market. На каждый новый ML-продукт или эксперимент у команд уходило всё больше времени просто на «разогрев»: сбор данных, выравнивание форматов, отладка пайплайна.
Тогда мы поняли, что пора систематизировать наш подход к хранению и работе с датасетами, и реализовали собственный фреймворк на основе Apache Spark — Feature Storage, который сейчас является стандартом в компании. А позже мы выделили отдельное решение для специфичных кликстрим-данных — Action Storage.
В этой статье мы хотим поделиться нашим опытом построения этих инструментов и рассказать, как со временем эволюционировал наш подход к хранению данных в Lamoda Tech. Надеемся, он будет вам полезен и подарит парочку интересных идей.

С какими данными мы работаем
В наших хранилищах более 3,5 Пб данных — это действительно много, и чтобы обеспечить их обновление, ежедневно запускаются сотни ETL-процессов.
Основной источник информации для аналитики и машинного обучения — это кликстрим. Он представляет собой топик в Kafka, который содержит необходимую информацию о действиях клиентов на сайте и в мобильном приложении — какие страницы и товары они просматривают, куда переходят, что добавляют в корзину или в избранное. Эти данные — основа персонализации и поведенческого анализа.
Кликстрим мы загружаем в HDFS с помощью Spark Streaming, после чего — без изменения смысла, но с преобразованием сложной вложенной JSON-структуры — сохраняем в Hive-таблицу.
Второй важный источник — базы данных бэкенда. Там хранится самая актуальная информация по текущему стоку и его доступности, данные по заказам, возвратам, а также по пользователям: какая у них скидка лояльности на данный момент, с какого устройства они обычно сидят, где находятся.

На основе этих данных мы решаем сразу несколько задач:
Рассчитываем метрики для дашбордов, которые помогают принимать операционные и продуктовые решения. После расчёта метрики доставляются в Power BI и Grafana.
Готовим датасеты для обучения и инференса ML-моделей, чаще всего на основе CatBoost и PyTorch.
Обновляем фичи для ML-моделей с онлайн-инференсом, чтобы они работали на самых свежих данных. Обновлённые фичи попадают в наши key-value хранилища — Aerospike и Redis.
Все эти процессы запускаются регулярно — один или несколько раз в день. Для оркестрации мы используем Airflow, а основная обработка идёт на PySpark.
Казалось бы, всё просто: берём данные, трансформируем — и передаем дальше в модель или визуализацию. Но по мере роста объёма данных, количества пользователей и требований к качеству — поддержка этой инфраструктуры становится всё интереснее и сложнее.

Больше данных — больше проблем
Изначально мы не слишком задумывались о стандартизации: данные сохранялись «как получится» — в произвольных форматах и местах. Но со временем объёмы выросли, и пользоваться такими данными стало сложно.
Чтобы переиспользовать ранее подготовленные датасеты, нужно было разбираться в чужом коде или просто дублировать логику в своих пайплайнах. Это занимало много времени и вело к накоплению технического долга.
Работа с DAG в Airflow тоже превратилась в головоломку. Каждый создавал свои пайплайны со своей структурой. Чтобы понять, что лежит в финальном датасете, нужно было прочитать и вникнуть в весь DAG — а внутри часто скрывалась сложная, многоступенчатая логика, которую не так просто было «раскурить» в короткий срок. Вдобавок данные сохранялись в разных местах и форматах, поэтому систематизировать уже готовые наборы было почти невозможно.
Всё это тормозило разработку ML-продуктов: time-to-market увеличивался, команды тратили всё больше времени не на новые фичи, а на понимание уже сделанного.
Тогда мы сформулировали, какие требования должна выполнять новая архитектура хранения и обработки данных:
Прозрачность логики. Каждый этап обработки должен быть выделен явно, а не скрыт внутри сложного пайплайна. Модели должны выполняться отдельно от расчёта фичей — изоляция помогает контролировать качество и упростить отладку.
Единый формат хранения. Мы хотели разработать универсальный формат, подходящий для разных типов фичей, но при этом гибкий и понятный. Добавление, удаление и версионирование фич должно быть прозрачным и одинаково реализованным для всех.
Воспроизводимость. В любой момент должна быть возможность восстановить значения фичей за нужную дату или период. Это критично для отладки моделей и анализа результатов в ретроспективе.
Feature Storage: фреймворк для работы с данными
Так появился Feature Storage — это наш новый подход к тому, как мы «варим» данные для моделей и дашбордов. Фреймворк состоит из двух ключевых компонентов:
Единый формат хранения датасетов и удобный интерфейс работы с ними.
Унифицированная логика построения пайплайнов.
Единый формат хранения датасетов
Мы разработали универсальный способ хранения обработанных датасетов, который назвали Feature Set. Каждый Feature Set — это формализованный набор данных, к которому обязательно указываются:
Ключ — набор колонок, гарантирующий уникальность строк,
Версия — позволяет хранить разные варианты одного Feature Set.
Чтобы упростить работу с Feature Set в HDFS, мы написали собственный пакет на python с использованием PySpark, который и дал имя всему фреймворку — Feature Storage.
Теперь, чтобы сохранить все наборы данных, достаточно вызвать одну функцию, передав в неё DataFrame
и указав нужные параметры: ключи, версию, и при необходимости — дополнительные атрибуты (например, настройки компрессии). Всё работает прозрачно, унифицировано и одинаково во всех проектах.

Теперь посмотрим, как это работает на практике.
Допустим, нам нужно рассчитать промежуточные данные — контентные фичи по товарам. Нас интересует всё, что можно сказать о товаре на момент обработки: бренд, категория, вендор, новизна, наличие скидки и так далее.
Для этого мы пишем DAG, который формирует датафрейм, где для каждого товара (SKU) в каждой стране указываются его атрибуты.

Чтобы сохранить этот датафрейм как Feature Set, нам нужно указать несколько параметров:
Название Feature Set.
Ключ. Это набор колонок, по которым гарантируется уникальность строк. В него обязательно должен входить
timestamp
— дата расчёта. В нашем случае этоsku_id
,country
,timestamp
. Логично: у одного и того же товара в одном регионе не может быть двух разных наборов атрибутов в один момент времени.Версия. Например,
v2
, потому что раньше у нас уже была версияv1
, где колонкаgender
вычислялась по-другому. Теперь логика изменилась, и мы выкатываем новую версию, не затрагивая старую.Партиционирование. Feature Storage явно фиксирует, как датафрейм будет партицироваться в HDFS — это всегда timestamp и версия. Благодаря этому можно независимо работать с несколькими версиями одного и того же Feature Set.
Новая логика построения пайплайнов
Помимо унифицированного формата хранения, мы полностью пересмотрели подход к построению пайплайнов обработки данных. Теперь у нас есть чёткие правила, которым мы следуем:
Каждый Feature Set рассчитывается в отдельном DAG. Это значит, что каждый этап обработки данных стал изолированным. Теперь гораздо проще понять, какой результат получается на каждом шаге — не нужно копаться в огромных монолитных пайплайнах.
Версионирование данных. У каждого Feature Set может быть несколько версий, которые можно независимо читать, обновлять и удалять. Это позволяет безопасно внедрять изменения и откатываться при необходимости.
Общий принцип партиционирования. Мы всегда партиционируем данные по версии, ключу и времени расчёта. Это делает работу с любым Feature Set понятной и предсказуемой: даже если его написал кто-то другой, мы сразу видим, как он устроен, и как с ним работать эффективно.
Если представить это визуально, наши пайплайны теперь — это цепочка, в которой каждый из DAG создаёт логически обособленный слой данных. Каждый DAG на выходе формирует один Feature Set. Он может использовать один или несколько наборов данных в качестве входных, а разные версии одного Feature Set могут считаться параллельно. Например, пока идёт A/B-тест разных версий фичей, мы поддерживаем обе, а после — оставляем только лучшую.

Этот подход оказался очень удобным. Специалисты перестали «изобретать велосипед» и дублировать фичи — теперь можно взять уже готовый Feature Set и использовать в своём проекте. Так у нас сформировался довольно объёмный каталог готовых популярных наборов данных: фичи по товарам, пользователям, взаимодействиям, атрибуциям событий. Всё это — результат переиспользуемых пайплайнов, с которыми работают аналитики и дата-сайентисты.
Ещё один большой плюс — стандартизированный способ записи данных. Когда объёмы выросли, стало понятно, что некоторые фичи занимают слишком много места. Раньше это означало бы, что нужно разбираться в каждом DAG вручную, лезть в HDFS, проверять, как устроены данные, и настраивать сжатие. А с Feature Storage всё оказалось проще: мы просто обновили версию пакета, где задали параметры компрессии, и получили значительное сокращение дискового пространства — без изменений в логике пайплайнов.
Action Storage: как мы упростили работу с кликстримом
Внедрение Feature Storage действительно сильно ускорило и упростило нашу работу с данными. Но оставался «слон в комнате» — кликстрим.

Напомним: для нас кликстрим — это огромная (больше 50 ТБ!) Hive-таблица с почти сырыми json
-атрибутами. Работать с ней не так просто — и вот почему:
Нет изолированного доступа к событиям. События кликстрима активно используются как в моделях, так и в аналитике. При этом важно уметь работать с разными типами событий — просмотрами, кликами, добавлениями в корзину и другими — изолированно, поскольку у каждой команды и задачи свои потребности.
Разнородные атрибуты. Каждое событие несёт свой набор атрибутов и требует отдельной обработки.
Постоянные изменения. Структура событий меняется довольно часто — нам важно поддерживать стабильную работу и плавно переходить на новые версии.
Слабый уровень мониторинга и контроля. Необходимо быстро реагировать на изменения в потоке данных и отслеживать возможные проблемы — иначе некорректные данные могут попасть в модели и дашборды, что критично для бизнеса.
Поэтому мы поняли, что хотим сделать новый инструмент для работы с событиями кликстрима. Так появился Action Storage.
Требования к нему были довольно простыми, но критичными:
Возможность изолированно работать с каждым типом события,
Валидация данных,
Переиспользование атрибутов событий,
Единый интерфейс работы со всеми событиями,
Возможность хранить и легко читать события за несколько лет,
Простая и понятная кодовая база, чтобы любой коллега мог легко поддерживать её и добавлять новые события.
Посмотрим на то, как устроен Action Storage. По сути, он состоит из двух компонентов:
нового слоя данных для событий кликстрима (набор Action-таблиц),
фреймворка для удобной работы с этими таблицами.
Архитектурно Action Storage находится между «сырым» кликстримом в HDFS и Feature Storage. При этом он не использует дополнительные источники вроде DWH — это облегчает его работу и даёт максимум контроля над качеством и структурой именно событийных данных.

В результате мы получили набор Action-таблиц, которые далее могут напрямую использоваться в Feature Storage и в других пайплайнах.
В основе Action Storage лежит приложение на PySpark, которое регулярно запускается в Airflow. Фактически это стандартный ETL-процесс, который:
читает данные из кликстрима,
обрабатывает их (приводя к нужной структуре и формату),
записывает в готовые Action-таблицы,
проверяет качество данных (валидирует структуру и атрибуты).

Так как для разных задач может требоваться поддержка различных версий событий, и каждая версия должна считаться изолированно, мы внедрили простую схему версионирования как самих Action-таблиц, так и этапов их обработки. Версия указывается прямо в имени объекта (в виде номера версии в суффиксе имени) — это позволяет легко управлять разными версиями и обеспечивать совместную работу старых и новых пайплайнов.
Давайте посмотрим, как это работает на практике — на примере создания события клика в каталоге.

Чтение данных
Здесь всё просто: кликстрим у нас уже сохраняется в HDFS. Раз в день мы читаем суточную порцию данных. При необходимости кэшируем прочитанное, чтобы в рамках одного процесса рассчитать несколько разных Action-таблиц. После чтения данных переходим к их обработке.
Обработка
Для каждого события создаем отдельный класс-обработчик. Помимо того, что сам класс события версионирован, мы также вводим версию для каждого обработчика атрибутов. Основной метод класса логически разбит на отдельные подэтапы:
Предобработка. Мы фильтруем нужные для вычисления конкретного события данные и приводим их к более удобной структуре. Например, для события клика в каталоге нам потребовалось привести вложенные json-структуры в плоский формат.
Вычисление атрибутов. Далее мы формируем атрибуты события в удобном для использования виде. Для этого применяем набор версионируемых обработчиков атрибутов — каждый тип атрибута обрабатывается отдельно. Версионирование обработчиков даёт возможность менять логику вычисления атрибута при создании новой версии события, не ломая старые пайплайны.
Постобработка. На последнем этапе мы выполняем дополнительную фильтрацию и преобразования. В случае кликов в каталоге нам, например, понадобилось добавить дедупликацию. Несмотря на то, что у нас уже есть базовая техническая очистка кликстрима от явно «плохих» или тестовых данных, она не защищает от логических дублей. Поэтому на этапе постобработки мы реализовали полноценную дедупликацию.
А теперь посмотрим на примере, как будет выглядеть код класса нашего события — ClickCatalogV1.
Два из четырех атрибутов — платформа (platform
) и артикул товара (sku
), будут парситься своими функциями-обработчиками PlatformParserV1
и ExplodedSkuParserV1
. При этом, если в другом событии потребуется аналогично парсить эти атрибуты, мы можем легко переиспользовать уже написанные обработчики. Сама обработка события происходит в методе parse, которые включает в себя предобработку (метод pre_transform
), вычисление атрибутов (метод column_parse
) и постобработку (метод post_transform
).
Ниже мы показываем пример описания класса-обработчика событий кликов по товарам в каталоге.

Важно подчеркнуть: механизм вычисления одних и тех же атрибутов в разных событиях часто совпадает. Атрибуты вроде даты, платформы, артикула товара и другие парсятся одинаково во всех событиях. Одна из ключевых особенностей нашего решения — возможность накапливать парсеры атрибутов разных версий, а затем быстро собирать новые Action-таблицы путем их переиспользования. По сути, мы строим каталог переиспользуемых парсеров — очень похоже на концепцию «маркетплейса атрибутов».
Запись
На этапе записи мы управляем количеством партиций и для экономии дискового пространства сортируем и сжимаем данные. Запись происходит в версионированные таблицы, где версия является частью имени таблицы.
Например, изначально мы выделили 8 ключевых пользовательских событий и для них создали таблицы версии v1. Затем, после рефакторинга кода и оптимизаций, подготовили новые таблицы версии v2. Теперь наши пользователи могут в своем темпе переезжать на новые версии, и рано или поздно мы отключим расчет версий v1, оставив данные для сохранения истории.
Проверка
Контроль качества данных мы реализовали с помощью библиотеки PyDeequ.
Конфигурацию проверок мы описываем прямо в настройках пайплайнов в Airflow. Ранее мы создали генератор DAG на основе HOCON-файлов, поэтому конфигурации проверок также удобно передавать в таком формате.
Пример проверки качества данных:
слева — список выполняемых проверок,
справа — описание проверяемых данных.
Результаты мы сохраняем в отдельной Hive-таблице с подробной информацией о статусе каждой проверки. На эти данные мы навешиваем оповещения и строим визуализации, чтобы быстро замечать проблемы.

Планы на будущее
Поделимся также нашими планами по развитию Action Storage — возможно, они пригодятся тем, кто пойдет по нашему пути:
Развитие проверки качества данных. Кликстрим — динамическая структура, данные часто меняются, что приводит к регулярным ошибкам. Хотим стремиться к realtime-проверке качества данных.
Перевод оставшихся загрузок на Action-таблицы. Уже перевели часть пайплайнов, что позволило сильно оптимизировать вычисления. Планируем полностью перейти на Action Storage там, где это ускорит работу.
Создание каталога данных. С ростом количества атрибутов и взаимосвязей между объектами становится сложно ориентироваться в данных. Сейчас мы активно внедряем OpenMetadata.
Создание админки событий, которая предоставляет аналитикам удобный инструмент для заведения и контроля событий
Вместо заключения
Путь к созданию Feature Storage и Action Storage начался с накопившихся рабочих сложностей: дублирование логики, трудности с переиспользованием данных и растущий time-to-market для ML-задач.
Сейчас Feature Storage является стандартом внутри Lamoda Tech. Он позволяет нам быстрее запускать модели, легче делиться фичами между командами и лучше управлять качеством данных. Action Storage, в свою очередь, решает схожие задачи для событийных данных, требующих особого внимания из-за специфики их структуры и объёмов.
Всё это не финальная точка, а скорее фундамент. Мы продолжаем развивать инструменты, улучшать документацию, автоматизировать контроль качества и думать, как сделать работу с данными ещё прозрачнее и удобнее.
С точки зрения процесса мы поняли, что не стоит бояться менять подходы к данным, даже если кажется, что всё уже устоялось и локальные проблемы можно решить по ходу. Инвестировав немного усилий, можно внедрить инструменты, которые в разы ускоряют работу и упрощают разработку. Это был итеративный процесс: мы двигались от конкретных проблем и не пытались решать всё сразу, чтобы не усложнять логику.
Можно возразить, что мы просто сделали велосипеды, которые могли бы заменить opensource-решениями. Однако на практике стало понятно, что создание собственного решения, подходящего именно для нашего стека и инструментов, гораздо проще и эффективнее, чем попытки интегрировать сложные и перегруженные opensource-опции.
Будем рады, если наш опыт окажется полезным. Будем рады ответить на ваши комментарии о работе с ML-данными.