Привет, сообщество. Меня зовут Илья, я старший разработчик в БФТ-Холдинге. В этой статье хочу поделиться опытом использования dbt (data build tool) в рамках проекта «Консолидации и нормализации данных» для процессов трансформации данных и создания витрин.
Данные собираются из огромного количества источников.
Изначально при выборе инструмента трансформации руководствовались такими известными преимуществами dbt:
простота использования и освоения: не требуется знание дополнительных языков, технологий и т.д. кроме SQL;
избавление от ручных операций DDL, т.к. это трудоемко, особенно при больших объемах и нескольких контурах разработки;
возможность переиспользования (reused) кода;
наличие шаблонизатора Jinja для написания более гибкого и эффективного кода;
наличие большого количества реализованных коннекторов: GreenPlum, ClickHouse, Spark и т.д.
реализация на Python, поэтому совместимость и поддержка многих сторонних продуктов.
Для эффективной работы с таким количеством данных применяется методология Data Mesh: несколько команд (6-7 команд по 4-8 человек) работают над своим доменом данных, определяемом бизнес-функцией. Результатом работы являются витрины, из которых данные в виде сообщений в формате json отправляются в соответствующие микросервисы. Внутри каждого домена реализуются свои алгоритмические особенности, но пайплайн каждого домена унифицирован и реализуется набором специальных dbt-моделей, о чем будет рассказано ниже.
Наше хранилище построено на базе Greenplum. Оркестрацией занимается Apache Airflow, в котором реализован оператор DbtOperator. Он умеет запускать нужную команду dbt run -–model …, dbt run -–select
… и т.д.
Данные поднимаются из источников в хранилище в слой ods оперативных данных. Далее происходит трансформация данных с использованием dbt: собирается core-слой данных, где формируется версионность, а далее строятся витрины в dm-слое.
Расскажу чуть подробнее про пайплайн или «цепочку трансформации» – перечень моделей, которые реализуют определенный для данного домена алгоритм обработки данных и приводят к созданию одной или более таблиц в core- и dm-слое. Каждая цепочка трансформации имеет свой уникальный код (transform_code). Как правило, даг обработки одной цепочки трансформации состоит из трех шагов\задач (task) DbtOpertor:
begin_transformation: специальная служебная dbt-модель обращается в лог обработок data_transform и ищет там дату окончания периода предыдущей успешной (is_success = 1) обработки – эта дата становится началом текущей обработки, а окончанием – время запуска обработки;
dbt_transformation: выполняются модели данной цепочки трансформации с параметрами периода обработки из предыдущей задачи. Перечень моделей для выполнения фиксируется в конфигурации task’а;
end_transformation: еще одна служебная dbt-модель определяет, была ли цепочка трансформации выполнена успешно, т.е. все модели в ней выполнились без ошибок, и фиксирует период обработки и признак успешности в лог обработок.
Параметры конфигурации дага хранятся в базе метаданных Airflow – и ключевым параметром дага является код трансформации: именно благодаря ему даг «умеет» собирать модели конкретного домена.
Таким образом, в сборе любой цепочки трансформации принимают участие специальные служебные dbt-модели и даг Airflow – вместе платформа нашего хранилища данных.
В своей статье «How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh» автор Zhamak Dehghani (Жамак Дегани) выделяет возможности, которые должна обладать платформа хранилища данных. Ниже приведены некоторые из них, которые реализованы в нашей платформе связкой dbt + Airflow:
масштабируемое хранения данных в разных форматах;
версионность data-продуктов;
хранение схемы данных data-продукта;
оркестрация потоков/процессов по обработке данных;
хранение метаданных и data lineage;
расчёт метрик качества data-продуктов;
ведение каталога данных.
И немного о метриках качества, упомянутых выше. Ключевые модели в каждом слое унифицированы по структуре – это позволяет при необходимости получать метрики качества обработки данных в домене. В некоторых наших доменах есть практика применения расчета параметров «доля готовых для отправки сообщений» и «доля успешно отправленных сообщений». Доля готовых для отправки сообщений – это отношение прошедших валидацию сообщений N к общему количеству сообщений в витрине M. Невалидные сообщения анализируются, по ним устанавливаются и устраняются причины ошибок, в результате доля готовых сообщений растет. Доля успешно отправленных сообщений – это отношение успешно обработанных микросервисом R сообщений к общему количеству отправленных сообщений N. Тут работа стоится аналогично предыдущему параметру с той лишь разницей, что в результате анализа может быть сделан вывод как о некорректно сформированном сообщении, так и об ошибках в работе микросервиса.
Сбором статистики и расчетом метрик занимается еще один вид служебных моделей. В их работе используется унификация моделей и возможности шаблонизатора Jinja, фрагмент кода для сбора данных о невалидных сообщениях приведен ниже.
{% for table in tables %}
select
{{table[1]}}::text msg_type,
count(distinct {{table[2]}}) filter(where coalesce(err, '') <> '') as count_err,
count(distinct {{table[2]}}) as count_all
from {{table[0]}}
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
В настоящее время проект насчитывает порядка 3 тысяч моделей, 350 макросов, 200 источников (sources) и 50 тестов. Благодаря выбранным решениям, мы рассчитываем, что это далеко не предел. Доменная методология в сочетании с использованием dbt позволяют получить такие эффекты:
гибкость и масштабируемость: работа с большим хранилищем небольшим количеством команд независимо друг от друга, возможность разбиения хранилища на логические части\слои и проведение работ, пересчета каждого из них по отдельности;
простота использования и низкий порог вхождения, т.к. если разработчик, аналитик знаком с SQL, то освоить dbt не составит большого труда;
более удобная по сравнению скриптами в БД работа с git’ом и, в целом, с развертыванием без необходимости деплоя всего проекта целиком;
возможность использования при необходимости метрик качества работы домена;
возможность оптимизации и рефакторинга моделей для их выполнения с минимальными затратами ресурсов: выбор правильного ключа дистрибуции (GP) по цепочке трансформации данных, определение оптимального количества join’ов, полей и т.д. При этом при необходимости цепочку трансформации в любом месте можно безболезненно для результата модифицировать;
построение графа обработки, по которому можно отследить путь данных (Data Lineage), увидеть архитектурные дефекты, получить прозрачную структуру хранилища, свести к минимуму дублирование кода, увеличить его переиспользование, а также задать параллельную обработку моделей средствами самого dbt. Сюда же отнесу формирование каталога с документацией и описаниями.
Делитесь своим опытом и вопросами в комментариях! :)