Привет, Хабр!

Меня зовут Марк Порошин, в DV Group я занимаюсь Data Science. Мы работаем с большим количеством данных, на данный момент приближаемся к 10тб данных на нашем кластере Greenplum. Так как бизнес достаточно молодой, требования заказчиков, аналитиков постоянно меняются, да и сама структура данных периодически дополняется, поэтому мы выбрали достаточно современную технологию построения Data Warehouse — DataVault. Данные методологии очень привлекателны своей гибкостью, однако ценой за эту гибкость будет огромное количество таблиц. Это приводит сразу к двух основным проблемам:

  • Нужна база данных, которая поддерживает и хорошо справляется с большим количеством join-ов;

  • Способ автоматизации инкрементального наполнения таблиц, поскольку руками прописывать SQL запросы очень трудоемко, а еще это чревато ошибками.

Здесь я расскажу про технологию, которую мы используем в DV Group — dbt(data build tool), она позволяет во многом справиться со второй проблемой и очень хорошо себя зарекомендовала в нашем проекте.

Настройка проекта

Знакомство с dbt начнем с тестового проекта. В качество целевой базы данных будем использовать postgres, которую я настроил локально на своей машине. Создаем папку проекта, я буду работать в PyCharm, это вовсе необязательно, тут каждый выбирает сам. Необходимо настроить окружение python3 и установить необходимые зависимости.

pip install dbt-core==1.1.0 dbt-postgres==1.1.0

После этого инициализируем dbt проект:

(venv) ➜  PostgresDBTIntro dbt init
11:32:18  Running with dbt=1.1.0
Enter a name for your project (letters, digits, underscore): dbt_postgres_intro   
Which database would you like to use?
[1] postgres

(Don\'t see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
11:33:04  
Your new dbt project "dbt_postgres_intro" was created!

На данный момент у вас в проекте должна появиться папка с таким же названием, что вы указали в качестве имени проекта.

Первые шаги в dbt

Давайте немного пройдем по файлам, которые появились после инициализации проекта.

структура проекта dbt
структура проекта dbt

Под номером один находится файл dbt_project.yml, в котором мы описываем структуру проекта, переменные(vars), дефолтные типы материализаций моделей. Также здесь можно прописать хуки on-run-start, on-run-end. К этим тонкостям мы вернемся позже, а сейчас рассмотрим файл под номером 2 my_first_dbt_model.sql

/*
    Welcome to your first dbt model!
    Did you know that you can also configure models directly within SQL files?
    This will override configurations stated in dbt_project.yml

    Try changing "table" to "view" below
*/

{{
    config(materialized='table')
}}

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data

/*
    Uncomment the line below to remove records with null `id` values
*/

-- where id is not null

Пропускаем блок комментариев, экранированных с помощью /* ... */ и видим:

{{
    config(materialized='table')
}}

DBT построен на основе Jinja, поэтому {{ ... }} используются для экранирования кода. В нем вызываем macro(читай “функцию”) - config, в который передаем аргументы для конфигурации нашей модели. В данном случае у нас всего лишь один аргумент materialized со значением 'table'. Это значит, что в результате запуска модели “my_first_dbt_model”, должна быть создана (пересоздана) таблица с таким же названием, как и название файла. 

Следом идет sql код для выбора данных:

select *
from source_data

Прежде чем запускать модель, нужно разобраться с еще одним моментом.Пока мы еще нигде не прописали креденшены для подключения к инстансу нашего Postgres’a. Это делается с помощью файла profiles.yml. В моем случае он выглядит следующим образом:

config:
  send_anonymous_usage_stats: False
  use_colors: True
  partial_parse: True


dbt_postgres_intro:
  outputs:
    dev:
      type: postgres
      threads: 3
      host: localhost
      port: 5432
      user: markporoshin
      pass: "<password>"
      dbname: dbt_intro_db
      schema: public
  target: dev

Я разместил этот файл на одном уровне с файлом dbt_project.yml, сделано это для удобства дальнейшего деплоя. DBT предлагает стандартное расположение файла со всеми конфигурациями (/Users/<user>/.dbt/profiles.yml на mac os). Чтобы узнать ваше дефолтное расположение, можно просто попробовать запустить модель, и в логах dbt напишет, где он по дефолту ищет файл с конфигами подключения:

dbt run --project-dir ./ -m my_first_dbt_model

Если же вы расположите profiles.yml также как я, вызов модели будет выглядеть следующим образом:

dbt run --project-dir ./ --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model

Здесь мы указываем расположением dbt проекта --project-dir ./; путь к папке с файлом profiles.yml - --profiles-dir ./; название профиля --profile dbt_postgres_intro, поскольку у вас может быть несколько профилей в одном файле profiles.yml для разных проектов или разных окружений (например DEV, PROD)

При запуске модели для базы данных Postgres dbt дополнит его create table ... as ... и мы получим следующий sql код для создания таблицы:

create  table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as (
	with source_data as (
	    select 1 as id
	    union all
	    select null as id
	)
	select *
	from source_data
);

Остановимся здесь чуть подробнее. DBT создал нам табличку, но в названии почему-то присутствует постфикс __dbt_tmp. Это связано с тем, что dbt создает таблицу в несколько этапов:

 -- создание новой таблицы
create  table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as (
with source_data as (
    select 1 as id
    union all
    select null as id
)
select *
from source_data
);

 -- если целевая таблица уже есть, переименуем ее в backup
alter table "dbt_intro_db"."public"."my_first_dbt_model" rename to "my_first_dbt_model__dbt_backup";

 -- теперь переименуем новую таблицу в целевую
alter table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" rename to "my_first_dbt_model"

-- после того, как все предыдущие этапы прошли успешно, можем удалять backup
drop table if exists "dbt_intro_db"."public"."my_first_dbt_model__dbt_backup" cascade

dbt отслеживает успешность обновления таблицы, а если что-то пошло не так, возвращает все к “статусу кво”.

Проследить за тем, что именно делает dbt, при вызове модели, можно добавлением флага -d:

dbt -d run --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model

Как работает dbt

Чуть подробнее остановимся на том, что происходит, когда вы запускаете модель. При выполнении dbt run dbt выполняет следующие действия:

  1. Парсит модели, макросы, тесты итд. На этом этапе не выполняются никакие sql запросы;

  2. Компилирует и запускает файлы уже не содержащие Jinja код.

Это важно понимать, чтобы избегать ошибок. Полезно прочитать статью в документации.

Понимание этого факта может помочь в дебаге запуска моделей. После успешной компиляции, скомпилированный файл можно найти в папке target/compiled(генерируется автоматически), а если была успешно пройдена стадия run в папке target/run можно найти sql код который будет выполнен.

Магия jinja

Наконец-то мы можем перейти к самому “вкусному” в dbt, тому, что помогает избавиться от написания boilerplate кода и сильно упростить жизнь data engineer =).

