Введение

Привет, Хабр!

Меня зовут Марк Порошин, я занимаюсь DataScience в DV Group. Недавно я уже рассказывал про то, как начать трансформировать данные с помощью dbt. Сегодня я решил поделиться, как мы в DV Group поженили dbt, Greenplum и DataVault, собрали все грабли, что могли; немного поконтрибьютили в open-source, но по итогу остались очень довольны результатом.

Расскажу сначала пару слов о том, что такое DataVault. DataVault - методология построения хранилища, предполагающая высокую нормализацию данных (3ая нормальная форма). Основными ее компонентами являются:

  • hub - “сущность” хранит только первичный и бизнес-ключ;

  • satellite - “свойства сущности”, относятся многие к одному с хабом и хранит свойства сущности;

  • link - “связь между сущностями” - отношение многие ко многим между сущностями (не обязательно двумя).

Чтобы стало чуть понятнее, давайте рассмотрим пример. Предположим мы хотим хранить информацию о запусках рекламных кампаний. У нас есть данные о том, когда клиенты запускали кампанию для каких-то товаров. Как же в этом случае может выглядеть ER диаграмма?

er диаграмма
er диаграмма

Можно заметить, что в сателлитах есть поле effective_from и <entity>_hashdiff, благодаря которому в DavaVault реализуется SCD2, это дает возможность реализовывать “версионность” данных.

Больше почитать про Data Vault можно здесь:

  1. Документация

  2. статья на Хабре 1

  3. статья на Хабре 2

Прежде чем переходить к основной части, хочу дать поделиться статьей, потому что я начинал изучать эту тему именно с нее и во многом статьи будут пересекаться, но я хочу больше сконцентрироваться на деталях, а еще обсудить ошибки, которые мы совершили.

Постановка задачи

Из внешних источников данных мы периодически загружаем историю покупок пользователей в таблицу pure.pure_transactions на Greenplum и хотим преобразовать ее в структуру Data Vault, т.е. разбить данные на хабы, линки и сателлиты. Преобразование происходит в 3 этапа.

  1. Сначала нужно подготовить таблицу с данными, которые будут загружаться (мы будем выбирать данные за 1 день).

  2. Далее необходимо обогатить данные всеми необходимыми хешами, но об этом дальше.

  3. И, наконец, расщепить данные на сущности.

pipeline построения datavault
pipeline построения datavault

Таблица pure.pure_transactions описывает историю покупок пользователей с некоторой метаинформацией. К сожалению, показать ее полностью я не могу, но в рамках статьи нам необходимы только следующие поля:

  1. id транзакции(transaction_id);

  2. дата транзакции(transaction_date);

  3. цена товара(price);

  4. количество купленного товара(quantity);

  5. наименование товара(product_name);

  6. id категории товара(cat_id);

Мы выделили из этих данных две сущности

  1. транзакция или строчка в чеке (transaction_id);

  2. товар (product_name);

Теперь когда у нас есть представление о том, чего мы хотим и какие у нас данные, перейдем к самому интересному.

Адаптер для greenplum

Прежде чем начать писать dbt-код, хочу немного рассказать про особенности работы с Greenplum. Greenplum — база построена на основе Postgres, поэтому синтаксис SQL запросов практически полностью совпадает, но есть ряд значительных отличий, которые будут использоваться в дальнейшем, и которые стали причиной реализации отдельного адаптера для dbt. Подробнее про это можно почитать здесь. А еще хочу поделиться интересным докладом, он будет полезен всем, кто начинает работать Greenplum.

Функциональность адаптера

Важная особенность Greenplum — возможность указать поле дистрибьюции. По этому полю Greenplum будет “раскладывать” данные по сегментам и по этому же полю будут самые эффективные join-ы. Указать параметр можно следующим образом:

{{
    config(
				...
        distributed_by='<field_name>'
				...
    )
}}

Сжатие и колоночная ориентация

