Уже более двух лет data build tool активно используется в компании Wheely для управления Хранилищем Данных. За это время накоплен немалый опыт, мы на тернистом пути проб и ошибок к совершенству в Analytics Engineering.

Несмотря на то, что в русскоязычном сегменте уже есть несколько публикаций, посвященных применению dbt, всё ещё нельзя говорить о широкой популярности и интересе, которые продукт стремительно обретает на Западе.

Поэтому сегодня я предлагаю вам экскурсию по Хранилищу Данных Wheely. В формат публикации я попытался уложить самые яркие моменты и впечатления от использования dbt, снабдив реальными примерами, практиками и опытом. Добро пожаловать под кат.

Структура превыше всего

Измерять сложность Хранилища Данных в количестве гигабайт сегодня - дурной тон

Налить кучу тяжело интерпретируемых данных без метаинформации (читай мусора) не составит большого труда. Гораздо сложнее из этих данных получить что-то осмысленное. То, на что с уверенностью могут опираться business stakeholders, принимая решения. То, что регулярно измеряется на предмет качества и актуальности. Наконец, то, что соответствует принципам Keep it simple (KISS) и Don’t repeat yourself (DRY).

Первостепенным элементом я считаю прозрачность структуры Хранилища Данных. Чаще всего DWH выстраивается согласно многослойной логике, где каждому этапу соответствует набор преобразований, детали реализации которого скрыты для последующих слоев (элемент абстракции).

Схема слоев Хранилища Данных
Схема слоев Хранилища Данных

Зеленым цветом – слой источников данных sources. Это реплики структур и таблиц из исходных систем, которые поддерживаются ELT-сервисом. Данные синхронизируются 1:1 с источником, без каких-либо преобразований. Опциональный слой flatten позволяет вложенные иерархические структуры (JSON) превратить в плоские таблицы. 

Слой staging предназначен для простых преобразований: переименование полей, преобразование типов, расчет новых колонок с помощью конструкции case. На этом этапе мы готовим почву для дальнейших преобразований, приводим всё к единому виду и неймингу.

Intermediate или промежуточный слой отвечает за формирование предварительных таблиц и агрегатов, где происходит обогащение данных. Для ряда бизнес-областей мы не используем этот слой, для других логика может насчитывать до 5-10 взаимосвязанных моделей. 

Кульминацией являются data marts или Витрины Данных, которые используются Data Scientists / Business Users / BI tools. Слой, в свою очередь, делится на:

  • dimensions: пользователи, компании, машины, водители, календарь

  • facts: поездки, транзакции, сеансы, продвижения, коммуникации

  • looker: материализованные представления и витрины, оптимизированные под чтение из BI-системы

Число 120  из заголовка публикации относится только к витринам данных:

Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures

На текущий момент в проекте:

  • 273 модели во всех перечисленных слоях

  • 493 теста на эти модели, включая not null, unique, foreign key, accepted values

  • 6 снапшотов для ведения истории SCD (slowly changing dimensions)

  • 532 макроса (большая часть из которых импортирована из сторонних модулей)

  • 7 operations включая vacuum + analyze

  • 81 источник данных

Помимо разбиения на логические слои, Хранилище можно нарезать по бизнес-областям. В случае необходимости есть возможность пересчитать или протестировать витрины, относящиеся к вертикалям Marketing / Supply / Growth / B2B. Например, в случае late arriving data или ручных корректировках маппингов/справочников.

Осуществляется это за счет присвоения моделям и витринам тегов, а также за счет богатых возможностей синтаксиса выбора моделей. Запустить расчет всех витрин вертикали Marketing и их вышестоящие зависимости:

dbt run -m +tag:marketing

Этот же принцип лежит в основе организации кодой базы. Все скрипты объединены в директории с общей логикой и понятными наименованиями. Сложно потеряться даже при огромном количестве моделей и витрин:

Иерархия проекта dbt
.

|____staging

| |____webhook

| |____receipt_prod

| |____core

| |____wheely_prod

| |____flights_prod

| |____online_hours_prod

| |____external

| |____financial_service

|____marts

| |____looker

| |____dim

| |____snapshots

| |____facts

|____flatten

| |____webhook

| |____receipt_prod

| |____wheely_prod

| |____communication_prod

|____audit

|____sources

|____aux

| |____dq

| | |____marts

| | |____external