Для начала создадим новую dbt модель, чтобы немного наполнить нашу базу данными:

{{
    config(
        materialized='table',
    )
}}


select 1 as id, 'Nikita' as name, 'Analytics' as type
union
select 2 as id, 'Stanislav' as name, 'Analytics' as type
union
select 3 as id, 'Alex' as name, 'CTO' as type
union
select 4 as id, 'Artem' as name, 'DevOps' as type
union
select 5 as id, 'Artem' as name, 'DataScience' as type
union
select 6 as id, 'Victor' as name, 'Backend' as type
union
select 7 as id, 'Mark' as name, 'DataEngineer' as type

Переменные

В dbt существует два способа работать с переменными.

Во-первых, вы можете их указать в файле dbt_projects.yml:

vars:
  developer_name: "Nikita"

Дальше использовать в модели:

{{
    config(
        materialized='view',
    )
}}


select
    id,
    type
from {{ ref('developers') }} d
where d.name = '{{ var('developer_name') }}'

Здесь мы видим сразу несколько новых моментов. В качестве материализации мы выбрали тип 'view', это приводит к созданию не таблицы, а view. Дальше мы берем в качестве источника данных {{ ref('developers') }}, то есть мы хотим, чтобы dbt нашел модель developers и сам подставил путь к ней (возможно, что модель лежит не в дефолтной схеме или для нее задан alias, это все можно настроить в macro config). И последнее, в условии where с помощью макроса var обращаемся к глобальным переменным dbt и вытягиваем значение переменной developer_name.

Во втором случае мы можем использовать локальные переменные:

{{
    config(
        materialized='view',
    )
}}

{% set type = 'DevOps' %}


select
    id,
    name
from {{ ref('developers') }} d
where type = '{{ type }}'

Создаем переменную с помощью ключевого слова set и экранизируем это все с помощью {% … %}.