Greenplum предназначен для работы с большими данными, уменьшение времени на чтение/запись за счет сжатия является значительным фактором, который позволяет сократить время выполнения запроса. В dbt при использовании адаптера для Greenplum, это можно имплементировать следующим образом:

{{
    config(
        ...
        appendonly='true',
        orientation='column',
        compresstype='ZSTD',
        compresslevel=4,
        blocksize=32768
				...
    )
}}

Здесь мы указали параметр appendonly='true', он позволяет Greenplum создать таблицу оптимизированную для вставок. А еще мы добавили, что хотим использовать колоночную ориентацию orientation='column'. И, наконец, указывали тип сжатия compresstype='ZSTD', который хотим использовать и его параметры compresslevel=4blocksize=32768. Указанные значения являются параметрами по умолчанию — их можно не прописывать отдельно, если они для вас подходят.

Партиционирование

Последней важной особенностью является партиционирование, в postgres тоже есть эта возможность, но такой возможности нет у адаптера dbt-postgres(прошу поправить меня, если я ошибаюсь). Партиционирование позволяет разбить таблицу на несколько физических файлов по некоторому условию и читать только необходимые партиции. Из-за того что в Greenplum нельзя настраивать партиционирование, во время создания таблицы с помощью create table as select, реализация этой фичи получилась не очень симпатичной. Требуется указать строчку с определением полей и строчку с определением партиционирования:

{% set fields_string %}
     id int4 null,
     incomingdate timestamp NULL
{% endset %}