|____intermediate

Оптимизация физической модели

Логическое разделение на слои и области - это замечательно. Но не менее важно и то, как эта логика ложится на конкретную СУБД. В случае Wheely это Amazon Redshift.

Подход с декомпозицией позволит разбить логику на понятные части, которые можно рефакторить по отдельности. Одновременно это помогает оптимизатору запросов подобрать лучший план выполнения. По такому принципу реализована одна из центральных витрин – journeys (поездки).

Цепочка зависимостей витрины поездок (journeys)
Цепочка зависимостей витрины поездок (journeys)

На этапе обогащения данных важна скорость склейки таблиц (join performance), поэтому данные сегментированы и отсортированы в одинаковом ключе, начиная с sources. Это позволит использовать самый быстрый вид соединения - sort merge join:

Конфигурация для оптимального соединения – sort merge join
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort="request_id"
   )
}}

Витрина же хранится отсортированной по самым популярным колонкам доступа: city, country, completed timestamp, service group. В случае правильного подбора колонок Interleaved key позволяет значительно оптимизировать I/O и ускорить отрисовку графиков в BI-системах.

Конфигурация для быстрого чтения витрины – interleaved sortkey
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort_type='interleaved',
       sort=["completed_ts_loc"
               , "city"
               , "country"
               , "service_group"
               , "is_airport"
               , "is_wheely_journey"]
   )
}}

При этом часть моделей есть смысл материализовать в виде views (виртуальных таблиц), не занимающих дисковое пространство в СУБД. Так, слой staging, не содержащий сложных преобразований, конфигурируется на создание в виде представлений на уровне проекта:

       staging:
           +materialized: view
           +schema: staging
           +tags: ["staging"]

Другой интересный пример – результаты проверки качества данных. Выбранный тип материализации – ephemeral, т.е. на уровне СУБД не будет создано ни таблицы, ни представления. При каждом обращении к такой модели будет выполнен лишь запрос. Результат такого запроса является слагаемым в суммарной таблице, содержащей метрики всех проверяемых объектов.

В свою очередь большие таблицы фактов имеет смысл наполнять инкрементально. Особенно при условии того, что факт, случившийся однажды, больше не меняет своих характеристик. Таким образом мы процессим только изменения (delta) – новые факты, произошедшие после последнего обновления витрины. Обратите внимание на условие where:

Пример инкрементального наполнения витрины
{{
   config(
       materialized='incremental',
       sort='metadata_timestamp',
       dist='fine_id',
       unique_key='id'
   )
}}

with fines as ( 

   select 

         fine_id
       , city_id
       , amount
       , details
       , metadata_timestamp
       , created_ts_utc
       , updated_ts_utc
       , created_dt_utc 

   from {{ ref('stg_fines') }}
   where true
   -- filter fines arrived since last processed time
   {% if is_incremental() -%}
       and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
   {%- endif %} 

), 

...

Кстати, о принципах MPP и о том, как выжать максимум из аналитических СУБД я рассказываю на курсах Data Engineer и Data Warehouse Analyst (скоро первый запуск!).

SQL + Jinja = Flexibility

Высокоуровневый декларативный язык SQL прекрасен сам по себе, но вкупе с движком шаблонизации Jinja он способен творить чудеса.

Любой код, который вы используете с dbt проходит этапы compile & run. На этапе компиляции интерпретируются все шаблонизированные выражения и переменные. На этапе запуска код оборачивается в конструкцию CREATE в зависимости от выбранного типа материализации и фишек используемой СУБД: clustered by / distributed by / sorted by. Рассмотрим пример:

Model code:
{{
   config(
       materialized='table',
       dist="fine_id",
       sort="created_ts_utc"
   )
}} 

with details as (

  select
       {{
       dbt_utils.star(from=ref('fine_details_flatten'),
           except=["fine_amount", "metadata_timestamp", "generated_number"]
       )
       }}
   from {{ ref('fine_details_flatten') }}
   where fine_amount > 0

)

select * from details
Compiled code:
with details as (  

   select
  
       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "master"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

) 

select * from details
Run code:
create  table
   "master"."dbt_test_akozyr"."f_chauffeurs_fines"
   diststyle key distkey (fine_id)   
   compound sortkey(created_ts_utc)
 as (    

with details as (  

   select

       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "master"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

)

select * from details

 );

