Всем привет! Меня зовут Дмитрий Третьяков, я ML Engineer в компании «Лента». Мы регулярно запускаем PySpark‑приложения в Kubernetes‑кластере, используя Airflow. Этот процесс важен для нашей ежедневной работы с данными, но в какой‑то момент мы столкнулись с тем, что стандартный подход через SparkKubernetesOperator стал сдерживать развитие: не хватало гибкости, возникали сложности в сопровождении и процесс настройки был излишне сложным для разработчиков.
Чтобы решить эти проблемы, мы создали собственную библиотеку — Dagify. Это расширение для Airflow, которое упрощает работу с DAGами, устраняет рутину, связанную с конфигурацией SparkApplication, и делает запуск PySpark‑приложений по расписанию удобнее. В этой статье я расскажу, зачем мы пошли на разработку собственного Airflow‑оператора, как устроен Dagify и какие задачи он помогает решать.

Проблемы стандартного подхода
При запуске Spark‑job в Kubernetes через стандартные средства Airflow выяснилось, что это решение имеет несколько существенных недостатков:
две таски на одну Spark‑задачу. Приходилось запускать две задачи в даге: одну для подачи SparkApplication в кластер, и вторую — сенсор для ожидания завершения приложения. Это усложняет код дага, запутывает зависимости и ведет к избыточности — по сути, одна логическая задача разделена на две.
отсутствие стриминга логов Spark. Стандартный оператор не транслировал логи Spark в интерфейс Airflow во время выполнения. В логах Airflow был виден только статус или финальный результат, но не ход выполнения Spark‑приложения. Отладка и мониторинг длительных Spark‑job при этом были затруднены: чтобы посмотреть логи драйвера, нужно было идти в Kubernetes или в специализированный дашборд, что неудобно. Нам хотелось видеть логи выполнения прямо в Airflow.
неудобство работы с Yaml‑конфигами. SparkKubernetesOperator от Airflow требует передать yaml/json файл с описанием SparkApplication. Поддерживать эти манифесты неудобно: любая правка конфигурации требует редактировать статический yaml. Возникает дублирование настроек между кодом разных команд и проектов.