{% set raw_partition %}
    PARTITION BY RANGE (incomingdate)
    (
        START ('2021-01-01'::timestamp) INCLUSIVE
        END ('2023-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    );
{% endset %}

{{
    config(
				...
        fields_string=fields_string,
        raw_partition=raw_partition,
        default_partition_name='other_data'
				...
    )
}}

Построение DataVault

Raw

Сперва нужно выделить данные за день, для этого создали модель raw.raw_transaction:

{{
    config(
        schema='raw',
        materialized='table'
    )
}}


with transaction_day_dedup as (
		select * from (
			select *,
				   row_number() over (
				   	   partition by pa."transaction_id"
		               order by pa."savetime" asc
		           ) as rn
			from {{ source('pure', 'pure_transactions') }} pa
			where 
				'{{ var('raw_transactions')['start_date'] }}' <= transaction_date
				and 
				transaction_date < '{{ var('raw_transactions')['end_date'] }}'
		) as h
		where rn = 1
	)
select
		"transaction_id" as transaction_id,
		"transaction_date" as transaction_date,
		"price" as price,
		"quantity" as quantity,
		"product_name" as product_name,
		"cat_id" as cat_id,
    ...
    'PURE_TRANSACTIONS' as record_source
from transaction_day_dedup ra

Здесь мы с помощью CTE выбирали данные за один день и дедублицировали по полю transaction_id. После запуска модели в таблице raw.raw_transaction у нас оказались данные за 1 день, если указать соответствующие переменные 
var('raw_transactions')['start_date'] и 
var('raw_transactions')['end_date']:

vars:
  raw_transactions:
    start_date: '2022-01-01 00:00:00.0'
    end_date: '2022-01-02 00:00:00.0'

Stage

К данным в таблице raw.raw_transaction добавили первичные ключи, которые будут использоваться в сущностях DataVault.

Мы использовали пакет dbtvault (его пришлось немного доработать, чтобы он поддерживал последнюю версию dbt). Он позволяет сократить количество boilerplate кода.

Чтобы установить необходимые зависимости мы добавили в корень dbt проекта файл package.yml со следующим содержанием:

packages:
  - git: "https://github.com/markporoshin/dbtvault.git"
    revision: develop

и вызвать команду:

dbt deps

После этого у вас появится папка dbt_packages, в которой будут находиться исходники установленных пакетов.

В модели stage_transactions мы завели переменную yaml_metadata и указываем поля, которые станут основой для ключей. Их существует два типа:

  1. Первичные ключи сущностей: хабы и линки;

  2. HASHDIFF — хеши для отслеживания изменений в свойствах сущности, которые строятся из полей сателлита.

{{
    config(
        schema='stage',
        materialized='table',
    )
}}


{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
  LOAD_DATE: (SAVETIME + 1 * INTERVAL '1 day')
  EFFECTIVE_FROM: 'SAVETIME'
hashed_columns:
  TRANSACTION_PK:
    - 'transaction_id'
  TRANSACTION_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'price'
      - 'quantity'
      - 'transaction_date'
  PRODUCT_PK:
    - 'product_name'
  PRODUCT_HASHDIFF:
    is_hashdiff: true
    columns:
	    - 'cat_id'
  LINK_TRANSACTION_PRODUCT_PK:
    - 'transaction_id'
    - 'product_name'
	...
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{% set source_model = metadata_dict['source_model'] %}

{% set derived_columns = metadata_dict['derived_columns'] %}

{% set hashed_columns = metadata_dict['hashed_columns'] %}

{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}

Результатом вызова модели получится таблица stage.stage_transactions в базе данных со следующими полями:

  1. transaction_id

  2. transaction_date

  3. price

  4. quantity

  5. product_name

  6. cat_id

  7. transaction_pk

  8. transaction_hashdiff

  9. product_pk

  10. product_hashdiff

  11. link_transaction_product_pk

  12. load_date

  13. effective_from

  14. record_source

Теперь у нас есть все необходимые хеши для того чтобы строить хранилище в методологии DataVault.

Создание Хаба

Рассмотрим создание хаба на примере сущности “продукт”, из исходных данных у нас есть его название cleanedname, которое является бизнес-ключом(natural key), на stage стадии мы создали первичный ключ product_pk, а также поле classid, которое является его свойством.

Модель хаба product будет выглядеть следующий образом:

{{
    config(
        schema='raw_vault',
        materialized='incremental',
        distributed_by='product_pk',
    )
}}


{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_nk = "cleanedname" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}

{{ config(schema='raw_vault') }}

{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
                src_source=src_source, source_model=source_model) }}

В начале мы описываем конфигурацию модели указываем схему, тип материализации и ключ дистрибьюции. После этого определяем переменные:

  1. source_model — таблицу источник, из которой будет происходить выгрузка данных для пополнения хаба;

  2. src_pk — название поля, в котором хранится первичный ключ хаба;

  3. src_nk — название поля, в котором хранится бизнес-ключ хаба;

  4. load_date — название поля с датой загрузки данных;

  5. src_source — название поля, в котором хранится наименование источника данных.

И вызываем макрос для генерации кода. В результате появится таблица с следующим DDL:

CREATE TABLE raw_vault.h_product (
	product_pk text NULL,
	cleanedname text NULL,
	load_date text NULL,
	record_source unknown NULL
)
WITH (
	appendonly=true,
	blocksize=32768,
	orientation=column,
	compresstype=zstd,
	compresslevel=4
)
DISTRIBUTED BY (product_pk);

Рассмотрим еще один пример модели хаба. Дело в том, что сущность "транзакция"(сточка чека), в отличие от продукта однозначно относится со временем, когда совершили покупку, и хочется добавить поле incomingdate для того чтобы реализовать партиционирование по нему. У нас на данный момент число транзакций превысило миллиард и обновление хаба без партиционирования занимает несколько часов.

{% set fields_string %}
    transaction_pk text NULL,
    load_date text NULL,
    record_source text NULL,
    transaction_id text NULL,
    transaction_date timestamp NULL
{% endset %}


{% set raw_partition %}
    PARTITION BY RANGE (transaction_date)
    (
        START ('2020-01-01'::timestamp) INCLUSIVE
        END ('2028-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    );
{% endset %}

{{
    config(
        schema='raw_vault',
        materialized='incremental',
        compresslevel=4,
        distributed_by='transaction_pk',
        fields_string=fields_string,
        raw_partition=raw_partition
    )
}}


{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_nk = "transaction_date" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["transaction_date"] -%}
{%- set partition_cause = "'" + var('h_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('h_transaction')['start_date'] + "'" -%}


{{ config(schema='raw_vault') }}

{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
                src_source=src_source, source_model=source_model,
                src_extra=src_extra, partition_cause=partition_cause) }}

Это уже не совсем соответствует подходу DataVault, но следование ему в точности обходилось бы слишком дорого.

DDL модели h_transaction:

CREATE TABLE raw_vault.h_transaction (
	transaction_pk text NULL,
	load_date text NULL,
	record_source text NULL,
	transaction_id text NULL,
	transaction_date timestamp NULL
)
WITH (
	appendonly=true,
	blocksize=32768,
	orientation=column,
	compresstype=zstd,
	compresslevel=4
)
DISTRIBUTED BY (transaction_pk)
PARTITION BY RANGE(transaction_date) 
(
START ('2020-01-01 00:00:00'::timestamp without time zone) END ('2028-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval) WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4') 
          COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4), 
DEFAULT PARTITION extra  WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4') 
          COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) 
          COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
);

Создание сателлита

У хаба продуктов есть свойство cat_id, поэтому мы создали сателлит для его хранения:

{{
    config(
        schema='raw_vault',
        materialized='incremental',
        distributed_by='product_pk',
    )
}}


{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_hashdiff = "product_hashdiff" -%}
{%- set src_payload = ["cat_id, productname"] -%}
{%- set src_eff = "effective_from" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}


{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
                src_payload=src_payload, src_eff=src_eff,
                src_ldts=src_ldts, src_source=src_source,
                source_model=source_model) }}