Ключевым моментом является тот факт, что пишете вы только лаконичный шаблонизированный код, а остальным занимается движок dbt. Написание boilerplate code сведено к минимуму. Фокус инженера или аналитика остается преимущественно на реализуемой логике.

Во-вторых, как происходит выстраивание цепочки связей и очередности создания витрин, продемонстрированные на картинках выше? Внимательный читатель уже заметил, что в рамках написания кода при ссылках на другие модели нет хардкода, но есть конструкция {{ ref('fine_details_flatten') }}– ссылка на наименование другой модели. Она и позволяет распарсить весь проект и построить граф связей и зависимостей. Так что это тоже делается абсолютно прозрачным и органичным способом.

С помощью шаблонизации Jinja в проекте Wheely мы гибко управляем схемами данных и разделением сред dev / test / prod. В зависимости от метаданных подключения к СУБД будет выбрана схема и период исторических данных. Продакшн модели создаются в целевых схемах под технической учетной записью. Аналитики же ведут разработку каждый в своей личной песочнице, ограниченной объемом данных в 3-е последних суток. Это реализуется с помощью макроса:

Макрос управления схемами для подключений:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
 
   {%- set default_schema = target.schema -%}

   {%- if target.name == 'prod' and custom_schema_name is not none -%} 

       {{ custom_schema_name | trim }} 

   {%- else -%}

       {{ default_schema }} 

   {%- endif -%} 

{%- endmacro %}

Еще одним важным преимуществом является самодокументируемый код. Иными словами, из репозитория проекта автоматически можно собрать статический сайт с документацией: перечень слоев, моделей, атрибутный состав, метаинформацию о таблицах в СУБД и даже визуализировать граф зависимостей (да-да, картинки выше именно оттуда).

Не повторяйся – лучше подготовь макрос

Однотипный код, повторяющиеся обращения и действия, зачастую реализуемые по принципу copy-paste нередко являются причиной ошибок и багов. В Wheely мы придерживаемся принципа Do not repeat yourself и любой сколько-нибудь похожий код шаблонизируем в макрос с параметрами. Писать и поддерживать такой код становится сплошным удовольствием.

Простой пример с конвертацией валют:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
 
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd 

{%- endmacro %}
Вызов макроса из модели:
 select     

       ...

       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('transfer_min_price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
       , {{ convert_currency('pickup_charge', 'currency') }}
       , {{ convert_currency('cancel_fee', 'currency') }}
       , {{ convert_currency('net_bookings', 'currency') }}
       , {{ convert_currency('gross_revenue', 'currency') }}
       , {{ convert_currency('service_charge', 'currency') }}     

       ... 

   from {{ ref('requests_joined') }} r   

По большому счету, макрос это просто вызов функции с передачей аргументов, на уже знакомом вам диалекте Jinja. Результатом работы макроса является готовый к исполнению SQL-скрипт. Макрос для кросс-сверки значений в колонках:

Сравнить значения двух колонок
-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
  
   {%- if is_numeric == true -%}
       {%- set src_column = 'round(' + src_column + ', 2)' -%}       
       {%- set trg_column = 'round(' + trg_column + ', 2)' -%}
   {%- endif -%} 

       CASE
           WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
           WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
           WHEN {{ src_column }} IS NULL THEN 'missing in source'
           WHEN {{ trg_column }} IS NULL THEN 'missing in target'
           WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
           ELSE 'unknown'
       END

{%- endmacro %}

В макрос можно запросто записать даже создание UDF-функций:

Создать UDF
-- cast epoch as human-readable timestamp
{% macro create_udf() -%}

{% set sql %}

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
           RETURNS VARCHAR(512)
           STABLE
       AS $$
       # Convert column to binary, strip "0b" prefix, pad out with zeroes
       if bitwise_column is not None: 
           b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
           return b
       else:
           None 
       $$ LANGUAGE plpythonu
       ;  

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
           RETURNS VARCHAR(128)
       STABLE
       AS $$
           SELECT nvl(
                         DECODE($2, null, null, 'deleted')
                       , DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
                       , DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
                       , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null) 
                   )
       $$ LANGUAGE SQL
       ;         

 {% endset %}

 {% set table = run_query(sql) %}

{%- endmacro %}

