Всем привет! Меня зовут Саша Ткачев, я ведущий дата-инженер отдела управления ценностью данных в Лемана ПРО (Леруа Мерлен). Наша команда занимается разработкой self-service инструментов для пользователей платформы данных. Сегодня расскажу о новом продукте — DQ platform.
Почему именно платформа? У нас получилась большая система, которая включает в себя самые разнообразные компоненты — планировщик заданий, REST-API интерфейс предоставления информации и приема DQ проверок на исполнение, модуль работы с секретами, множество систем: хранения, сквозного логирования, обслуживания БД, восстановления работоспособности при возникновении ошибок (сетевых, работоспособности БД), валидации входящих моделей и т.п.
Почему мы вообще решили этим заняться?
У нас не было централизованного процесса по проведению DQ проверок, и каждая команда городила свой огород из самописных скриптов. Работало одновременно несколько систем, и у всех были собственные подходы проведения DQ проверок. Прогнозировать сроки реализации проверок было практически нереально, переиспользовать инструменты — невозможно, документация по продуктам тоже отсутствовала.
Стало понятно, что уйти от зоопарка решений и сократить время вывода проверок в продуктовые среды можно только с помощью перестройки процессов. Вишенкой на торте стала реализация на базе решения Sphinx качественной документации, которая создается и генерируется на основе кода.
Изменение подходов и создание единого ядра исполнения сделало возможным вывод тестов через графический интерфейс за нескольких минут. Для более сложных тестов все стало зависеть только от скорости работы команд и их готовности проводить релиз своих проверок. С нашей стороны сроки проведения ревью для набора тестов (обычно в наборе 5-15 тестов) теперь составляют до 1 дня (но это самый пессимистичный вариант, обычно 10-15 минут).
Важно понимать, что DQ — это процесс, который никогда не заканчивается. И чем быстрее мы меняемся, тем эффективнее мы работаем и тем качественнее наши данные.
Ниже расскажу про наше идеальное видение платформы и про то, как мы к нему пришли.
Планировщик заданий
Зачем разрабатывать платформу с собственным планировщиком, если можно на Airflow простыми DAGs сделать такой же функционал с локальными запусками?
Первое ограничение получения данных — это требование OLTP-системы по технологическим окнам. Есть OLTP-системы, которые отвечают за производственные процессы, и нагружать эти системы DQ проверками во время пика нагрузок не совсем оправданно. Точнее сказать, такие процессы должны иметь свой регламент и разрешать выполнение только в технологические окна, которые определены администраторами OLTP-систем. Становится понятно, что гарантии может дать только глобальный планировщик, а набор локальных планировщиков на это не способен.
Второе ограничение — мы не должны превышать лимит базы данных на количество подключений. Следовательно, нам нужно ограничить количество подключений на пользователя и на базу. Не имея полной информации о том, кто сейчас исполняет проверки, такие гарантии давать бессмысленно.
Какие гарантии дает наш планировщик:
Есть регламентные или запрещенные окна. При выполнении в запрещенный интервал мы отмечаем выполнения как ошибку и обрываем pipeline обработки. Цель – не копить очередь на исполнение в своем бэке.
На каждое подключение создается пул в 4 подключения. В принципе, этого достаточно почти для любой СУБД.
На каждый instance задается пул в 10 подключений. Сколько бы ни было пользователей, мы будем копить очередь. Как вы догадались, очистка очереди происходит как раз при попытке запуска обработки в запрещенный интервал, который падает с ошибкой.
Теперь понятно, что локальными DAGs, нам не удастся гарантировать выполнение этих идей.
Планировщик обосновали, двигаемся дальше.
СУБД для хранения результатов
Результат работы теста (это самое важное) должен быть сохранен в отдельном хранилище. Оно позволяет нам работать в асинхронном режиме при сравнении наборов данных между двумя различными СУБД. Выполнение всегда опирается на технологические окна, которые, как обычно, не сходятся по времени работы.
Следующее требование — это эффективное и простое сравнение результатов. Сравнение любых наборов в бэке должно происходить без использования join, но с использованием синтаксиса SQL. Сравнивать в запросе приходится по 2, 3, 10 и более наборов из разных СУБД, причем соединение идет по full join. Под капотом мы создали универсальную модель хранения assets, которая состоит всего из двух таблиц. Операции join заменяются на агрегат с фильтрацией — получается красиво и эффективно. Запросы типовые. С точки зрения объема информации места явно занимаем больше, чем в классическом варианте с нормализованными формами таблиц. Дата-инженерам модель пришлась по вкусу, хотя поначалу было непривычно даже продвинутым специалистам. Эту модель рассмотрим чуть позже.
REST API
Наша платформа должна просто интегрироваться в ландшафт компании. Тут самым очевидным способом является работа через REST API. На сегодняшний день поверх платформы как раз через REST API работают уже несколько продуктов DQ.
Отдача информации о состоянии проверки происходит тоже через REST API, как и лог исполнения проверки. При постановке проверки на исполнение пользователю возвращается уникальный идентификатор UUID, который и обеспечивает прозрачное логирование, а также повышает доверие к платформе как таковой.
Закинул мету в REST API — остальное сделает DQ platform. Если возникнут непредвиденные обстоятельства, платформа отдаст пользователю внятный лог ошибки.
Работа с подключениями
Вопрос простой по сути, но не по трудоемкости.
Разделим подключение на две составляющие: instance и учетную запись. Как бы странно ни звучало, это очень полезно. Пусть у нас будет 1000 instances, и для каждого instance нужно завести пользователя, а их может быть несколько. Один пользователь — для чтения публичной информации, другой — для чтения чувствительных для бизнеса данных. Сложность заведения пропорциональна количеству пользователей для каждого instance. Instance, как правило, — это не конфиденциальная информация, следовательно, при построении подключения instance можно передавать в открытую, а вот учетные данные пользователей необходимо хранить в vault. Теперь для уровня доступа public можно завести одного пользователя. Размерность задачи сильно уменьшилась — превратилась в хранение данных по одному пользователю в vault.
Идеальное видение платформы сформировали, можно описывать архитектуру проекта.
Архитектура проекта
У нас стандартом в компании принят планировщик Airflow (все дата-инженеры могут писать только на этой технологии). А это Python с его мощным набором библиотек: Pydantic, Alembic, Sqlalchemy, Pymongo, Networkx, SODA. Поэтому для самого ядра обработки мы взяли Airflow и сделали его динамичным (использовали DAG run).
Pydantic – быстрая библиотека для валидации и сериализации данных.
Alembic – поддержка миграций для наших двух БД.
Sqlalchemy – универсальный доступ к любой SQL СУБД. Причем некоторые СУБД поддерживают подключение в режиме stream. То есть для получения информации можно использовать легкие обработчики информации.
Sqlalchemy + Networkx + Airflow (DAG run) – планировщик заданий.
Pymongo – доступ к MongoDB.
SODA – один из лучших DQ фреймворков. У меня были пилоты и на GE, но результат не очень.
Для REST-API, был опыт работы с Fast API, с его бесшовной интеграцией с Pydantic из коробки. Прикрутили OAUTH 2.0 авторизацию через Keycloack.
Из схемы остается непонятным, как это работает на Airflow и что значат модули, которые отвечают за транспортировку результатов запросов. На самом деле, как раз при выходе Airflow 2.0 мы ждали выход DAG run, который может запускать подчиненный DAG обработки. Так вот, сами атомарные TransferSQL, TransferMongo и DQP_SODA — это универсальные DAGs, которые выполняются только через DAG run, у них нет расписания. Вся мета с параметрами для запуска хранится в СУБД Postgresql. Airflow DAG в нашем случае — это универсальный обработчик, который делает одну универсальную операцию.
Для обработки мы вводим понятие pipeline. Его выбрали как у проекта ARGO — это разделение меты на шаги исполнения и последовательность исполнения (DAG). Шаги исполнения — как раз наши базовые методы (TransferSQL, TransferMongo, DQP_SODA), задающиеся через мету описания и DAG обработки, которые реализованы на базе библиотеки Networkx.
Как организована динамическая маршрутизация на Airflow?
Каждый DAG реализован в виде класса, который имеет атрибуты (kind, version) — уникальные параметры для каждой меты. У класса обработки формируем уникальный Airflow DAG ID в пределах коммунального Airflow. Далее уже дело техники: составляем словарь {(kind, version), “Класс обработки”}; при получении пары (kind, version) у класса обработчика получаем его ID и вызываем его через DAG run.
То есть от классического Airflow и его DAG у нас получилась что-то типа Celery с набором обработчиков поверх Airflow.
Почему мы не использовали классический Celery? Ответ очень простой у нас есть:
SRE-инженер, который отвечает за бесперебойное функционирование Airflow;
большая экспертиза по построению различных решений на Airflow.
Как говорится, без лишних затрат мы успешно интегрировали существующее решение в сложную корпоративную среду.
Универсальная модель хранения assests
Asset в нашем понимании — это набор данных, сформированный запросом к источнику.
Классифицируем поля согласно классической OLAP теории на группы:
измерения;
меры;
время.
Начнем с ER-диаграммы:
Рассмотрим таблицу fact_asset_data.
Измерения в формате {key: value}::json - сохраним в поле dim_fields.
Для мер аналогично в measure_fields.
Время вынесем как отдельное поле date_from.
Добавим еще и date_to, к какому периоду относится asset. [date_from, date_to) — интервал постоянства измерений и мер (верхняя граница не включается в интервал).
Is_actual — bool значение, которое указывает на актуальность asset при перекрытии по временному периоду.
Dim_asset_id поле для связи с таблицей dim_asset.
Остальные поля носят специальный характер.
Таблица dim_asset формирует смысловую нагрузку: что за набор данных к нам пришел, его описание.
Поля (domain, key) формируют альтернативный уникальный ключ. Domain — это команда, которая сформировала asset, key — его уникальный идентификатор. Key обычно случайно сгенерированный UUID.
Операции объединения asset без join
Как уже поясняли ранее, операции full join для мер заменяем операциями агрегации и фильтрации, а для измерений используем coalesce.
Как это работает более подробно показано в репозитарии: https://github.com/biwed/dp--review/blob/main/dq_platfrom/dq_platform_01/sql/create_test.sql.
Заключение
Самым большим успехом можно считать изменение подхода к DQ в нашей компании. Были оптимизированы процессы вывода проверок для дата-инженеров в продуктовую среду и созданы системы мониторинга на основе сходимости и качества данных.
На уровне компании DQ platform стала стандартом исполнения DQ проверок. Поверх DQ platform работают системы построения витрин и UI-системы с визуализаций результатов работы (Réviseur).
Теперь DQ — это действительно управляемый процесс с положительным влиянием на показатель скорости вывода проверок в продуктовую среду. Оценочно этот показатель в среднем уменьшился с недель до часов.