Говоря о серьезных кластерах в компаниях, нам часто приходится взаимодействовать со сторонними отделами и их данными. И зачастую, когда речь идет об ad-hoc, самый эффективный инструмент - Trino. Он удобен тем, что в платформе данных можно добавить каталог, который позволит по сути избежать настройки коннекшена для конечного пользователя. Просто в запросе указываешь название каталога данных и трино сам понимает, что нужно взять данные со сторонней базы данных. Но все меняется, когда выразительности SQL нам перестает хватать для выполнения поставленных задач и мы переходим в Spark. Точнее, менялось. С релизом Spark 3.0 появилась возможность взаимодействовать с внешними источниками так же просто, как в Trino.

Пример Trino запроса
Пример Trino запроса

На дворе 2025 год. Spark 3.0 уже почти 5 лет. Но зачастую, если задать коллеге по цеху вопрос "как собрать все orders с чеком больше 50" ответом будет примерно такой код:

df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("query", "select * from orders where price > 50")
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    .load()
)

Эта конструкция в целом рабочая, но вынуждает автора кода идти к коллегам, спрашивать креды для доступа в БД. Передавать эти креды как параметр джобы. А когда эти креды меняются, джоба радостно превращается в тыкву.
Кроме того, передавая данные через stdout для запуска джобы, мы по сути светим их всему миру. И в том числе для решения этой проблемы в Airflow появилась заглушка.

Ретроспектива

Чтобы понять, откуда такой код в голове у большинства разработчиков, давайте вернемся назад, чтобы понимать как Spark развивался.

Как известно, Spark на начальных этапах был просто более прокачанной версией MapReduce. Поэтому в Spark1.0 вполне можно найти наследие SqlContext. Эта сущность отвечала за коннект с SQL базами данных, как понятно из названия. Это настолько древний легаси, что я даже не могу найти код, который с этим работал.

С релизом Spark1.3 миру был представлен DataFrame API. Главная особенность того релиза была в том, что появился Catalyst, позволяющий делать перестановки в выражениях для оптимизации работы с данными. Сложная конструкция с использованием SqlContext была заменена на привычный нам .format("jdbc"). Замечу, что внутри все осталось так же, это скорее косметическое изменение. Однако, появление Catalyst показало, что запрос можно оптимизировать путем перестановок в плане запроса!

Именно это и произошло в релизе Spark2.0. Там появилась возможность выкачивать не всю таблицу, а лишь её подмножество (тогда не знали ещё, что по умному это называется PushDown=)). И в этот момент пришло осознание необходимости унифицировать DataSource, что вылилось в интересный SPIP, который реализовали в Spark2.3.

Идея унификации работы с источниками подкреплялась появлением новых табличных форматов, а также понимаем, что нужно разделять логику работы с метаданными и логику работы с таблицей. Все это вылилось в очередной SPIP, в котором обосновывалась необходимость внедрения Catalog API. К сожалению, новый API оказался в тени AQE поддержкой и внедрением ANSI SQL, потому большинство разработчиков не в курсе нововведения.

Внедряем свой каталог

Чтобы понять, как похорошел Spark при Ryan Blue, насколько код становится приятнее, попробуем внедрить каталог на примере MySQL.
Честно говоря, на сайте Spark не нашел инструкции как пользоваться Catalog API, если кто-то нашел - поделитесь, пожалуйста. Поэтому пришлось смотреть в "кишочки" spark и начал я с CatalogPlugin, так как согласно API, каждая имплементация должна быть реализована именно через этот интерфейс. Порождается CatalogPlugin в object Catalogs. По сути это нехитрая фабрика, которая работает по следующему принципу:

Берем className каталога по имени через "spark.sql.catalog.$name"
Берем все опции каталога по паттерну "^spark\.sql\.catalog\." + name + "\.(.+)"
Инициализируем каталог
Кидаем в каталог все его опции, он сам разберется.

Поэтому для корректной настройки каталога, нужно заглядывать как он работает с опциями. В нашем случае это JDBCTableCatalog. Этот каталог работает с известными нам JDBCOptions.

Заведем docker-compose для демонстрации и экспериментов (код будет в конце).
Для этого создадим mysql таблицу с database habr.
А также будем использовать jupyter/pyspark-notebook.

Добавим в наш spark-defaults.conf следующие настройки:

spark.sql.catalog.mysql_catalog=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
spark.sql.catalog.mysql_catalog.driver=com.mysql.cj.jdbc.Driver
spark.sql.catalog.mysql_catalog.password=root
spark.sql.catalog.mysql_catalog.user=root