Модель выглядит практически также, как их хаб за исключением трех дополнительных переменных:

  1. src_hashdiff — название поля, хранящие хеш данного набора свойств;

  2. src_eff — название поля, хранящее дату, с которой данный кортеж актуален;

  3. src_payload — список полей, которые составляют свойства данной сущности.

Аналогично с хабом нам потребовалось внедрить партиционирования для сателлитов, для этого рассмотрим сателлит для сущности транзакция:

{% set fields_string %}
	  transaction_pk text NULL,
		transaction_id text NULL,
		transaction_hashdiff text NULL,
		price float4 NULL,
		quantity float4 NULL,
		transaction_date timestamp NULL,
		load_date text NULL,
		record_source text NULL
{% endset %}


{% set raw_partition %}
    PARTITION BY RANGE (transaction_date)
    (
        START ('2020-01-01'::timestamp) INCLUSIVE
        END ('2028-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    );
{% endset %}

{{
    config(
        schema='raw_vault',
        materialized='incremental',
        compresslevel=4,
        distributed_by='transaction_pk',
        fields_string=fields_string,
        raw_partition=raw_partition
    )
}}

{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_hashdiff = "transaction_hashdiff" -%}
{%- set src_payload = [
    "price",
    "quantity",
    "itemsum",
    "transaction_id",
    "transaction_date"
] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}


{{ config(schema='raw_vault') }}

{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
                src_payload=src_payload, src_eff=src_eff,
                src_ldts=src_ldts, src_source=src_source,
                source_model=source_model, partition_cause=partition_cause) }}

Создание линки

Теперь, когда у нас есть два хаба и сателлиты к ним, осталось только создать таблицу, связывающую их. В роли такой сущности в DavaVault служат ссылки.

{% set fields_string %}
    link_transaction_product_pk text NULL,
    transaction_pk text NULL,
    product_pk text NULL,
    load_date text NULL,
    record_source text NULL,
    transaction_date timestamp NULL
{% endset %}


{% set raw_partition %}
    PARTITION BY RANGE (transaction_date)
    (
        START ('2020-01-01'::timestamp) INCLUSIVE
        END ('2028-01-01'::timestamp) EXCLUSIVE
        EVERY (INTERVAL '1 day'),
        DEFAULT PARTITION extra
    );
{% endset %}

{{
    config(
        schema='raw_vault',
        materialized='incremental',
        compresslevel=4,
        distributed_by='link_transaction_product_pk',
        fields_string=fields_string,
        raw_partition=raw_partition
    )
}}


