Всем привет! На связи Кирилл Козлов, data‑инженер Mindbox. Наша команда регулярно пересчитывает бизнес‑метрики для клиентов. Для этого нам приходится формировать витрины данных для биллинга и аналитики на основе десятков источников.
Долгое время мы обрабатывали данные для расчетов на PySpark — инструменте, с которым сложно работать без опыта программирования на Python. Чтобы создать любой пайплайн, приходилось привлекать разработчиков. Это затягивало процесс на несколько недельных спринтов.
В статье расскажу, как мы построили внутреннюю data‑платформу, где аналитик или продакт может создать регулярно обновляемый пайплайн, описав его в четырех YAML‑файлах.
Почему PySpark тормозил создание пайплайнов
Опишу нашу боль на примере типичного пайплайна — элементарного подсчета MAU.
На первый взгляд, задачу можно было бы решить с помощью простого SQL‑запроса — COUNT(DISTINCT customerId) по нескольким таблицам за период. Но из‑за инфраструктурного обвеса — PySpark, Airflow DAG, Spark‑ресурсы, тесты — пришлось отдать задачу разработчикам. В итоге мы ждали пайплайн подсчета MAU целую неделю.
На подготовку расчета каждой новой метрики уходило от 1 до 3 недель. И каждый раз приходилось повторять один и тот же сценарий:
Аналитик формулировал бизнес‑требования, искал свободного разработчика, передавал ему контекст задачи.
Разработчик уточнял детали, писал код на Python, проходил ревью, настраивал DAG и деплоил результат.
Мы же хотели ускорить этот процесс и решать такие задачи силами аналитиков и менеджеров продукта, которые лучше всех понимают бизнес‑логику, свободно владеют SQL и YAML, но не работают с Python и PySpark.

Чем заменили PySpark: Python не нужен, достаточно YAML и SQL
Чтобы использовать декларативный подход для создания пайплайнов, мы разделили весь data‑слой на три части и для каждой подобрали инструмент:
dlt (data load tool) загружает данные из внешних API и БД в объектное хранилище. Конфигурируется YAML‑файлом без кода.
dbt (data build tool) on Trino преобразует данные с помощью SQL. Связывает модели через ref(), сам строит граф зависимостей и управляет инкрементальными обновлениями.
Airflow + Cosmos оркестрирует пайплайны. DAG для Airflow генерируется автоматически из dag.yaml и dbt‑проекта.
Trino мы уже использовали как query engine для ad‑hoc‑запросов и подключали его к Superset для BI. Он уже тогда показал себя хорошо: для запросов с базовой логикой обрабатывал огромные объемы данных быстрее и с меньшим потреблением ресурсов, чем Spark. Кроме того, Trino из коробки поддерживает федеративный доступ к нескольким хранилищам из одного SQL‑запроса. Для 90% наших пайплайнов Trino подходил отлично.