spark.sql.catalog.mysql_catalog.url=jdbc:mysql://mysql_db:3306/habr

И не забываем добавить драйвер в classpath при создании сборки:

RUN wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/9.4.0/mysql-connector-j-9.4.0.jar -P /usr/local/spark/jars/

Как видно из названия, я назвал свой каталог mysql_catalog.
Попробуем поработать с ним в уже привычном нам sql синтаксисе.

простой запрос через sql
простой запрос через sql

Как видите, для запроса больше не нужно знать креды, достаточно знать имя каталога. Если подключить какой-нибудь sparkmagic, то фактически можно делать запросы без обертки spark.sql, используя всю мощность spark кластера!

Перспективы использования каталогов

Развитие DataSource

Когда я рассказывал про историческое развитие, я вовсе не просто так упомянул Catalyst и DataSourceV2. Согласно новой концепции, Spark умеет прокидывать pushDownPredicate.

Целью Spark3.0 было внедрение каталогов, для упрощения взаимодействия, однако уже в 3.5 версии в JDBCOptions появились параметры:

spark.sql.catalog.mysql_catalog.pushDownPredicate=true
spark.sql.catalog.mysql_catalog.pushDownAggregate=true

Внедряя эти параметры, мы теперь можем писать код вроде такого:

пример с pushdown
пример с pushdown

Что на практике означает, что Spark не гоняет всю таблицу по сети, а делает выборку:

Другими словами, внедряя сейчас систему каталогов, завтра мы можем ускорять наши приложения, просто меняя spark-defaults. Без необходимости менять код.

Темплейтирование

В наше время, крупные компании переходят на Spark-on-kubernetes. И это ещё одна точка применения Catalog API!

При таком подходе разумно рассмотреть spark-defaults как ConfigMap, а использование Helm чартов серьезно упрощает построение datamesh архитектуры. Helm позволяет менять коннекшен к БД в зависимости от окружения на котором работает Spark-приложение. Например, на UAT окружении можно обращаться в uat инстанс, не меняя код. Тем самым, мы можем гарантировать, что любой код, запущенный в UAT стенде не приведет к утечкам sensitive данных.

Также для каждой команды можно выставлять свою учетную запись, под которой обращаемся в БД и тем самым создавать свои уникальные параметры подключения для каждой команды. Например, Platform команда может ходить в БД под platform_de учеткой с доступом в таблицу с метаданными, а Product использовать product_de без доступа в эту таблицу. А MlOps - вообще под ml_ops, которому разрешено всего 2 коннекшена одновременно.

И прелесть в том, что для всех коллег код выглядит одинаково и лаконично. И можно отделить бизнес-логику нашего ETL от настройки его подключения.

Заключение

Таким образом, сделав несколько несложных манипуляций, мы смогли заметно упростить наш код, а также дали возможность для его ускорения в будущем без необходимости что-то переписывать.
Полный код для домашних экспериментов можно найти по ссылке.

Буду рад узнать ваш опыт использования Catalog API.

Комментарии (5)


  1. eigrad
    22.10.2025 08:08

    Так имена таблиц остаются захардкоженными, можно пойти дальше, и во всей реализации бизнес-логики использовать pyspark.DataFrame как входные параметры, или дойти до подхода dagster с ассетами и iomanager'ами. Но в целом, контракт на уровне схемы каталога это тоже вариант, со своими плюсами.


  1. eigrad
    22.10.2025 08:08

    Только это все хорошо, а на практике все равно spark.read.parquet - и вперёд... Или, что ещё интереснее, df.write.mode("append").parquet(hive_table_location) из рандомного пайплайна :-).


    1. ozero17 Автор
      22.10.2025 08:08

      Тут скорее речь про подключение внешних источников, вроде Oracle/ClickHouse, и тд.
      Плюс, мы стараемся не писать пайплайны, что вы привели в пример. Такой код точно не пройдет ревью у наших коллег.


      1. eigrad
        22.10.2025 08:08

        ClickHouse

        Вы кстати тоже ходите в него через JDBC который внутри всё в tsv перегоняет?

        Недавно у Clickhouse появился Arrow Flight SQL в качестве нового интерфейса, через spark-flight-connector тратится на порядок меньше CPU на десериализацию vs старый JDBC.


      1. eigrad
        22.10.2025 08:08

        Такой код точно не пройдет ревью у наших коллег.

        Это хорошо, когда процессы выстроены так, что без ревью код рандомного отдела не залезет в общий даталейк.