Сразу зафиксируем, что в dbt по документации существует три типа “экранизации”:

  1. {{ ... }} — для вывода переменных или результатов выполнения макросов в скомпилированный файл;

  2. {% ... %} — для объявления переменных, циклов, условных операторов и т.д.;

  3. {# ... #} — комментарии.

Я встречал использование {%- ... -%}, кажется это тоже самое, что и обычные скобки с процентами.

Циклы

Я думаю уже примерно понятна логика и структура Jinja инъекций в dbt. Ниже приведен пример модели, в которой используются массив и цикл:

{{
    config(
        materialized='table',
    )
}}


{%- set types = ['Analytics', 'DataScience'] -%}


select
    id,
    name
from
    {{ ref('developers') }}
where type in (
    {%- for type in types -%}
        '{{ type }}'
        {%- if not loop.last %},{% endif -%}
    {%- endfor -%}
)

Использование вспомогательных запросов

Зачастую хочется выполнить какой-то вспомогательный запрос, прежде чем запускать саму модель. Например, в контексте наших данных, мы хотим сначала получить разработчиков, наименования которых начинаются с буквы ‘а’, сохранить их в переменную, а потом использовать в целевом запросе. Понятно, что это все можно прописать в самом запросе, но существуют задачи, когда такое решение получается либо не оптимальным, либо громоздким, а иногда и вовсе невозможным. Рассмотрим использование вспомогательных запросов на примере:

{{
    config(
        materialized='table'
    )
}}

{% set names_start_with_a_query %}
    select
        name
    from
        {{ ref('developers') }}
    where lower(name) like 'a%'
{% endset %}
{% set names_start_with_a = [] %}

{% if execute %}
    {% set names_start_with_a = run_query(names_start_with_a_query).columns[0].values() %}
{% endif %}
{{ log(names_start_with_a, info=True) }}

select
    id,
    name,
    type
from
    {{ ref('developers') }}
{% if names_start_with_a != () %}
    where name in (
        {%- for name in names_start_with_a %}
            '{{ name }}'
            {%- if not loop.last %},{% endif -%}
        {%- endfor -%}
    )
{% endif %}

После блока с конфигурацией модели, мы определяем переменные, names_start_with_a_querynames_start_with_a в которые записываем вспомогательный запрос и пустой массив.

Следом идет условный оператор, где мы выполняем запрос находящийся в переменной names_start_with_a_queryи записываем результат в переменную names_start_with_a. Однако необходимо чуть подробнее остановиться на том, зачем нам нужна обертка выполнения запроса в условный оператор. Все дело в уже упомянутом жизненном цикле выполнения модели. execute — специальная переменная, которая имеет значение True, если выполнение модели(макроса и тд) в “execute” моде, это значит, что в данный момент уже прошла стадия парсинга и можно выполнять sql запросы.

Пользовательские macro

Необходимость написания собственных macro объясняется несколькими причинами: во-первых, это уменьшение дублирования кода, во-вторых декомпозиция и на самом деле теми же аргументами, зачем нужны функции во всех языках программирования. 

Создадим в папке macros файл so_important_macro.sql:

{% macro so_important_macro(number) %}
    {% set so_important_query %}
        select 1 as info
        union
        select 2 as info
    {% endset %}
    {%- set info = run_query(so_important_query).columns[0].values() -%}
    {{ log('number ' + number|string, info=True) }}
    {{ return(info) }}
{% endmacro %}

И дальше можем использовать его в нашей модели:

{{
    config(
        materialized='table'
    )
}}

{% if execute %}
    {% set info =  so_important_macro(4) %}
    {{ log(' info: ' + info|string, info=True) }}
{% endif %}

select 1 as id

В результате в логах мы получим следующее:

Running with dbt=1.1.0
Found 8 models, 4 tests, 0 snapshots, 0 analyses, 168 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
  
Concurrency: 3 threads (target='dev')

1 of 1 START table model public.test_macro ..................................... [RUN]
number 4
info: (Decimal('1'), Decimal('2'))
1 of 1 OK created table model public.test_macro ................................ [SELECT 1 in 0.15s]
  
Finished running 1 table model in 0.24s.
  
Completed successfully
  
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Инкрементальная материализация

Наконец мы перешли к самому интересному=)

Такой тип материализации позволяет инкрементально наполнять таблицу. Рассмотрим сначала данный тип на синтетическом примере. Предположим, что мы хотим на каждый запуск модели добавлять в нее максимальное значение в таблице +1, если в таблице нет данных, тогда вставляем 1.

{{
    config(
        materialized='incremental'
    )
}}

{% set data_to_insert = 1 %}

{% if is_incremental() %}
    {% set max_number_query %}
        select max(num) from {{ this }}
    {% endset %}
    {% set data_to_insert = run_query(max_number_query).columns[0].values()[0]|int + 1 %}
{% endif %}

{{ log('number to insert: ' + data_to_insert|string, info=True)}}

select {{ data_to_insert }} as num