{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "link_transaction_product_pk" -%}
{%- set src_fk = ["transaction_pk", "product_pk"] -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["incomingdate"] -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}

{{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
                 src_source=src_source, source_model=source_model,
                src_extra=src_extra, partition_cause=partition_cause) }}

В методологии DavaVault есть возможность создавать сателлиты для link, однако кажется, что если у связи есть свойства, значит можно выделить еще одну сущность. Кстати, насчет связей между более чем двумя сущностями — лично я пришел к выводу, что лучше избегать подобные структуры без острой необходимости, поскольку последующие запросы получаются нетривиальными особенно если в подобной связи встречаются NULL.

Выводы

Я надеюсь, что мне удалось вас убедить, что применение dbt в связке с datavault позволяет сильно облегчить построение хранилища DavaVault. Я буду очень рад замечаниям, вопросам и комментариям, а так же приглашаю присоединиться к улучшению адаптера для Greenplum =)

Хочу подвести небольшой итог и выделить основные рекомендации, которые я могу дать на основе совершенных ошибок:

  1. Используйте партиционирование везде, где это возможно;

  2. Изучите SQL запросы, которые компилирует dbtvault, поскольку иногда их можно оптимизировать в контексте вашей задачи;

  3. В greenplum есть потрясающая фича external table так что можно отказаться от хранения исходных данных (pure схема в статье).

У меня осталось непокрытой последняя тема о том, как автоматизировать процесс наполнения хранилища с помощью Dagster(развитие Airflow от его же создателей). Если тема актуальна, пишите в комментарии, я расскажу о ней.

Комментарии (2)


  1. gadfly_02
    17.06.2022 12:12

    Привет, спасибо за статью. Как реализовали инкрементальную загрузку, на основе снепшотов или что-то своё накрутили? Формируете ли таблицу ключей, на основе которой строится таблица сравнения для последующего инкрементельного обновления? Если да, то очень интересно, как реализовывали на dbt.


    1. p0mami Автор
      17.06.2022 12:24

      Привет, dbtvault использует материализацию incremental и при первой загрузке генерит один sql запрос (просто раскидывает по сущностям), при повторной загрузке он уже загружает
      - только новые хабы
      - только новые линки
      - только те сателлиты у которых другой hash_diff
      Конкретно в мы отказались от версионности сателлитов и подправили sql запрос, чтобы сохранялась только первая версия, связано это с тем, что это раздувает таблицу и усложняет запросы(надо выбрать самые актуальные), а в нашем случае профита не приносит.

      Если говорить про архитектуру, то "грязные" данные хранились на кластере кликхауса и мы с помощью Dagster'a создавали в нем витрину с теми данными, которые хотим залить в datavault. На стороне гринплама была external table на эту витрину. И тем же дагстером запускали все стадии построения DataVault

      Те пайплан выглядит примерно следующим образом:
      clickhouse->pure_greenplum->raw_greenplum->stage_greenplum->raw_vault_greenplum


      Лучше всего попробовать запустить на синтетических данных и посмотреть в target какие sql запросы получились

      Приведу пример для хаба:

      -- Generated by dbtvault.
      
      WITH row_rank_1 AS (
      
      SELECT * FROM (
          SELECT PRODUCT_PK, productname, LOAD_DATE, RECORD_SOURCE,
                 ROW_NUMBER() OVER(
                     PARTITION BY PRODUCT_PK
                     ORDER BY LOAD_DATE ASC
                 ) AS row_number
          FROM src_table
      
      ) as h
      WHERE row_number = 1
      ),
      
      records_to_insert AS (
          SELECT a.PRODUCT_PK, a.productname, a.LOAD_DATE, a.RECORD_SOURCE
          FROM row_rank_1 AS a
          LEFT JOIN "dvault"."dv_raw_vault"."h_product" AS d
          ON a.PRODUCT_PK = d.PRODUCT_PK
          WHERE d.PRODUCT_PK IS NULL
      )
      
      SELECT * FROM records_to_insert