После анализа всех этих проблем мы решили, что начнём разработку собственного оператора, который объединит запуск и мониторинг Spark‑job в одной задаче Airflow, избавит от перечисленных неудобств с конфигурацией и позволит добавлять кастомные решения в наши даги.
Почему бы просто не ждать обновлений Airflow
Исходя из предыдущих пунктов возникает вопрос: почему бы не ждать обновления Airflow, в котором часть функционала может появиться?
Поддержка нескольких инстансов Airflow и обратная совместимость
У нас в компании есть несколько инстансов Airflow: дев/прод среда для Data Science команды, дев/прод среда для Data Analyst команды и отдельные инстансы для крупных продуктов. Обновление Airflow — это всегда потенциально болезненный процесс (мы, например, часто сталкивались с проблемами обратной совместимости даже минорных версий), особенно в продакшене. Мы выбрали более контролируемый путь — кастомный оператор, который решает нашу задачу без вмешательства в остальную инфраструктуру.
Решение — библиотека Dagify и кастомный SparkKubernetesOperator
С учетом всего описанного ранее, в качестве решения мы разработали библиотеку Dagify, внутри которой создали кастомный Airflow‑оператор SparkKubernetesOperator, расширяющий возможности стандартного. Он запускает Spark‑приложение в Kubernetes и сам же отслеживает его выполнение. Расскажем по пунктам, как устроено наше решение и какие фичи в нем реализованы.
Конфигурация через Python вместо yaml
Вместо описания Spark‑задачи в yaml мы перешли на формирование конфигурации в коде (Python). Мы разделили настройки на две части: Kubernetes‑конфиг (K8sConf) и Spark‑конфиг (SparkJobConf), чтобы логически разграничить параметры кластера и параметры самого Spark‑приложения. При этом, какая‑то общая логика, согласованная для большинства проектов, вынесена в саму библиотеку, но также существует и возможность ее переопределения.
На практике это выглядит так: в Python коде есть возможность добавить объекты конфигурации для Spark и для Kubernetes, а затем передать их в наш оператор. Ниже приведена иллюстрация инициализации SparkJobConf и передачи его в SparkKubernetesOperator:
Определяем Spark-конфигурацию приложения:
spark_conf=SparkJobConf(
name='task-name',
image='registry/image_name',
node_selector='spark_node_selector',
main_application_file='/path/to/file',
env_from='sercet_ref',
executor_cores=4,
num_executors=20,
executor_memory='16G',
driver_memory='32G',
spark_conf_overrides=['spark.sql.broadcastTimeout=1500s']
)
Инициализируем таску с нашим кастомным оператором:
spark_task = SparkKubernetesOperator(
spark_conf=spark_conf,
dag=dag,
sla=timedelta(minutes=30)
)
В реальном коде конфигурации могут быть более детальными, но ключевое то, что все задается в Python. Наша библиотека Dagify внутри оператора преобразует эти структуры в нужный формат для Kubernetes. При этом возникает возможность переиспользовать конфиги и избегать дублирования: можно подготовить шаблонные SparkJobConf и K8sConf для типовых задач и импортировать/расширять их в разных дагах.
Гибкая работа с Docker‑образами (develop/latest)
Еще одна удобная функция нашего оператора — это автоматическая подстановка тегов Docker‑образов develop или latest в зависимости от окружения. В нашей команде принята практика: для тестовых запусков использовать образ с тегом:develop (собранный из актуального кода), а для прода — стабильный тег (например,:latest или определенную версию). Чтобы разработчики лишний раз не забывали сменить тег, оператор делает это сам.
Алгоритм простой: если задача запускается в прод‑окружении Airflow, то в конфиге автоматически подставится образ с тегом latest, если не указано иного. А в дев‑окружении, соответственно, develop.
Такая мелочь заметно облегчила жизнь: Data Scientist‑ам достаточно один раз указать правильное имя образа, а актуальный тег для нужного контура подставится автоматически. Плюс, это снижает риск случайно запустить в продакшне экспериментальный код.
Дополнительные фичи кастомного оператора
Помимо основных функций запуска и мониторинга, мы добавили в SparkKubernetesOperator ряд дополнительных возможностей, повышающих надежность и удобство эксплуатации.
Автоматическое удаление пода при остановке задачи
Мы реализовали корректную обработку прерывания задачи Airflow. Если пользователь вручную останавливает таску в Airflow, наш оператор обнаруживает это и удаляет запущенный Spark‑под из кластера Kubernetes. Таким образом, не остается приложений, которые продолжали бы потреблять ресурсы кластера после отмены задачи.
Кастомные уведомления в Telegram
Мониторинг Spark‑задач стал проще благодаря уведомлениям о важных событиях. В нашей компании принята отправка оповещений в Telegram‑чаты при сбоях и проблемах пайплайнов. Автоматически присылаются уведомления о следующих ситуациях:
падение Spark‑приложения. Если job завершился с ошибкой, то сразу после получения статуса приходит сообщение в Telegram‑чат.
длительное ожидание запуска. Если Spark‑job долго находится в состоянии Pending (например, не хватает ресурсов кластера, и задача стоит в очереди), отслеживается время ожидания. Если порог (в нашем случае, 10 минут) превышен, в Telegram летит предупреждение, что задача «висит» в Pending слишком долго.
другие кастомные события. По необходимости можно настроить дополнительные уведомления (например, sla).
Единая логика и повторное использование конфигурации
Dagify объединяет всю вспомогательную логику в одной библиотеке, что дает существенный выигрыш в удобстве, поддержке и унификации: теперь запуск любого Spark‑приложения через Airflow происходит по одному и тому же сценарию, различаясь лишь параметрами конфигурации.
Единая точка входа в виде оператора Dagify позволяет добавить новый функционал (допустим, поддержку другой системы нотификаций или особой политики рестартов) один раз в коде оператора, и это сразу станет доступно всем проектам с актуальной версией.
Результаты и преимущества нашего решения
Внедрение кастомного SparkKubernetesOperator в связке с библиотекой Dagify оправдало себя по многим направлениям:
снижение порога входа для DS/DA. Специалисты по данным теперь могут запускать Spark‑задачи, не погружаясь в детали Kubernetes и не мучаясь с yaml. Это устранило множество мелких ошибок конфигурации и сэкономило время при разработке новых задач.
прозрачность и отладка. Благодаря стримингу логов, можно наблюдать за ходом выполнения Spark‑приложения в реальном времени в веб‑интерфейсе Airflow. Любые проблемы (ошибки, задержки) сразу заметны и сопровождаются автоматическими уведомлениями. В итоге — обнаружение и решение инцидентов стало происходить значительно быстрее.
централизация настроек. Мы избавились от разрозненных yaml файлов. Все настройки консолидированы либо в коде дага, либо в общей библиотеке. Это облегчает поддержку: все изменения версионируются вместе с кодом.
контроль и расширяемость. Собственный оператор мы полностью контролируем: он не зависит от сторонних изменений и адаптируется под любые требования команд.

В итоге новое решение показало себя эффективным: число «ручных» действий сократилось, надежность пайплайнов возросла, а разработка новых задач ускорилась. Проектные команды теперь тратят меньше времени на борьбу с инфраструктурными нюансами и больше на обработку и анализ данных.
Заключение
Опыт разработки собственного Airflow‑оператора для Spark в Kubernetes в нашей команде продемонстрировал, что не стоит бояться отходить от стандартных решений, а выигрыш в удобстве и надежности окупает затраты на разработку. От небольшого инструмента, который в первую очередь помогал команде MLE запускать решения в продакшн, мы пришли к тому, что практически все Spark‑job в нашей компании используют наш продукт.
Если ваша команда сталкивается с ограничениями встроенных операторов Airflow, имеет смысл рассмотреть написание своего. Начните с изучения кода базового оператора, выделите требования, которых нет «из коробки» и постепенно внедряйте их в виде собственного класса оператора.
На этом все, поделитесь опытом, приходилось ли вам разрабатывать свои решения, когда в стандартных библиотеках не хватало функционала?