Макро is_incremental доступен для моделей с типом incremental и он возвращает True, если таблица уже существует. Это необходимо в случае наличия рекурсии в запросе, например при дедубликации данных.

Рассмотрим, что произойдет, если мы запустим модель в первый раз. Макро is_incremental() вернет False и в итоге будет создана таблица с одной строчкой со значением 1.

Если после этого мы попробуем запустить модель еще раз, тогда is_incremental() вернет True. Внутри условного оператора мы определяем sql запрос, который возвращает максимальное значение из текущей таблицы(this — специальная переменная dbt, которая возвращает Relation на текущую таблицу). Таким образом при втором запуске в таблицу будет вставлено значение 2, в третий раз 3 и так далее.

Теперь рассмотрим реальный пример использования инкрементальной материализации с дедубликацией. Предположим, что у вас есть таблица-источник raw_source, в которую периодически вставляются данные, но там могут встречаться дубликаты строчек. Для удобства, предположим, что существует поле id, которое уникально для набора остальных атрибутов, т.е по этому полю можно дедублицировать. Мы же хотим создать таблицу, в которой будут храниться только уникальные значения.

Для начала создадим в папке models файл source.yml в котором мы опишем источники данных (таблицы, которые наполняются из внешних источников и не являются моделями dbt):

version: 2

sources:
  - name: raw
    schema: public
    tables:
      - name: raw_source

И опишем модель stage_source.sql:

{{
    config(
        materialized='incremental'
    )
}}


select distinct on (src.id)
    src.*
from
    {{ source('raw', 'raw_source') }} src
{% if is_incremental() %}
    left join
    {{ this }} dst
    on src.id = dst.id
where dst.id is null
{% endif %}

При первичным запуске итоговый select запрос будет выглядеть следующим образом:

select distinct on (src.id)
    src.*
from
    "dbt_intro_db"."public"."raw_source" src

Видно, что мы выбираем все данные из raw_source и дедублицируем их по src.id

Если же мы попробуем запустить второй раз:

select distinct on (src.id)
    src.*
from
    "dbt_intro_db"."public"."raw_source" src
    left join
    "dbt_intro_db"."public"."stage_source" dst
    on src.id = dst.id
where dst.id is null

Теперь же мы сначала пытаемся найти данные, которых еще нет в stage_source и после этого дедублицируем их по ключу src.id

Документация

Очень приятным дополнением в dbt является автоматическая генерация документации. Если вы активно используете refsource dbt может автоматически построить DAG связей. Сгенерировать документацию и запустить сервер с ui можно следующим образом:

dbt docs generate --profiles-dir ./ --profile dbt_postgres_intro
dbt docs serve --profiles-dir ./ --profile dbt_postgres_intro
Визуализация DAG’a зависимостей моделей
Визуализация DAG’a зависимостей моделей

Так же можно писать документацию моделей в файле schema.yml лежащим на уровне моделей, тогда все это тоже будет красиво оформлено в ui:

ui документации
ui документации

Заключение

Надеюсь мне удалось вас заинтересовать замечательной технологией dbt, если вы о ней еще не слышали или рассказать что-то новое для тех, кто уже присматривался к ней.

В dbt есть возможность писать свои плагины, это значительно расширяет потенциал. Пишите в комментариях ваши замечания и предложения.

В мыслях есть планы рассказать, как с помощью dbt можно строить datavault на базе greenplum и не испытывать боль =) На хабре уже есть статья на эту тему, но я бы хотел ее расширить уделить внимание деталям, тому как оркестрировать это все с помощью Dagster и ошибкам, которые мы совершили:

Исходники: ссылка.

P.S.

В качестве бонуса, мы в DV Group немного доработали адаптер dbt-postgres для greenplum, чтобы можно было выбирать поле дистрибьюции, сжатие и патриционирование: ссылка на GitHub.

Комментарии (3)


  1. little-brother
    08.06.2022 03:32

    {%- ... -%} {{- ... -}} Минусы в mustache-шаблонах используются, если требуется удалить пробельные символы слева и/или справа от вставки. Может быть полезно, если вставка внутри текстовой константы.

    Запрос на функционал, аналогичный dbt, один из самых популярных для DataGrip.


    1. p0mami Автор
      08.06.2022 11:58

      Спасибо, буду знать

      Мне кажется, что плюсом dbt является возможность интеграции с Airflow/Dagster/Meltano и т.д. Благодаря этому нам во многом удалось автоматизировать elt


  1. makar_crypt
    09.06.2022 09:16

    Я как раз подобный вопрос задавал на QA habr , мне подойдет dbt ?

    https://qna.habr.com/q/1156442