Параметризовать можно и довольно сложные вещи, такие как работа с nested structures (иерархическими структурами) и выгрузка во внешние таблицы (external tables) в S3 в формате parquet. Эти примеры вполне достойны отдельных публикаций.

Не изобретай велосипед – импортируй модули

Модуль или package - это набор макросов, моделей, тестов, который можно импортировать в свой проект в виде готовой к использованию библиотеки. На портале dbt hub есть неплохая подборка модулей на любой вкус, и, что самое главное, их список постоянно пополняется.

С помощью модуля логирования и добавления 2 простых hooks на каждый запуск dbt у меня как на ладони появляется статистическая информация о времени, продолжительности, флагах и параметрах развертывания. Я наглядно вижу модели анти-лидеры по потребляемым ресурсам (первые кандидаты на рефакторинг):

models:
   +pre-hook: "{{ logging.log_model_start_event() }}"
   +post-hook: "{{ logging.log_model_end_event() }}"
Мониторинг развертывания dbt моделей на кластере Redshift
Мониторинг развертывания dbt моделей на кластере Redshift

Измерение календаря собирается в одну строку, при этом набор колонок поражает:

{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
Измерение календарь, сгенерированное макросом
Измерение календарь, сгенерированное макросом

С помощью модуля dbt_external_tables я уже выстраиваю полноценный Lakehouse, обращаясь из Хранилища к данным, расположенным в файловом хранилище S3. К примеру, самые свежие курсы валют, получаемые через API Open Exchange Rates в формате JSON:

External data stored in S3 accessed vith Redshift Spectrum
 - name: external
     schema: spectrum
     tags: ["spectrum"]
     description: "External data stored in S3 accessed vith Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         freshness:
           error_after: {count: 15, period: hour}
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         external:
           location: "s3://data-analytics.wheely.com/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Ну и, конечно, ночью по расписанию работает VACUUM + ANALYZE, ведь Redshift это форк PostgreSQL. Дефрагментация, сортировка данных в таблицах, сбор статистик. Иначе говоря поддержание кластера в тонусе, пока dba спит.

dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
VACUUM + ANALYZE
VACUUM + ANALYZE

Running in production: используем dbt Cloud в Wheely

dbt Cloud это платный сервис для управления проектами, основанными на движке dbt. За небольшие деньги команда получает возможность создавать окружения, конфигурировать джобы и таски, устанавливать расписание запусков, и даже полноценную IDE (среду разработки!) в браузере.

Прежде всего речь идет об удобстве использования: приятный и понятный визуальный интерфейс, простота поиска и ориентирования, акцентирование ключевой информации при разборе ошибок и чтении логов:

Во-вторых, это гибкие настройки условий запуска джобов. Начиная от простых условий с выбором дня недели и времени, продолжая кастомными cron-выражениями, и заканчивая триггером запуска через webhook. Например, именно через вебхук мы связываем в цепочку завершение выгрузок для кросс-сверки и начало расчета соответствующих витрин в Хранилище (kicked off from Airflow):

В третьих, это консолидация всех важных уведомлений одном месте. Для нашей команды это канал в Slack и любые проблемы связанные с Production-запусками. В режиме реального времени мы получаем все уведомления об инцидентах с деталями и ссылками на подробный лог.

Сам dbt является проектом с открытым исходным кодом, и использование продукта dbt Cloud представляется очень удобным, но не обязательным. В качестве альтернативных способов можно выбрать любой другой оркестратор: Airflow, Prefect, Dagster, и даже просто cron. В своем проекте Сквозная Аналитика я организую оркестрацию при помощи Github Actions. Выходит очень занятно.

Вместо заключения

В команде аналитики Wheely мы стремимся к тому, чтобы работа была наполнена смыслом и приносила удовлетворение и пользу, но не раздражение и негодование. Все перечисленные в публикации достоинства не могут не вызвать симпатию со стороны новых членов команды и значительно ускоряют процессы адаптации и onboarding.

Сегодня бизнес и команда активно растут. Доступен ряд интересных позиций: Head of Data Insights, Product Analyst. У тебя есть возможность узнать детали из первых уст и получить прямую рекомендацию.

Также время от времени я провожу вебинары и выступления, на которых подробнее рассказываю о своей работе и проектах. Следить за моими публикациями можно в телеграм-канале Technology Enthusiast – https://t.me/enthusiastech

Пишите, задавайте вопросы и, конечно, пробуйте dbt в своих проектах!