Это приключение посвящено созданию продвинутых макросов, которые используют метаданные модели для управления инкрементальностью и тестами качества. Задание параметров модели в виде метаданных позволяет наглядно представлять их в каталоге данных и переиспользовать в разных задачах.
Квест подготовлен для раскрытия темы в рамках dbt Meetup #5 и нацелен на инженеров аналитики, которые готовы глубоко погрузиться в написание макросов dbt для решения сложных практически значимых задач.
Предложенный подход позволяет эффективно выстраивать сложные автоматизации в проекте, является незаменимым для больших дата-лейков на базе Trino/Presto и позволяет изучить макросы на продвинутом уровне, достаточном для создания собственных сложных автоматизаций.
Это урок курса-симулятора по Data WhereHouse на базе dbt от Inzhenerka.Tech. В нем 10 задач. Эта статья выложена с целью вашей практики. Решайте больше задач :) Если у вас остануться вопросы, задавайте их в нашем сообществе. Скоро у нас планируются вебинары по dbt и Dagster
Игорь
Старший аналитик
Должен признать, что проект dbt-scooters, который Марк создал для хранилища на базе PostgreSQL, впечатляет. dbt действительно выглядит как фреймворк, который позволит нам выстроить качественную и масштабируемую аналитику больших данных. Причём независимо от хранилища, ведь он имеет большое количество адаптеров для разных баз данных. Жалко, что я сам не успел до такого додуматься. Надо тоже почаще читать статьи про модные фреймворки вместо работы.
Но насколько легко мы сможем адаптировать текущие модели, которые легко переваривают небольшие датасеты в постгресе, до обработки терабайтов данных в каком-нибудь модном дата-лейке?
Предположим, мы в итоге остановимся на движке Trino, который я сейчас рассматриваю для переезда. Он позволяет запускать SQL-запросы поверх большого количества файлов, которые лежат в облачном хранилище. То есть это не полноценная база, а дата-лейк, где расчетный движок SQL отделен от хранилища файлов.
Облачное хранилище позволяет дешево хранить петабайты данных в облаке, а производительный движок Trino автоматически масштабируется в зависимости от объема данных, который нужно обработать. Это позволяет за сравнительно небольшое время обрабатывать колоссальные объемы данных, с чем постгрес не справляется просто потому, что это классическая СУБД, которая не создавалась для обработки бигдаты.
Возьмем к примеру модель events_clean, которая удаляет дубли из таблицы пользовательских событий:
{% set date = var("date", none) %}
select distinct
user_id,
"timestamp",
type_id,
{{ updated_at() }}
from
{{ source("scooters_raw", "events") }}
where
{% if is_incremental() %}
{% if date %}
date("timestamp") = date '{{ date }}'
{% else %}
"timestamp" > (select max("timestamp") from {{ this }})
{% endif %}
{% else %}
"timestamp" < timestamp '2023-08-01'
{% endif %}
Это инкрементальная модель, которая обрабатывает данные пачками, чтобы не перегрузить базу, потому что поиск дублей это очень ресурсоемкая операция, которая требует последовательного сравнения всех строк между собой.
На данный момент данные разделяются на батчи следующим образом:
при первом запуске обрабатываются все события до 1 августа 2023 года (timestamp '2023-08-01')
при втором запуске обрабатываются все оставшиеся события
при третьем и последующем запусках запрос будет обрабатывать только новые события, которые пока отсутствуют в таблице
За поиск новых событий отвечает условие "timestamp" > (select max("timestamp") from {{ this }}), которое находит их по временной метке.
Здорово конечно, что эта логика сейчас работает в текущем проекте. Но выглядит это как очередная халтура джуна, когда он кое-как решил задачу, совершенно не думая о масштабировании!
Предположим, что данные о пользовательских событиях представляют собой 500 гигабайт файлов формата parquet, которые лежат в облаке S3. Если я запущу эту модель в Trino, то во-первых движок начнет сканировать каждый файл, чтобы прочитать все значения timestamp и отфильтровать данные по условию. Это займет существенное время.
После фильтрации останется, допустим, 200 гигабайт. А хватит ли движку мощностей, чтобы удалить все дубли из такого объема данных? Не удивлюсь, если тупо закончится оперативная память, что сделает задачу полностью нерешаемой.
Нет, инкрементальную логику надо менять, чтобы сделать процесс более предсказуемым и лучше ограничить объем данных, который мы обрабатываем.
Наверняка, такой большой массив данных будет изначально партицирован по дате. Это значит, что в исходной таблице будет дополнительная колонка date, которая просто будет представлять собой дату временной метки timestamp. Зачем такое дублирование? Потому что для колонки date будет включено партицирование на уровне хранения файлов в облаке.
Теперь все 500 гигабайт будут лежать не в одной папке, а будут разделены по подпапкам, каждая их которых представляет одну конкретную дату, например:
date=2023-06-01/
date=2023-06-02/
date=2023-06-03/
Это позволит раскидать 500 гигов по папкам, а потом выбирать, с какими датами мы сейчас работаем. Тогда неэффективное условие "timestamp" < timestamp '2023-08-01' превратится в оптимизированное "date" < timestamp '2023-08-01', которое будет сканировать не все файлы, а только определенные папки, соответствующие условию.
Такое партицирование появилось давно еще в движке Apache Hive и стало настолько эффективным подходом для обработки бигдаты, что его до сих пор используют многие современные хранилища. Trino, Presto, AWS Athena, AWS Redshift Spectrum, Apache Spark, Google BigQuery и не только используют подобную технику партицирования для эффективной работы.
Даже модные форматы типа Apache Iceberg, Delta Lake и хранилища типа Lakehouse на их основе такое поддерживают, хотя тут можно использовать более сложные и автоматизированные подходы, но сейчас это не важно.
Главное, что для партицированного источника данных можно написать намного более эффективную инкрементальную модель, которая будет надежно обрабатывать данные за предсказуемое время благодаря грамотному обращению с партициями по дате.
Как же мне теперь подготовить имитатор партицированного по дате источника для текущего проекта в PostgreSQL? Встроенное партицирование постгреса, которое работает по похожему принципу, трогать не хочется, это создаст слишком много ненужного оверхеда.
Поэтому просто создадим поверх сырой таблицы событий представление с колонкой date и вообразим, что эта колонка используется для разбиения всех событий на независимые партиции, как будто мы работаем c Trino.
Задача 1. Добавить колонку date в таблицу событий.
Надо создать модель models/events_prep.sql, которая подготовит сырые события для анализа. В частности, добавить колонку с датой события:
select
user_id,
"timestamp",
type_id,
date("timestamp") as "date"
from
{{ source("scooters_raw", "events") }}
В models/properties.yml создадим запись с конфигурацией:
- name: "events_prep"
description: "User events prepared for following processing"
config:
materialized: "view"
columns:
- name: "date"
description: |
Date of event derived from timestamp.
Imitates Hive-styled partitioning of events by date.
It is needed for efficient incremental processing by engines
with partitioning support (i.e. Trino).
И материализуем представление в базе:
dbt build -s events_prep
Вот так теперь может выглядеть код новой модели events_clean_v2, которая будет использовать партицирование для более быстрой и эффективной обработки данных:
{% set date = var("date", none) %}
select distinct
user_id,
"timestamp",
type_id,
{{ updated_at() }},
"date"
from
{{ ref("events_prep") }}
where
{% if is_incremental() %}
{% if date %}
"date" = date '{{ date }}'
{% else %}
"date" > (select max("date") from {{ this }})
{% endif %}
{% else %}
"date" < date '2023-08-01'
{% endif %}
Фильтрация теперь происходит не по "timestamp", а по "date". Уже лучше, чем было, но часть проблем осталась нерешенной:
Пачки данных непредсказуемого размера из-за грубого фильтра "date" < date '2023-08-01'.
Само значение date '2023-08-01' получается запрятанным в коде модели и его может быть сложно найти в каталоге данных.
Из опыта могу сказать, что вот такое хитрое партицирование, когда мы вычисляем значения партиций на лету, может просто не сработать: "date" > (select max("date") from {{ this }}). Например, сервис AWS Athena, построенный на движке Trino, при определенных условиях может плохо обработать такое условие с подзапросом, что приведет к полному сканированию всех файлов, т.е. партицирование будет проигнорировано. Google Big Query тоже этим грешит.
Решить эти проблемы можно так:
Вместо простого условия по одной дате создадим сложный инкрементальный макрос, который будет обрабатывать строго заданное количество дней (например, 30), начиная с заданной даты. При чем этот диапазон должен настраиваться под каждую модель исходя из ее сложности и мощности хранилища.
Все параметры инкрементальности (начальная дата первой пачки, размер пачки и т.д.) должны быть описаны в одном месте у всех на виду, например, в метаданных модели.
Вместо того, чтобы находить максимальную дату данных на лету в подзапросе, заставим dbt обращаться к базе во время компиляции модели, чтобы находить значение max("date") и подставлять его в код модели как текст. Модель получится простой и пуленепробиваемой, любой движок справится с чтением конкретных партиций.
Кажется, пришло время обратиться к интернету в поисках сложных макросов…
"Я у мамы инженер"
Блогер
Привет, мои подписчики! Как и обещал, возвращаюсь к вам с разбором макросов, которые я использую для автоматизации своих проектов dbt. Макросы сложные, крутые, а я отдаю вам их бесплатно. Достойно лайка, я считаю!
Задача 2. Создать макрос для обращения к базе.
Чаще всего в комментариях вы спрашиваете, как можно во время компиляции модели обратиться к хранилищу, чтобы считать какое-нибудь значение для его подстановки в модель. Такой подход позволяет упростить код модели, чтобы базе было проще с ним справиться.
А теперь повторяй за мной. Создаем файл macros/select_first_value.sql с кодом:
{%- macro select_first_value(query) -%}
{%- if execute -%}
{{ return(run_query(query).columns[0].values()[0]) }}
{%- endif -%}
{%- endmacro -%}
В этом коде хорошо все. Во-первых, макрос принимает на вход аргумент query с текстом SQL-запроса, который надо выполнить перед материализацией модели, а точнее в момент ее компиляции. Запрос выполняется с помощью встроенной функции run_query.
Затем из результата мы извлекаем первую колонку, а из нее — первое значение, и возвращаем его на выход макроса функцией return. Такой простой подход позволяет вытащить из базы любое скалярное значение, например, максимум/минимум или true/false в качестве ответа на вопрос.
Важно, что весь код обернут в условие if execute. Это необходимо сделать, потому что dbt компилирует каждую модель в два этапа:
Предварительный парсинг и компиляция для составления Data Lineage. На этом этапе прежде всего рендерятся макросы ref и source для определения дерева зависимостей. Встроенная переменная execute принимает здесь значение false, чтобы показать, что это лишь подготовка. Функция run_query и другие сложные макросы здесь попросту не работают, поэтому наш макрос не будет делать ничего.
Полная компиляция проекта для запуском материализации. Здесь уже работают все функции и даже можно запускать предварительные запросы в базе с помощью run_query, что мы и делаем. Переменная execute принимает значение true, поэтому мы его ловим.
И не забудь создать в файле macros/properties.yml запись для каталога данных, чтобы твои коллеги быстрее уловили твою гениальность:
- name: "select_first_value"
description: "Query database and get first value from first column of result"
arguments:
- name: "query"
type: "string"
description: "SQL query to get scalar value from database during compilation"
Задача 3. Проверить макрос select_first_value.
Готово! Теперь ты можешь использовать макрос select_first_value в любой модели, я покажу, как я это делаю, в следующий раз. А пока соберем на коленке и запустим простой запрос, чтобы убедиться, что макрос работает:
dbt show --inline "select {{ select_first_value('select 111') }} * 2 as result"
Вот как dbt обрабатывает этот код:
В момент компиляции кода с помощью макроса выполняется предварительный запрос select 111, который возвращает значение 111.
Это значение подставляется в основной запрос и скомпилированный код в итоге выглядит так: select 111 * 2 as result
dbt выполняет запрос и возвращает в консоль результат вычисления:
12:34:45 Previewing inline node:
| result |
| ------ |
| 222 |
Чувствуешь, какая теперь мощь в твоих руках??
Но сильно не увлекайся, ведь если ты используешь свой новый макрос в сотне моделей, это значит, что любая компиляция проекта (то есть любое действие с ним) будет совершать сто дополнительных запросов в базу, что может сильно замедлить выполнение любой команды dbt. Особенно если ты будешь работать с бигдатой в неповоротливом дата-лейке, где даже простейший запрос может выполняться минимум несколько секунд.
Тем не менее, иметь такой инструмент в своем арсенале бесценно. Многие, например, строят таким образом инкрементальную логику для партицированных таблиц.
А мы тем временем перейдем к чему-то более забористому.
Ты уже используешь метаданные модели? Я обожаю добавлять туда всякую дополнительную информацию, которая будет полезна для каталога данных, для управления и обслуживания таблицы.
Возьмем к примеру мой любимый проект магазина бутеров jaffle-shop-classic. Вот так выглядит фрагмент из файла models/schema.yml, который частично описывает модель customers:
models:
- name: customers
description: This table has basic information about a customer...
Добавим сюда поле config.meta для добавления к модели полезных метаданных:
models:
- name: customers
description: This table has basic information about a customer...
config:
meta:
owner: "Anton Smarty"
maturity: "medium"
deprecated: false
revisit_after: "2024-01-01"
pii_policy:
has_pii: true
pii_fields: [ "first_name", "last_name" ]
Я решил таким образом сохранить в проекте информацию о следующих аспектах модели:
ее владельце (owner)
степени готовности (maturity)
флаг о том, является ли модель устаревшей (deprecated)
когда нужно ее пересмотреть (revisit_after)
словарь с описанием политики работы с персональными данными (pii_policy)
И все это не просто хранится в виде конфига, но и автоматически представлено в каталоге данных:
Красота? К любой сущности dbt можно привязать метаданные таким образом. Подробнее про это рекомендую почитать в документации: meta.
А теперь представь, что это лишь верхушка айсберга. А что если я хочу использовать свои метаданные для управления моделью, ее кодом или вообще для запуска тестов?
Допустим я хочу:
Убедиться, что владелец таблицы содержится в другой таблице сотрудников
Сделать, чтобы готовность таблицы (maturity) из метаданных попадала в саму таблицу в качестве колонки
Проверить, что колонки с чувствительными данными (pii_policy.pii_fields) действительно присутствуют в таблице
И это лишь некоторые примеры, как я готов заморочиться, чтобы мой проект dbt был не просто запускатором скриптов, но стал полноценным инструментом обеспечения наблюдаемости, управляемости и надежности хранилища с помощью автоматизированных процессов, построенных вокруг метаданных.
Но как мне получить доступ к метаданным конкретной модели из кода самой модели или из дженерик-теста? Задачка непростая, но я ее решил! Что ж, поделюсь и с тобой кодом.
Задача 4. Создать макрос для чтения метаданных модели.
Надо сделать макрос macros/get_meta_value.sql с кодом Jinja:
{%- macro get_meta_value(model, meta_key, def_value="null") -%}
{# Get meta value from model configuration #}
{%- if execute -%}
{# Graph object is available only during Run phase #}
{% set model_node = (
graph.nodes.values() |
selectattr("resource_type", "equalto", "model") |
selectattr("alias", "equalto", model.name) |
first
) %}
{{ return(model_node.get("meta", {}).get(meta_key, def_value)) }}
{%- else -%}
{# During Parse phase, return default value #}
{{ return(def_value) }}
{%- endif -%}
{%- endmacro -%}
Макрос принимает на вход модель model (а точнее объект Relation, который создают макросы ref и this), название конкретного параметра метаданных meta_key и опциональное значение по умолчанию def_value на случай, если параметр отсутствует.
Затем происходит самое сложное: макрос сканирует весь граф сущностей graph.nodes и находит там модель по имени. Из модели извлекается поле meta (хотя изначально оно задается как config.meta) и уже из этого словаря мы извлекаем нужный нам параметр.
Заметь, что вся эта логика убрана внутрь условия if execute, потому что переменная graph доступна только на втором этапе компиляции.
И для полной красоты опишем макрос в macros/properties.yml:
- name: "get_meta_value"
description: "Get meta value from model configuration"
arguments:
- name: "model"
type: "relation"
description: "dbt model from which to retrieve the meta value"
- name: "meta_key"
type: "string"
description: "Key from meta dictionary"
- name: "def_value"
type: "any"
description: "Default value if key not found"
Чтобы проверить макрос get_meta_value, надо создать модель, описать ее конфигурацию и добавить туда метаданные. К сожалению, ваших донатов не хватило на продолжение темы, поэтому отложим до следующего раза.
Пока, и не забывай поддерживать любимого дата-инженер
Игорь
Старший аналитик
Этот блогер конечно странный, но он хотя бы шарит за dbt в отличие от Марка. Надеюсь, его код в итоге заработает. Мне явно пригодится макрос select_first_value, чтобы определять, какая последняя дата в инкрементальной таблице и делать выборку новой пачки данных для обработки.
А get_meta_value я смогу использовать, чтобы задать параметры инкрементальности не просто в коде модели, а как метаданные, которые будут на виду в каталоге данных.
Задача 5. Создать макрос с инкрементальным фильтром.
Вот такой мощный макрос должен обеспечивать выборку данных для инкрементального заполнения большой таблицы. Сохраним его в macros/incremental_date_condition.sql:
{%- macro incremental_date_condition(
model,
date=none,
start_date=none,
days_max=none,
days_back_from_today=none
) -%}
{%- if date is not none -%}
"date" = date '{{ date }}'
{%- else -%}
{%- set incrementality = get_meta_value(model, 'incrementality', {}) -%}
{{ _incremental_date_condition_range(
model=model,
start_date=start_date,
initial_start_date=incrementality.get('start_date', none),
days_max=days_max or incrementality.get('days_max', 30),
days_back_from_today=days_back_from_today or incrementality.get('days_back_from_today', 1)
) }}
{%- endif -%}
{%- endmacro -%}
{%- macro _incremental_date_condition_range(
model,
start_date,
initial_start_date,
days_max,
days_back_from_today
) -%}
{%- if start_date is none and execute -%}
{%- if is_incremental() %}
{%- set start_date_query -%}
select max("date") + interval '1 day' from {{ model }}
{%- endset -%}
{%- set start_date = select_first_value(start_date_query) -%}
{%- else -%}
{%- if initial_start_date is none -%}
{{ exceptions.raise_compiler_error(
"start_date or initial_start_date are required for initial non-incremental run"
) }}
{%- endif -%}
{%- set start_date = initial_start_date -%}
{%- endif -%}
{%- endif -%}
"date" >= date '{{ start_date }}'
and "date" < date '{{ start_date }}' + interval '{{ days_max }} day'
and "date" <= current_date - interval '{{ days_back_from_today }} day'
{%- endmacro -%}
Вот его основные особенности, которые позволяют эффективно решить задачу:
Макрос генерирует условие для where с фильтрацией данных по датам. Поэтому его надо применять к исходным данным, которые мы инкрементально обрабатываем.
Макрос может принять аргумент date, тогда он сделает выборку только для одной даты
Также макрос может принять аргументы start_date (начальная дата выборки) и days_max (количество дней в выборке), чтобы обработать сразу несколько дней.
Аргумент days_back_from_today позволяет задать, сколько дней мы хотим отступить назад от текущей даты. Это связано тем, что данные, полученные в день запуска пайплайна еще не полные, поэтому этот день лучше проигнорировать и обработать завтра.
Если параметр start_date не задан, то макрос сам сделает запрос к базе с помощью функции select_first_value и определит, с какой даты надо продолжить заполнение таблицы. Это позволяет автоматизировать процесс пересчета или заполнения таблицы, не думая о дате начала новой пачки данных каждый раз.
Параметры заполнения таблицы также могут быть автоматически импортированы из метаданных модели, если они указаны в поле incrementality. За это отвечает тот самый макрос get_meta_value
Добавим подробное описание макроса в файл macros/properties.yml:
- name: "incremental_date_condition"
description: |
Get incremental SQL condition by date column for incremental model.
Macro uses parameters from 'incrementality' dictionary declared as meta of model.
Arguments also can be specified as variables overriding meta configuration.
arguments:
- name: "model"
type: "relation"
description: "Incremental dbt model to fill with date condition"
- name: "date"
type: "string"
description: |
Particular date to compute.
If specified it overrides other parameters and arguments.
Format: 'YYYY-MM-DD'
- name: "date_start"
type: "string"
description: |
Start date for incremental run. First date of incremental batch.
Format: 'YYYY-MM-DD'
- name: "days_max"
type: "integer"
description: |
Max size of incremental batch in days.
Default: 30
- name: "days_back_from_today"
type: "integer"
description: |
Number of days to step back from current date for incremental batch.
Allows to do not process incomplete data for current or recent dates.
Default: 1
Задача 6. Создать инкрементальную модель событий.
Теперь можно создать новую версию модели с очищенными от дублей событиями, которая будет правильно использовать партицирование источника.
Создадим файл models/events_clean_v2.sql с кодом на основе нового макроса:
select distinct
user_id,
"timestamp",
type_id,
{{ updated_at() }},
"date"
from
{{ ref("events_prep") }}
where
{{ incremental_date_condition(
model=this,
date=var('date', none),
start_date=var('start_date', none),
days_max=var('days_max', none)
) }}
Во-первых, модель берет данные из представления events_prep, которое содержит необходимый нам столбец «date» и имитирует партицированную по дням таблицу Trino.
Во-вторых, все параметры инкрементальности могут быть переданы в макрос incremental_date_condition напрямую из переменных dbt. Это даст дополнительную гибкость процессу заполнения таблицы (бэкфиллингу). Например, можно будет заполнять конкретные промежутки данных из командной строки или оркестратора данных, управляя процессом с помощью переменных вместо редактирования кода.
Если переменные не переданы, макрос будет искать параметры в метаданных. В крайнем случае он попытается определить значения сам, обращаясь к базе на этапе компиляции кода.
Также важно правильно сконфигурировать модель в файле models/properties.yml:
- name: "events_clean_v2"
description: "User events without duplicates"
config:
materialized: "incremental"
strategy: "merge"
unique_key: [ "user_id", "timestamp", "type_id" ]
meta:
incrementality:
start_date: "2023-06-01"
days_max: 60
Здесь выбрана стратегия инкрементальности merge, что в сочетании с заданным уникальным ключом unique_key позволит предотвратить дублирование данных при попытке пересчитать уже имеющиеся дни. Новые записи просто перезапишут старые.
А в поле метаданных config.meta задан словарь incrementality, который задает следующие параметры бэкфиллинга:
start_date — начальная дата для всей таблицы. Это значение будет автоматически использовано при самом первом запуске модели. Также оно наглядно покажет в каталоге данных, что исторические данные начинаются с 1 июня 2023 года.
days_max — размер каждой инкрементальной пачки автоматически составит не более 60 дней, т.е. почти два месяца. Это позволит пересчитать весь текущий трехмесячный датасет всего за два последовательных запуска модели.
Задача 7. Материализовать модель.
Теперь превратим модель в таблицу, попутно анализируя ее скомпилированный код.
Поскольку изначально таблица в базе отсутствует, первый ее запуск автоматически создаст таблицу, которая будет содержать первые 60 дней, начиная с 1 июня:
dbt build -s events_clean_v2
Вот так выглядит скомпилированный код модели из папки target:
select distinct
user_id,
"timestamp",
type_id,
now() as updated_at,
"date"
from
"dev_zan7"."dbt"."events_prep"
where
"date" >= date '2023-06-01'
and "date" < date '2023-06-01' + interval '60 day'
and "date" <= current_date - interval '1 day'
Теперь нужно добавить оставшийся месяц, запустив модель еще раз. Никакие параметры передавать не надо:
dbt build -s events_clean_v2
Код после компиляции меняется:
select distinct
user_id,
"timestamp",
type_id,
now() as updated_at,
"date"
from
"dev_zan7"."dbt"."events_prep"
where
"date" >= date '2023-07-31 00:00:00'
and "date" < date '2023-07-31 00:00:00' + interval '60 day'
and "date" <= current_date - interval '1 day'
Начальная дата 31 июля была автоматически определена макросом на основе данных, имеющихся после прошлого запуска. Таким образом процесс полного заполнения сводится к многократному выполнению одной и той же простой команды. Это можно организовать разными способами, но главное, что никакие параметры не надо корректировать вручную на ходу.
Разве что параметр days_max можно будет уменьшить в метаданных, если хранилище не будет справляться с объемом данных. Это приведет лишь к необходимости больше раз выполнить команду dbt build, чтобы обработать все события.
Теперь в DBeaver можно убедиться, что таблица events_clean_v2 содержит ровно столько же записей, сколько и неоптимизированный оригинал events_clean. Каждая из них представляет события за 91 день.
Если сейчас материализовать events_clean_v2 третий раз, то новые данные добавлены не будут, потому что они не пройдут обновленный фильтр:
select distinct
user_id,
"timestamp",
type_id,
now() as updated_at,
"date"
from
"dev_zan7"."dbt"."events_prep"
where
"date" >= date '2023-08-31 00:00:00'
and "date" < date '2023-08-31 00:00:00' + interval '60 day'
and "date" <= current_date - interval '1 day'
Ведь в исходном представлении events_prep события заканчиваются 30 августа.
Задача 8. Пересчитать заданные дни.
А что насчет управления макросом через переменные?
Попробуем пересчитать весь июль. Для этого надо задать начальную дату 1 июля и указать диапазон 31 день:
dbt build -s events_clean_v2 --vars "{start_date: 2023-07-01, days_max: 31}"
Столбец updated_at таблицы покажет, что данные за весь июль были только что обновлены.
А если просят обновить события только за 30 августа? Передадим переменную date:
dbt build -s events_clean_v2 --vars "date: 2023-08-30"
dbt показывает, что были обновлены лишь 3768 записей:
1 of 1 START sql incremental model dbt.events_clean_v2........[RUN]
1 of 1 OK created sql incremental model dbt.events_clean_v2...[INSERT 0 3768 in 4.64s]
Вот таким длинным путем мы пришли в сложному макросу, который делает процесс работы с тяжелыми инкрементальными таблицами максимально удобным, автоматизированным и оптимизированным под партицирование.
Кстати, такой подход можно применить не только к материализации моделей, но и к тестам качества больших таблиц. Сложные тесты на больших данных могут выполняться десятки минут, проверяя каждый день одни и те же исторические данные, которые не меняются.
В таком случае имеет смысл ограничить тестирование лишь последними свежими партициями, сузив его до одного месяца или даже нескольких дней. Такой подход позволяет высвободить много времени при запуске пайплайна и сэкономить деньги на вычислительных ресурсах.
Игорь
Старший аналитик
А это идея, экономить деньги я люблю. Возьмем за основу имеющийся дженерик-тест unique_key и создадим на его основе более умную версию, которая к тому же будет брать параметры из метаданных.
Задача 9. Создать автоматизированный тест уникального ключа.
Создадим вот такой дженерик-тест tests/generic/unique_key_meta.sql:
{% test unique_key_meta(model) %}
{# Test combination of columns declared as 'unique_key' meta field for uniqueness. #}
{# Optionally test supports 'testing' meta dictionary of models for parameters. #}
{%- set unique_key = get_meta_value(model, 'unique_key', none) -%}
{%- if execute -%}
{%- if unique_key is none -%}
{{ exceptions.raise_compiler_error(
"unique_key meta field is required for unique_key_meta test"
) }}
{%- endif -%}
{%- set unique_key_csv = unique_key | join(', ') -%}
{%- endif -%}
{# Extract 'testing' meta parameters. #}
{%- set testing = get_meta_value(model, 'testing', {}) -%}
{%- set days_max = testing.get('days_max', none) -%}
{%- set date_column = testing.get('date_column', 'date') -%}
with validation_errors as (
select
{{ unique_key_csv }}
from {{ model }}
{% if days_max -%}
{%- set start_date_query -%}
select max({{ date_column }}) - interval '{{ days_max }} day' from {{ model }}
{%- endset -%}
{%- set start_date = select_first_value(start_date_query) -%}
where {{ date_column }} >= date '{{ start_date }}'
{%- endif %}
group by {{ unique_key_csv }}
having count(*) > 1
)
select *
from validation_errors
{% endtest %}
Вот на что здесь стоит обратить внимание:
Тест проверяет уникальность заданного набора колонок (ключа), что особенно важно для дата-лейков, где поддержка уникальных ключей отсутствует. А значит, приходится заботиться о них самостоятельно, хотя бы тестируя каждую таблицу на отсутствие дублей.
Уникальный ключ задается в поле unique_key в метаданных модели. Такой подход позволяет убрать из теста дополнительные аргументы и задавать все параметры в централизованном виде (в meta), где к ним также смогут обратиться другие тесты.
По этой причине тест имеет только один аргумент model. В то время как оригинальный unique_key имел второй аргумент columns, который надо было задавать в конфигурации.
Тест также обращается к полю testing в метаданных модели и ищет там параметры days_max (количество последних дней для тестирования) и date_column (название колонки с датами).
На этапе компиляции тест обращается к базе и макросом select_first_value запрашивает дату начала, исходя из имеющихся данных. Это особенно важно для тестирования представлений, где неаккуратный запрос может привести к тому, что партицирование будет проигнорировано.
Задача 10. Внедрить тест в модель.
Привяжем тест к модели events_full, которая является представлением и содержит джойн двух таблиц. Поэтому постоянная проверка ключа на всю глубину может стоить дорого.
Вот как надо сконфигурировать модель в models/properties.yml, добавив все необходимые метаданные, включая уникальный ключ и глубину тестирования 60 дней:
- name: "events_full"
description: "User events enriched with meaningful types"
config:
materialized: "view"
meta:
unique_key: [ "user_id", "timestamp", "type_id" ]
testing:
days_max: 60
data_tests: [ "unique_key_meta" ]
Аналогичным образом теперь можно подключать новый тест к любым моделям проекта, не забывая правильно настроить метаданные.
Задача 11. Запустить улучшенный тест.
Запускаем наш продвинутый тест как обычно:
dbt test -s events_full
Проверка отрабатывает без ошибок и имеет лаконичное название:
1 of 2 START test events_full_is_complete ........ [RUN]
2 of 2 START test unique_key_meta_events_full_ ... [RUN]
1 of 2 PASS events_full_is_complete .............. [PASS in 4.94s]
2 of 2 PASS unique_key_meta_events_full_ ......... [PASS in 12.01s]
Кстати, даже в PostgreSQL он работает быстрее, чем исходный unique_key на 20-50%. Сказывается прежде всего ограниченный объем данных для тестирования. А уменьшая параметр testing.days_max в метаданных, модель можно еще больше ускорять.
Теперь по такой же логике можно разрабатывать другие дженерик-тесты и цеплять их на поле testing в метаданных, чтобы иметь единый центр управления тестами модели.
Задача 12. Зафиксировать результат.
Чтобы Марк не присвоил все лавры по создание таких крутых наработок, помечу хотя бы новую инкрементальную модель как свою. Для этого пригодятся все те же метаданные. Добавлю свое имя в поле owner, как учил тот блогер (файл models/properties.yml):
- name: "events_clean_v2"
description: "User events without duplicates"
config:
materialized: "incremental"
strategy: "merge"
unique_key: [ "user_id", "timestamp", "type_id" ]
meta:
owner: "Igor Dushnin"
incrementality:
start_date: "2023-06-01"
days_max: 60
Коммитим и пушим результат! Не терпится дождаться, обновления каталога данных.
Задача 13. Оценить метаданные в каталоге данных.
Вот так выглядит моя прекрасная модель events_clean_v2:
Сразу видно, кто ее создал и какая начальная дата в ней содержится, даже запрос писать не нужно.
А вот метаданные обновленной events_full:
Любой аналитик сходу поймет, какой у модели уникальный ключ! Это же супер полезно, особенно для дата-лейка, который сам про уникальные ключи ничего не знает.
Когда Марк увидит мои новые макросы, то обзавидуется. Подожду, когда он прибежит с просьбой все объяснить. Главное, чтобы я не был слишком занят.
Все получилось? Свериться с эталонным проектом можно здесь.
Это урок курса-симулятора по Data WhereHouse на базе dbt от Inzhenerka.Tech. В нем 10 задач. Эта статья выложена с целью вашей практики. Решайте больше задач :) Если у вас остануться вопросы, задавайте их в нашем сообществе. Скоро у нас планируются вебинары по dbt и Dagster