Всем привет! На связи Кирилл Козлов, data‑инженер Mindbox. Наша команда регулярно пересчитывает бизнес‑метрики для клиентов. Для этого нам приходится формировать витрины данных для биллинга и аналитики на основе десятков источников. 

Долгое время мы обрабатывали данные для расчетов на PySpark — инструменте, с которым сложно работать без опыта программирования на Python. Чтобы создать любой пайплайн, приходилось привлекать разработчиков. Это затягивало процесс на несколько недельных спринтов.

В статье расскажу, как мы построили внутреннюю data‑платформу, где аналитик или продакт может создать регулярно обновляемый пайплайн, описав его в четырех YAML‑файлах.

Почему PySpark тормозил создание пайплайнов

Опишу нашу боль на примере типичного пайплайна — элементарного подсчета MAU.

На первый взгляд, задачу можно было бы решить с помощью простого SQL‑запроса — COUNT(DISTINCT customerId) по нескольким таблицам за период. Но из‑за инфраструктурного обвеса — PySpark, Airflow DAG, Spark‑ресурсы, тесты — пришлось отдать задачу разработчикам. В итоге мы ждали пайплайн подсчета MAU целую неделю.

На подготовку расчета каждой новой метрики уходило от 1 до 3 недель. И каждый раз приходилось повторять один и тот же сценарий: 

  1. Аналитик формулировал бизнес‑требования, искал свободного разработчика, передавал ему контекст задачи. 

  2. Разработчик уточнял детали, писал код на Python, проходил ревью, настраивал DAG и деплоил результат.

Мы же хотели ускорить этот процесс и решать такие задачи силами аналитиков и менеджеров продукта, которые лучше всех понимают бизнес‑логику, свободно владеют SQL и YAML, но не работают с Python и PySpark.

Раньше так создавали пайплайны на PySpark
Раньше так создавали пайплайны на PySpark

Чем заменили PySpark: Python не нужен, достаточно YAML и SQL

Чтобы использовать декларативный подход для создания пайплайнов, мы разделили весь data‑слой на три части и для каждой подобрали инструмент:

  1. dlt (data load tool) загружает данные из внешних API и БД в объектное хранилище. Конфигурируется YAML‑файлом без кода.

  2. dbt (data build tool) on Trino преобразует данные с помощью SQL. Связывает модели через ref(), сам строит граф зависимостей и управляет инкрементальными обновлениями.

  3. Airflow + Cosmos оркестрирует пайплайны. DAG для Airflow генерируется автоматически из dag.yaml и dbt‑проекта.

Trino мы уже использовали как query engine для ad‑hoc‑запросов и подключали его к Superset для BI. Он уже тогда показал себя хорошо: для запросов с базовой логикой обрабатывал огромные объемы данных быстрее и с меньшим потреблением ресурсов, чем Spark. Кроме того, Trino из коробки поддерживает федеративный доступ к нескольким хранилищам из одного SQL‑запроса. Для 90% наших пайплайнов Trino подходил отлично.

Взаимодействие новых инструментов с потоками данных в нашей data-платформе
Взаимодействие новых инструментов с потоками данных в нашей data-платформе

Как загружаем данные: dlt.yaml

Первый yaml‑файл описывает, откуда и как загружать данные для дальнейшей обработки. Вот реальный пример того, как загружаются данные биллинга из внутреннего API:

product: sg-team
feature: billing
schema: billing_tarification
dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing

alerts:
  enabled: true
  severity: warning

source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token

  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map

    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

В файле описаны четыре ресурса из одного API. Для каждого указывается endpoint, параметры запроса и стратегии записи: в нашем случае replace означает «перезаписывать каждый раз». В этом конфиге можно добавлять шаги обработки processing_steps, описывать типы колонок и настраивать алерты.

В результате у нас всего 40 строк YAML‑конфига. Без dlt каждый такой коннектор — это Python‑скрипт с запросами, обработкой пагинации, ретраями, сериализацией в Delta Table формат и загрузкой в хранилище.

Как трансформируем данные с помощью SQL: dbt_project.yaml и sources.yaml

Следующим шагом конфигурируем dbt‑модель. Для Trino это SQL‑запросы.

Приведу пример конфигурации для расчета MAU. Так выглядит подготовка событий из одного источника:

-- int_mau_events_visits.sql
{{ config(materialized='table') }}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

merged_customers AS (
    SELECT * FROM {{ ref('int_merged_customers') }}
),

actual_customers AS (
    SELECT * FROM {{ ref('int_actual_customers') }}
),

events AS (
    SELECT
        src._tenant,
        src.unmergedCustomerId,
        'visits' AS src_type,
        src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
        AND ((src.timestamp_year > p.start_year)
             OR (src.timestamp_year = p.start_year
                 AND src.timestamp_month >= p.start_month))
        AND ((src.timestamp_year < p.end_year)
             OR (src.timestamp_year = p.end_year
                 AND src.timestamp_month <= p.end_month))
),

events_with_customer AS (
    SELECT
        e._tenant,
        COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
        e.src_type,
        e.endpoint
    FROM events e
    LEFT JOIN merged_customers mc
        ON e._tenant = mc._tenant
        AND e.unmergedCustomerId = mc.unmergedCustomerId
)

SELECT
    ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM actual_customers ac
    WHERE ewc._tenant = ac._tenant AND ewc.customerId = ac.customerId
)

Все 10 источников событий построены по одному и тому же принципу. Меняется только таблица‑источник и фильтры. Далее модели собираются в единый поток:

-- int_mau_events.sql (объединение всех источников)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...и ещё 6 источников

И наконец финальная витрина, где все считается:

-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),

metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...остальные метрики
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)

SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p

Для конфигурации витрины используем incremental_strategy='merge'. В этом случае dbt сам генерирует merge-запрос, подставляя unique_key для upsert. Не нужно вручную реализовывать инкрементальную загрузку.

Чтобы связать модели в один проект, собираем конфиг dbt_project.yaml:

name: mau_period
version: '1.0.0'
models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns

И файл sources.yaml, который описывает входные таблицы:

sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...

В итоге получаем ту же бизнес-логику, что и в PySpark, но на чистом SQL:

sources.yaml вместо typedspark-схем,

{{ ref() }} и {{ source() }} вместо .get_table(),

— автоматический порядок выполнения по графу зависимостей вместо ручного подбора Spark-ресурсов.

Как конфигурируем Airflow: dag.yaml

Четвертый файл конфигурации определяет, когда и как Airflow запустит пайплайн:

product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # каждый день в 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning

Затем наш Python‑скрипт парсит dag.yaml и dbt_project.yaml и через библиотеку Cosmos генерирует полноценный Airflow DAG. Это единственный кусок кода на Python. Он пишется один раз и работает для всех dbt‑проектов. Вот ключевая часть этого скрипта:

def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # Параметры из YAML → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos создаёт DAG из dbt-проекта
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Создаём схему перед запуском моделей
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Привязываем к корневым задачам
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task

Cosmos читает manifest.json из dbt-проекта, разбирает граф зависимостей моделей и создаёт отдельную Airflow-задачу для каждой модели. Зависимости между задачами выстраиваются автоматически на основе ref() в SQL.

Как создаем пайплайны, не привлекая разработчиков

Теперь, когда аналитик хочет создать новый регулярный пайплайн, он может собрать его за несколько шагов:

Шаг 1. Создает папку в репозитории dbt-projects/my_new_pipeline/.

Шаг 2. Если нужна загрузка из внешнего источника, пишет YAML-конфиг для dlt. 

Шаг 3. Формирует SQL-модели в папке models/ и описывает источники в sources.yaml.

Шаг 4. Создаёт dbt_project.yaml и dag.yaml.

Шаг 5. Пушит в Git, проходит ревью, мержит.

CI/CD собирает dbt‑проект и отправляет артефакты на S3. Airflow читает DAG‑файлы оттуда, Cosmos парсит dbt‑проект и генерирует граф задач. По расписанию dbt запускает модели на Trino в правильном порядке. В итоге имеем обновленную витрину в хранилище, доступную в Superset.

Что изменилось после перехода на новую архитектуру пайплайнов

PySpark

dlt, Trino, Airflow+Cosmos

Пайплайн создавался 1–3 недели

Аналитик создает пайплайн за 1 день

Чтобы создать пайплайн, приходилось обращаться к разработчикам

Аналитики решают большую часть задач самостоятельно

Пайплайны требовали большой кодовой базы на Python

Пайплайны описывают в SQL и YAML, а объем кода уменьшился в 2 раза

Чтобы аналитик мог создавать пайплайны своими силами, он должен понимать концепции ref() и source(), разницу между table и incremental и знать основы Git. Для этого мы провели несколько внутренних воркшопов и составили пошаговые инструкции для каждого типа задач.

Как создаем пайплайны на dbt + Trino
Как создаем пайплайны на dbt + Trino

Почему новые инструменты не заменяют PySpark полностью

Для 10% наших пайплайнов PySpark все еще остается единственным вариантом, если трансформация не ложится в SQL. dbt поддерживает Jinja‑макросы, но это не замена полноценному Python. Кроме того, было бы нечестно умалчивать об ограничениях новых инструментов.

dlt + Delta: экспериментальный upsert. Мы используем Delta-формат в хранилище. У dlt коннектор для Delta помечен как experimental, поэтому merge-стратегия не работала. Пришлось придумать обходные пути: в одних случаях использовали replace вместо merge и теряли инкрементальность, в других — писали кастомные processing_steps.

Низкая отказоустойчивость Trino. У Trino есть механизм fault tolerance, но он работает через запись промежуточных результатов на S3. При наших терабайтных объемах данных использовать механизм нецелесообразно — дорого из-за огромного количества S3-операций. Если не включать fault tolerance, то при падении воркера Trino роняет весь запрос. В отличие от Spark, который перезапускает только упавшую задачу. Мы решили эту проблему через ретраи в DAG и декомпозицию тяжелых моделей на цепочку промежуточных.

UDF и кастомная логика. В Spark кастомная логика пишется на Python прямо в пайплайне, это удобно. С новой архитектурой делать такое сложнее. dbt поверх Trino не спасает: Jinja генерирует лишь SQL, а Python-модели работают только со Snowflake, Databricks и BigQuery. В самом Trino можно писать UDF, но только на Java со всеми вытекающими последствиями: отдельный репозиторий, сборка, деплой JAR на все воркеры. Таким образом, когда трансформация не ложится в SQL, UDF для Trino становится либо неподдерживаемым SQL-монстром, либо отдельным скриптом с потерей lineage.

Что планируем улучшить: тесты, шаблоны для моделей и обучение

Доделать тестирование. В PySpark у нас было отлажено тестирование пайплайнов, а в новой архитектуре тестов пока не хватает. В свежих версиях dbt появилась возможность юнит-тестирования. Теперь можно проверять логику SQL-моделей на мок-данных, не поднимая полный пайплайн. Хотим добавить dbt-тесты и на уровне моделей, и как отдельный слой мониторинга.

Внедрить шаблоны для типовых задач. Многие наши dbt-модели похожи. Один конфиг может описывать десяток моделей с одинаковым паттерном: разница только в таблице-источнике и применяемых фильтрах. Планируем вынести общую логику в dbt-макросы.

Расширить круг пользователей платформы. Мы хотим, чтобы больше инженеров и аналитиков могли самостоятельно работать с данными. Планируем регулярно проводить внутреннее обучение, готовить документацию и онбординг-гайды, чтобы новые пользователи могли быстро подключаться к платформе и строить свои модели.

Комментарии (0)