Как загружаем данные: dlt.yaml
Первый yaml‑файл описывает, откуда и как загружать данные для дальнейшей обработки. Вот реальный пример того, как загружаются данные биллинга из внутреннего API:
product: sg-team feature: billing schema: billing_tarification dag: dag_id: dlt_billing_tarification schedule: "0 4 * * *" description: "Daily refresh of tarification data" tags: - billing alerts: enabled: true severity: warning source: type: rest_api client: base_url: "https://internal-api.example.com" auth: type: bearer token: dlt-billing.token resources: - name: tarification_data endpoint: path: /tarificationData method: POST json: firstPeriod: "{{ previous_month_date }}" lastPeriod: "{{ previous_month_date }}" pricingPlanLine: CurrentPlan write_disposition: replace processing_steps: - map: dlt_custom.billing_tarification_data.map - name: charges_raw columns: staffUserName: data_type: text nullable: true endpoint: path: /data-feed/charges method: POST json: firstPeriod: "{{ previous_month_date }}" lastPeriod: "{{ previous_month_date }}" write_disposition: replace - name: discounts_raw endpoint: path: /data-feed/discounts method: POST json: firstPeriod: "{{ previous_month_date }}" lastPeriod: "{{ previous_month_date }}" write_disposition: replace
В файле описаны четыре ресурса из одного API. Для каждого указывается endpoint, параметры запроса и стратегии записи: в нашем случае replace означает «перезаписывать каждый раз». В этом конфиге можно добавлять шаги обработки processing_steps, описывать типы колонок и настраивать алерты.
В результате у нас всего 40 строк YAML‑конфига. Без dlt каждый такой коннектор — это Python‑скрипт с запросами, обработкой пагинации, ретраями, сериализацией в Delta Table формат и загрузкой в хранилище.
Как трансформируем данные с помощью SQL: dbt_project.yaml и sources.yaml
Следующим шагом конфигурируем dbt‑модель. Для Trino это SQL‑запросы.
Приведу пример конфигурации для расчета MAU. Так выглядит подготовка событий из одного источника:
-- int_mau_events_visits.sql {{ config(materialized='table') }} WITH period AS ( SELECT YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year, MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month, YEAR(CURRENT_DATE) AS end_year, MONTH(CURRENT_DATE) AS end_month ), merged_customers AS ( SELECT * FROM {{ ref('int_merged_customers') }} ), actual_customers AS ( SELECT * FROM {{ ref('int_actual_customers') }} ), events AS ( SELECT src._tenant, src.unmergedCustomerId, 'visits' AS src_type, src.endpoint FROM {{ source('final', 'customerstracking_visits') }} src CROSS JOIN period p WHERE src.unmergedCustomerId IS NOT NULL AND ((src.timestamp_year > p.start_year) OR (src.timestamp_year = p.start_year AND src.timestamp_month >= p.start_month)) AND ((src.timestamp_year < p.end_year) OR (src.timestamp_year = p.end_year AND src.timestamp_month <= p.end_month)) ), events_with_customer AS ( SELECT e._tenant, COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId, e.src_type, e.endpoint FROM events e LEFT JOIN merged_customers mc ON e._tenant = mc._tenant AND e.unmergedCustomerId = mc.unmergedCustomerId ) SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint FROM events_with_customer ewc WHERE EXISTS ( SELECT 1 FROM actual_customers ac WHERE ewc._tenant = ac._tenant AND ewc.customerId = ac.customerId )
Все 10 источников событий построены по одному и тому же принципу. Меняется только таблица‑источник и фильтры. Далее модели собираются в единый поток:
-- int_mau_events.sql (объединение всех источников) SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }} UNION ALL SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }} UNION ALL SELECT * FROM {{ ref('int_mau_events_visits') }} UNION ALL SELECT * FROM {{ ref('int_mau_events_orders') }} -- ...и ещё 6 источников
И наконец финальная витрина, где все считается:
-- mau_period_datamart.sql {{ config( materialized='incremental', incremental_strategy='merge', unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month'] ) }} {%- set months_back = var('months_back', 5) | int -%} WITH period AS ( SELECT YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year, MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month, YEAR(CURRENT_DATE) AS end_year, MONTH(CURRENT_DATE) AS end_month ), events_resolved AS ( SELECT * FROM {{ ref('int_mau_events') }} ), metrics_by_tenant AS ( SELECT er._tenant, COUNT(DISTINCT CASE WHEN src_type = 'visits' THEN customerId END) AS CustomersTracking_Visits, COUNT(DISTINCT CASE WHEN src_type = 'orders' THEN customerId END) AS ProcessingOrders_Orders, COUNT(DISTINCT CASE WHEN src_type = 'mailings' THEN customerId END) AS Mailings_MessageStatuses, -- ...остальные метрики COUNT(DISTINCT customerId) AS MAU FROM events_resolved er GROUP BY er._tenant ) SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month FROM metrics_by_tenant m CROSS JOIN period p
Для конфигурации витрины используем incremental_strategy='merge'. В этом случае dbt сам генерирует merge-запрос, подставляя unique_key для upsert. Не нужно вручную реализовывать инкрементальную загрузку.
Чтобы связать модели в один проект, собираем конфиг dbt_project.yaml:
name: mau_period version: '1.0.0' models: mau_period: +on_table_exists: replace +on_schema_change: append_new_columns
И файл sources.yaml, который описывает входные таблицы:
sources: - name: final database: data_platform schema: final tables: - name: inapps_targetings_v2 - name: inapps_clicks_v2 - name: customerstracking_visits - name: processingorders_orders - name: cdp_mergedcustomers_v2 # ...
В итоге получаем ту же бизнес-логику, что и в PySpark, но на чистом SQL:
— sources.yaml вместо typedspark-схем,
— {{ ref() }} и {{ source() }} вместо .get_table(),
— автоматический порядок выполнения по графу зависимостей вместо ручного подбора Spark-ресурсов.
Как конфигурируем Airflow: dag.yaml
Четвертый файл конфигурации определяет, когда и как Airflow запустит пайплайн:
product: sg-team feature: billing schema: mau schedule: "15 21 * * *" # каждый день в 00:15 MSK params: - name: start_date description: "Start date (YYYY-MM-DD). Leave empty for auto" default: "" - name: end_date description: "End date (YYYY-MM-DD). Leave empty for auto" default: "" - name: months_back description: "Months to look back (default: 5)" default: 5 alerts: enabled: true severity: warning
Затем наш Python‑скрипт парсит dag.yaml и dbt_project.yaml и через библиотеку Cosmos генерирует полноценный Airflow DAG. Это единственный кусок кода на Python. Он пишется один раз и работает для всех dbt‑проектов. Вот ключевая часть этого скрипта:
def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]: config_dict = yaml.safe_load(dag_config_path.read_text()) config = DagConfig.model_validate(config_dict) # Параметры из YAML → Airflow Params params = {} operator_vars = {} for param in config.params: params[param.name] = Param( default=param.default if param.default is not None else "", description=param.description, ) operator_vars[param.name] = f"{{{{ params.{param.name} }}}}" # Cosmos создаёт DAG из dbt-проекта with DbtDag( dag_id=f"dbt_{project_path.name}", schedule=config.schedule, params=params, project_config=ProjectConfig(dbt_project_path=project_path), profile_config=ProfileConfig( profile_name="default", target_name=project_name, profile_mapping=TrinoLDAPProfileMapping( conn_id="trino_default", profile_args={ "database": profile_database, "schema": profile_schema, }, ), ), operator_args={"vars": operator_vars}, ) as dag: # Создаём схему перед запуском моделей create_schema = SQLExecuteQueryOperator( task_id="create_schema", conn_id="trino_default", sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...", ) # Привязываем к корневым задачам for unique_id, _ in dag.dbt_graph.filtered_nodes.items(): task = dag.tasks_map[unique_id] if not task.upstream_task_ids: create_schema >> task
Cosmos читает manifest.json из dbt-проекта, разбирает граф зависимостей моделей и создаёт отдельную Airflow-задачу для каждой модели. Зависимости между задачами выстраиваются автоматически на основе ref() в SQL.
Как создаем пайплайны, не привлекая разработчиков
Теперь, когда аналитик хочет создать новый регулярный пайплайн, он может собрать его за несколько шагов:
Шаг 1. Создает папку в репозитории dbt-projects/my_new_pipeline/.
Шаг 2. Если нужна загрузка из внешнего источника, пишет YAML-конфиг для dlt.
Шаг 3. Формирует SQL-модели в папке models/ и описывает источники в sources.yaml.
Шаг 4. Создаёт dbt_project.yaml и dag.yaml.
Шаг 5. Пушит в Git, проходит ревью, мержит.
CI/CD собирает dbt‑проект и отправляет артефакты на S3. Airflow читает DAG‑файлы оттуда, Cosmos парсит dbt‑проект и генерирует граф задач. По расписанию dbt запускает модели на Trino в правильном порядке. В итоге имеем обновленную витрину в хранилище, доступную в Superset.
Что изменилось после перехода на новую архитектуру пайплайнов
PySpark |
dlt, Trino, Airflow+Cosmos |
Пайплайн создавался 1–3 недели |
Аналитик создает пайплайн за 1 день |
Чтобы создать пайплайн, приходилось обращаться к разработчикам |
Аналитики решают большую часть задач самостоятельно |
Пайплайны требовали большой кодовой базы на Python |
Пайплайны описывают в SQL и YAML, а объем кода уменьшился в 2 раза |
Чтобы аналитик мог создавать пайплайны своими силами, он должен понимать концепции ref() и source(), разницу между table и incremental и знать основы Git. Для этого мы провели несколько внутренних воркшопов и составили пошаговые инструкции для каждого типа задач.

Почему новые инструменты не заменяют PySpark полностью
Для 10% наших пайплайнов PySpark все еще остается единственным вариантом, если трансформация не ложится в SQL. dbt поддерживает Jinja‑макросы, но это не замена полноценному Python. Кроме того, было бы нечестно умалчивать об ограничениях новых инструментов.
dlt + Delta: экспериментальный upsert. Мы используем Delta-формат в хранилище. У dlt коннектор для Delta помечен как experimental, поэтому merge-стратегия не работала. Пришлось придумать обходные пути: в одних случаях использовали replace вместо merge и теряли инкрементальность, в других — писали кастомные processing_steps.
Низкая отказоустойчивость Trino. У Trino есть механизм fault tolerance, но он работает через запись промежуточных результатов на S3. При наших терабайтных объемах данных использовать механизм нецелесообразно — дорого из-за огромного количества S3-операций. Если не включать fault tolerance, то при падении воркера Trino роняет весь запрос. В отличие от Spark, который перезапускает только упавшую задачу. Мы решили эту проблему через ретраи в DAG и декомпозицию тяжелых моделей на цепочку промежуточных.
UDF и кастомная логика. В Spark кастомная логика пишется на Python прямо в пайплайне, это удобно. С новой архитектурой делать такое сложнее. dbt поверх Trino не спасает: Jinja генерирует лишь SQL, а Python-модели работают только со Snowflake, Databricks и BigQuery. В самом Trino можно писать UDF, но только на Java со всеми вытекающими последствиями: отдельный репозиторий, сборка, деплой JAR на все воркеры. Таким образом, когда трансформация не ложится в SQL, UDF для Trino становится либо неподдерживаемым SQL-монстром, либо отдельным скриптом с потерей lineage.
Что планируем улучшить: тесты, шаблоны для моделей и обучение
Доделать тестирование. В PySpark у нас было отлажено тестирование пайплайнов, а в новой архитектуре тестов пока не хватает. В свежих версиях dbt появилась возможность юнит-тестирования. Теперь можно проверять логику SQL-моделей на мок-данных, не поднимая полный пайплайн. Хотим добавить dbt-тесты и на уровне моделей, и как отдельный слой мониторинга.
Внедрить шаблоны для типовых задач. Многие наши dbt-модели похожи. Один конфиг может описывать десяток моделей с одинаковым паттерном: разница только в таблице-источнике и применяемых фильтрах. Планируем вынести общую логику в dbt-макросы.
Расширить круг пользователей платформы. Мы хотим, чтобы больше инженеров и аналитиков могли самостоятельно работать с данными. Планируем регулярно проводить внутреннее обучение, готовить документацию и онбординг-гайды, чтобы новые пользователи могли быстро подключаться к платформе и строить свои модели.