Введение
Привет, Хабр!
Меня зовут Марк Порошин, я занимаюсь DataScience в DV Group. Недавно я уже рассказывал про то, как начать трансформировать данные с помощью dbt. Сегодня я решил поделиться, как мы в DV Group поженили dbt, Greenplum и DataVault, собрали все грабли, что могли; немного поконтрибьютили в open-source, но по итогу остались очень довольны результатом.
Расскажу сначала пару слов о том, что такое DataVault. DataVault - методология построения хранилища, предполагающая высокую нормализацию данных (3ая нормальная форма). Основными ее компонентами являются:
hub - “сущность” хранит только первичный и бизнес-ключ;
satellite - “свойства сущности”, относятся многие к одному с хабом и хранит свойства сущности;
link - “связь между сущностями” - отношение многие ко многим между сущностями (не обязательно двумя).
Чтобы стало чуть понятнее, давайте рассмотрим пример. Предположим мы хотим хранить информацию о запусках рекламных кампаний. У нас есть данные о том, когда клиенты запускали кампанию для каких-то товаров. Как же в этом случае может выглядеть ER диаграмма?
Можно заметить, что в сателлитах есть поле effective_from
и <entity>_hashdiff
, благодаря которому в DavaVault реализуется SCD2, это дает возможность реализовывать “версионность” данных.
Больше почитать про Data Vault можно здесь:
Прежде чем переходить к основной части, хочу дать поделиться статьей, потому что я начинал изучать эту тему именно с нее и во многом статьи будут пересекаться, но я хочу больше сконцентрироваться на деталях, а еще обсудить ошибки, которые мы совершили.
Постановка задачи
Из внешних источников данных мы периодически загружаем историю покупок пользователей в таблицу pure.pure_transactions
на Greenplum и хотим преобразовать ее в структуру Data Vault, т.е. разбить данные на хабы, линки и сателлиты. Преобразование происходит в 3 этапа.
Сначала нужно подготовить таблицу с данными, которые будут загружаться (мы будем выбирать данные за 1 день).
Далее необходимо обогатить данные всеми необходимыми хешами, но об этом дальше.
И, наконец, расщепить данные на сущности.
Таблица pure.pure_transactions
описывает историю покупок пользователей с некоторой метаинформацией. К сожалению, показать ее полностью я не могу, но в рамках статьи нам необходимы только следующие поля:
id транзакции(
transaction_id
);дата транзакции(
transaction_date
);цена товара(
price
);количество купленного товара(
quantity
);наименование товара(
product_name
);id категории товара(
cat_id
);
Мы выделили из этих данных две сущности
транзакция или строчка в чеке (
transaction_id
);товар (
product_name
);
Теперь когда у нас есть представление о том, чего мы хотим и какие у нас данные, перейдем к самому интересному.
Адаптер для greenplum
Прежде чем начать писать dbt-код, хочу немного рассказать про особенности работы с Greenplum. Greenplum — база построена на основе Postgres, поэтому синтаксис SQL запросов практически полностью совпадает, но есть ряд значительных отличий, которые будут использоваться в дальнейшем, и которые стали причиной реализации отдельного адаптера для dbt. Подробнее про это можно почитать здесь. А еще хочу поделиться интересным докладом, он будет полезен всем, кто начинает работать Greenplum.
Функциональность адаптера
Важная особенность Greenplum — возможность указать поле дистрибьюции. По этому полю Greenplum будет “раскладывать” данные по сегментам и по этому же полю будут самые эффективные join-ы. Указать параметр можно следующим образом:
{{
config(
...
distributed_by='<field_name>'
...
)
}}
Сжатие и колоночная ориентация
Greenplum предназначен для работы с большими данными, уменьшение времени на чтение/запись за счет сжатия является значительным фактором, который позволяет сократить время выполнения запроса. В dbt при использовании адаптера для Greenplum, это можно имплементировать следующим образом:
{{
config(
...
appendonly='true',
orientation='column',
compresstype='ZSTD',
compresslevel=4,
blocksize=32768
...
)
}}
Здесь мы указали параметр appendonly='true'
, он позволяет Greenplum создать таблицу оптимизированную для вставок. А еще мы добавили, что хотим использовать колоночную ориентацию orientation='column'
. И, наконец, указывали тип сжатия compresstype='ZSTD'
, который хотим использовать и его параметры compresslevel=4
, blocksize=32768
. Указанные значения являются параметрами по умолчанию — их можно не прописывать отдельно, если они для вас подходят.
Партиционирование
Последней важной особенностью является партиционирование, в postgres тоже есть эта возможность, но такой возможности нет у адаптера dbt-postgres(прошу поправить меня, если я ошибаюсь). Партиционирование позволяет разбить таблицу на несколько физических файлов по некоторому условию и читать только необходимые партиции. Из-за того что в Greenplum нельзя настраивать партиционирование, во время создания таблицы с помощью create table as select
, реализация этой фичи получилась не очень симпатичной. Требуется указать строчку с определением полей и строчку с определением партиционирования:
{% set fields_string %}
id int4 null,
incomingdate timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (incomingdate)
(
START ('2021-01-01'::timestamp) INCLUSIVE
END ('2023-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
...
fields_string=fields_string,
raw_partition=raw_partition,
default_partition_name='other_data'
...
)
}}
Построение DataVault
Raw
Сперва нужно выделить данные за день, для этого создали модель raw.raw_transaction
:
{{
config(
schema='raw',
materialized='table'
)
}}
with transaction_day_dedup as (
select * from (
select *,
row_number() over (
partition by pa."transaction_id"
order by pa."savetime" asc
) as rn
from {{ source('pure', 'pure_transactions') }} pa
where
'{{ var('raw_transactions')['start_date'] }}' <= transaction_date
and
transaction_date < '{{ var('raw_transactions')['end_date'] }}'
) as h
where rn = 1
)
select
"transaction_id" as transaction_id,
"transaction_date" as transaction_date,
"price" as price,
"quantity" as quantity,
"product_name" as product_name,
"cat_id" as cat_id,
...
'PURE_TRANSACTIONS' as record_source
from transaction_day_dedup ra
Здесь мы с помощью CTE выбирали данные за один день и дедублицировали по полю transaction_id
. После запуска модели в таблице raw.raw_transaction
у нас оказались данные за 1 день, если указать соответствующие переменные var('raw_transactions')['start_date']
и var('raw_transactions')['end_date']
:
vars:
raw_transactions:
start_date: '2022-01-01 00:00:00.0'
end_date: '2022-01-02 00:00:00.0'
Stage
К данным в таблице raw.raw_transaction
добавили первичные ключи, которые будут использоваться в сущностях DataVault.
Мы использовали пакет dbtvault (его пришлось немного доработать, чтобы он поддерживал последнюю версию dbt). Он позволяет сократить количество boilerplate кода.
Чтобы установить необходимые зависимости мы добавили в корень dbt проекта файл package.yml
со следующим содержанием:
packages:
- git: "https://github.com/markporoshin/dbtvault.git"
revision: develop
и вызвать команду:
dbt deps
После этого у вас появится папка dbt_packages, в которой будут находиться исходники установленных пакетов.
В модели stage_transactions
мы завели переменную yaml_metadata
и указываем поля, которые станут основой для ключей. Их существует два типа:
Первичные ключи сущностей: хабы и линки;
HASHDIFF — хеши для отслеживания изменений в свойствах сущности, которые строятся из полей сателлита.
{{
config(
schema='stage',
materialized='table',
)
}}
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
LOAD_DATE: (SAVETIME + 1 * INTERVAL '1 day')
EFFECTIVE_FROM: 'SAVETIME'
hashed_columns:
TRANSACTION_PK:
- 'transaction_id'
TRANSACTION_HASHDIFF:
is_hashdiff: true
columns:
- 'price'
- 'quantity'
- 'transaction_date'
PRODUCT_PK:
- 'product_name'
PRODUCT_HASHDIFF:
is_hashdiff: true
columns:
- 'cat_id'
LINK_TRANSACTION_PRODUCT_PK:
- 'transaction_id'
- 'product_name'
...
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
Результатом вызова модели получится таблица stage.stage_transactions
в базе данных со следующими полями:
transaction_id
transaction_date
price
quantity
product_name
cat_id
transaction_pk
transaction_hashdiff
product_pk
product_hashdiff
link_transaction_product_pk
load_date
effective_from
record_source
…
Теперь у нас есть все необходимые хеши для того чтобы строить хранилище в методологии DataVault.
Создание Хаба
Рассмотрим создание хаба на примере сущности “продукт”, из исходных данных у нас есть его название cleanedname
, которое является бизнес-ключом(natural key), на stage стадии мы создали первичный ключ product_pk
, а также поле classid
, которое является его свойством.
Модель хаба product будет выглядеть следующий образом:
{{
config(
schema='raw_vault',
materialized='incremental',
distributed_by='product_pk',
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_nk = "cleanedname" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
В начале мы описываем конфигурацию модели указываем схему, тип материализации и ключ дистрибьюции. После этого определяем переменные:
source_model
— таблицу источник, из которой будет происходить выгрузка данных для пополнения хаба;src_pk
— название поля, в котором хранится первичный ключ хаба;src_nk
— название поля, в котором хранится бизнес-ключ хаба;load_date
— название поля с датой загрузки данных;src_source
— название поля, в котором хранится наименование источника данных.
И вызываем макрос для генерации кода. В результате появится таблица с следующим DDL:
CREATE TABLE raw_vault.h_product (
product_pk text NULL,
cleanedname text NULL,
load_date text NULL,
record_source unknown NULL
)
WITH (
appendonly=true,
blocksize=32768,
orientation=column,
compresstype=zstd,
compresslevel=4
)
DISTRIBUTED BY (product_pk);
Рассмотрим еще один пример модели хаба. Дело в том, что сущность "транзакция"(сточка чека), в отличие от продукта однозначно относится со временем, когда совершили покупку, и хочется добавить поле incomingdate
для того чтобы реализовать партиционирование по нему. У нас на данный момент число транзакций превысило миллиард и обновление хаба без партиционирования занимает несколько часов.
{% set fields_string %}
transaction_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_id text NULL,
transaction_date timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='transaction_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_nk = "transaction_date" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["transaction_date"] -%}
{%- set partition_cause = "'" + var('h_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('h_transaction')['start_date'] + "'" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model,
src_extra=src_extra, partition_cause=partition_cause) }}
Это уже не совсем соответствует подходу DataVault, но следование ему в точности обходилось бы слишком дорого.
DDL модели h_transaction
:
CREATE TABLE raw_vault.h_transaction (
transaction_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_id text NULL,
transaction_date timestamp NULL
)
WITH (
appendonly=true,
blocksize=32768,
orientation=column,
compresstype=zstd,
compresslevel=4
)
DISTRIBUTED BY (transaction_pk)
PARTITION BY RANGE(transaction_date)
(
START ('2020-01-01 00:00:00'::timestamp without time zone) END ('2028-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval) WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')
COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4),
DEFAULT PARTITION extra WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')
COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
);
Создание сателлита
У хаба продуктов есть свойство cat_id
, поэтому мы создали сателлит для его хранения:
{{
config(
schema='raw_vault',
materialized='incremental',
distributed_by='product_pk',
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_hashdiff = "product_hashdiff" -%}
{%- set src_payload = ["cat_id, productname"] -%}
{%- set src_eff = "effective_from" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
src_payload=src_payload, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model) }}
Модель выглядит практически также, как их хаб за исключением трех дополнительных переменных:
src_hashdiff
— название поля, хранящие хеш данного набора свойств;src_eff
— название поля, хранящее дату, с которой данный кортеж актуален;src_payload
— список полей, которые составляют свойства данной сущности.
Аналогично с хабом нам потребовалось внедрить партиционирования для сателлитов, для этого рассмотрим сателлит для сущности транзакция:
{% set fields_string %}
transaction_pk text NULL,
transaction_id text NULL,
transaction_hashdiff text NULL,
price float4 NULL,
quantity float4 NULL,
transaction_date timestamp NULL,
load_date text NULL,
record_source text NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='transaction_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_hashdiff = "transaction_hashdiff" -%}
{%- set src_payload = [
"price",
"quantity",
"itemsum",
"transaction_id",
"transaction_date"
] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
src_payload=src_payload, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model, partition_cause=partition_cause) }}
Создание линки
Теперь, когда у нас есть два хаба и сателлиты к ним, осталось только создать таблицу, связывающую их. В роли такой сущности в DavaVault служат ссылки.
{% set fields_string %}
link_transaction_product_pk text NULL,
transaction_pk text NULL,
product_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_date timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='link_transaction_product_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "link_transaction_product_pk" -%}
{%- set src_fk = ["transaction_pk", "product_pk"] -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["incomingdate"] -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}
{{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model,
src_extra=src_extra, partition_cause=partition_cause) }}
В методологии DavaVault есть возможность создавать сателлиты для link, однако кажется, что если у связи есть свойства, значит можно выделить еще одну сущность. Кстати, насчет связей между более чем двумя сущностями — лично я пришел к выводу, что лучше избегать подобные структуры без острой необходимости, поскольку последующие запросы получаются нетривиальными особенно если в подобной связи встречаются NULL.
Выводы
Я надеюсь, что мне удалось вас убедить, что применение dbt в связке с datavault позволяет сильно облегчить построение хранилища DavaVault. Я буду очень рад замечаниям, вопросам и комментариям, а так же приглашаю присоединиться к улучшению адаптера для Greenplum =)
Хочу подвести небольшой итог и выделить основные рекомендации, которые я могу дать на основе совершенных ошибок:
Используйте партиционирование везде, где это возможно;
Изучите SQL запросы, которые компилирует dbtvault, поскольку иногда их можно оптимизировать в контексте вашей задачи;
В greenplum есть потрясающая фича
external table
так что можно отказаться от хранения исходных данных (pure
схема в статье).
У меня осталось непокрытой последняя тема о том, как автоматизировать процесс наполнения хранилища с помощью Dagster(развитие Airflow от его же создателей). Если тема актуальна, пишите в комментарии, я расскажу о ней.
gadfly_02
Привет, спасибо за статью. Как реализовали инкрементальную загрузку, на основе снепшотов или что-то своё накрутили? Формируете ли таблицу ключей, на основе которой строится таблица сравнения для последующего инкрементельного обновления? Если да, то очень интересно, как реализовывали на dbt.
p0mami Автор
Привет, dbtvault использует материализацию
incremental
и при первой загрузке генерит один sql запрос (просто раскидывает по сущностям), при повторной загрузке он уже загружает- только новые хабы
- только новые линки
- только те сателлиты у которых другой hash_diff
Конкретно в мы отказались от версионности сателлитов и подправили sql запрос, чтобы сохранялась только первая версия, связано это с тем, что это раздувает таблицу и усложняет запросы(надо выбрать самые актуальные), а в нашем случае профита не приносит.
Если говорить про архитектуру, то "грязные" данные хранились на кластере кликхауса и мы с помощью Dagster'a создавали в нем витрину с теми данными, которые хотим залить в datavault. На стороне гринплама была
external table
на эту витрину. И тем же дагстером запускали все стадии построения DataVaultТе пайплан выглядит примерно следующим образом:
clickhouse->pure_greenplum->raw_greenplum->stage_greenplum->raw_vault_greenplum
Лучше всего попробовать запустить на синтетических данных и посмотреть в target какие sql запросы получились
Приведу пример для хаба: