Всем привет! Я недавно выступал с докладом на VK Kubernetes Conf 2024 про нашу историю изменения подходов к эксплуатации Kubernetes Airflow и хочу поделиться им с сообществом.

image
image

Что такое airflow

Apache Airflow — это один из самых популярных ETL-шедулеров. ETL-процессы — это когда мы выбираем интересующие нас данные, приводим их к агрегированному виду и сохраняем для дальнейшего использования. И это присутствует везде, где есть необходимость анализа данных. Соответственно, Airflow предназначен для того, чтобы запускать пайплайны обработки данных.

image
image

И в рамках Airflow эти пайплайны называются DAG — Directed Acyclic Graph (направленные циклические графы), где узлы графов — это задачи по обработке данных (task), а связи — это условия перехода.

image
image

И под капотом там — Python. То есть Airflow — это фактически специализированный ETL-фреймворк с возможностью позапускать свои программки по расписанию.

image
image

Классическая архитектура

В принципе у Airflow есть очень много вариантов запуска. Вы, например, при помощи local executor можете запустить Airflow в тетрадке jypyter notebook.

Но когда мы говорим про действительно большие, многонодовые инсталляции, предназначенные для перемалывания большого количества данных, то в классическом варианте с выделенными серверами используется celery executor.

В Airflow — два основных компонента: веб-сервер для обеспечения пользовательского интерфейса и шедулер для запуска наших пайплайнов. Кстати, шедулеров может быть много, они очень легко скейлятся.

image
image

Далее: у нас есть воркеры, мы расставляем их по нашим вычислительным нодам, именно они делают всю загрузку, всю обработку данных. И для того чтобы эти компоненты общались друг с другом, нам нужен PostgreSQL.

image
image

Следующий момент: Celery — это высокопроизводительная очередь на Python, и для её работы нужен Redis. Кроме того, опционально мы можем поднять Celery UI, чтобы контролировать заполнение очереди. Scheduler помещает туда задание, воркеры их оттуда пулят, подхватывают, выполняют и отчитываются через PostgreSQL о том, что они сделали.

image
image

Поскольку у нас система распределённая, таски DAG’ов должны обмениваться друг с другом информацией и сохранять логи для их демонстрации в веб-интерфейсе. Для этого мы должны обеспечить общее сетевое хранилище (например, на базе NFS).

image
image

Поскольку Airflow — это Python, нам часто требуется устанавливать дополнительные библиотеки, набор которых должен быть синхронизирован на всех нодах нашей инсталляции.

image
image

Далее мы должны разложить код наших DAG’ов по нодам и можем запускаться.

image
image

Проблемы Airflow bare metal celery executor

Какие проблемы у этой схемы?

  1. Поддержка среды: в baremetall-серверах configuration drift, кажется, начинает накапливаться уже на этапе инсталляции сервера в стойку. То есть поддерживать версии библиотек бывает очень непросто, особенно если не сформировалась культура этой поддержки.

  2. Обновления: эту схему без простоя обновить практически нереально. Нет единого стандарта доставки кода. Я видел статейку на медиуме, где аналитики сбрасывали на FTP’шку код DAG’ов, а оттуда rsync’ом оно разливалось по всем нодам. Гигантский простор для костыльного велосипедостроения.

  3. Изоляция DAG’ов. Любой таск любого DAG’а можно запустить на любой ноде, и они могут, например, помешать друг другу, взять какие-то данные друг от друга, которые могут быть секретными.

  4. Управление ресурсами. В рамках этой схемы мы не можем ограничить ресурсы у какого-то таска, какого-то DAG’а, он может съесть ресурсы всей ноды. Плюс какой-то DAG может наплодить тысячу и один таск и поставить нашу систему в нерабочее состояние.

  5. Мониторинг. В рамках этой схемы мы фактически можем следить только за длительностью выполнения task’ов и за длительностью выполнения нашего DAG’а. Заглянуть внутрь, определить, сколько ресурсов кто ест, очень и очень сложно.

Проблемы Airflow bare metal celery executor

Мы решили поступить другим образом: развернуть Airflow в Kubernetes. Причём у Airflow есть kubernetes executor — нативная интеграция с этой платформой «из коробки».

Мы спроектировали нашу систему таким образом: в Kubernetes установили Airflow control plane и нарезали неймспейсы для наших команд. Мы изначально пошли по пути создания multitenant-системы — одного большого коммунального Airflow.

image
image

По нашей задумке DAG’и команд запускаются в выделенных namespace’ах и таким образом изолированы друг от друга (на каждый таск DAG’а запускается свой под).

Особенности процессов в банке

Прежде чем обсуждать возникшие перед нами задачи, я хотел бы дать немного контекста внутренних процессов банка. Мы банк, и мы работаем с такими данными, которые подпадают под множество законодательных актов. В общем случае потерять данные для нас предпочтительнее, чем их скомпрометировать.

Поэтому у нас есть несколько строго изолированных друг от друга контуров для разработки и эксплуатации. И мы можем передавать артефакты строго однонаправленно между этими контурами.

image
image

Причём процедура передачи сопровождается набором мероприятий, которые называются приёмо-сдаточные испытания, в ходе которых мы подтверждаем, что наше решение действительно соответствует стандартам банка. Эта схема повлияла на то, что мы делали.

Задачи внедрения Airflow kubernetes executor

Установка Airflow

Мы пошли в Интернет и нашли два самых популярных Helm Chart: чат от разработчиков Airflow и community-чарт. И, заглянув внутрь, мы нашли там всё то, за что мы так не любим Helm. Куча какого-то спагетти-yaml-тимплейтинга.

image
image

Мы пошли по следующему пути: на основе официального чарта сгенерировали yaml под наш случай и на их основе по-быстрому сделали свой чарт, который подходил конкретно под наши условия. Это подход, который в будущем сэкономил нам очень много сил и очень много времени.

Разделение ресурсов

Здесь мы нашли несколько возможностей:

1. Template pod’ов. То есть в kubernetes executor есть шаблон, на основе которого Airflow создаёт поды task’ов, и в нём мы указали реквесты и лимиты по умолчанию.

image
image

По нашей задумке разработчики в своих DAG’aх могли бы переопределять это на уровне кода, если потребуется.

image
image

И следующий момент. Поскольку мы делали multitennant-систему, то развели наши команды по разным namespace’ам и описали в них по два объекта: resourceQuota, чтобы ограничить общее потребление ресурсов какой-то конкретной командой, и LimitRange, который не позволил бы разработчикам сбросить реквесты и лимиты или выставить их в рискованные значения.

К сожалению, эта схема имела свой сайд-эффект: разработчики должны явно указывать namespace своей команды в коде DAG’а. Эту вещь мы хотели контролировать на этапе приёмо-сдаточных испытаний.

image
image

Мы собрали нашу схему, задеплоились, и она не заработала. Мы пошли внутрь кода Airflow, благо это опенсорс, и увидели там вот такую строчку.

image
image

То есть изменения, которые мы передаём в DAG’ах, имеют более низкий приоритет, чем остальные, поэтому изменения в коде DAG’ов не работали. Мы пропачили эту строчку, отправили PR в github Airflow, его там около двух лет не принимали, в итоге пару лет с каждой новой версией мы патчили этот файлик и заменяли его в базовом образе.

Мониторинг

Прекрасный мониторинг мы получили «из коробки»: на каждый таск каждого DAG’а запускается свой pod, рабочие параметры которого (потребление памяти и процессора, например) мы можем видеть на мониторинге без дополнительных усилий. Кроме того, мы можем делать разные интересные вещи типа подсадить APM внутрь контейнера и смотреть попроцедурно, где наши аналитики написали неоптимальный код.

Доставка кода DAG’ов

Код DAG’ов фактически должен находиться в каждом из наших контейнеров, и здесь было два пути. Первый путь — «запечь» в контейнеры наши DAG’и. Этот подход показался нам достаточно инертным и негибким.

Второй путь — доставить код в контейнеры отдельным процессом. В официальном Helm-чарте для этих целей используется git-sync, который периодически может скачивать с git’а код и отдавать его в поды Airflow.

git-sync может работать в двух вариантах: для долгоживущих подов по Airflow Scheduler может работать sidecar-контейнером, для подов тасков DAG’ов он может работать init-контейнером и запускаться перед тем, когда стартует основной процесс.

image
image

Общее хранилище данных

Как говорилось выше, для совместной работы нужен сетевой диск, который должен быть смонтирован в каждый pod.

Мы решили не мудрствовать лукаво, а пошли и заказали в нашей корпоративной хранилке NFS-диск, смонтировали его на каждую ноду и прописали в pod’ах hostPath.

Обеспечение безопасности

Airflow работает неавтономно, он работает с данными, и источники этих данных — фактически всё, что у вас есть в рамках вашего предприятия. Для того же, чтобы иметь туда доступ, нам нужны учётные данные.

image
image

В Airflow есть механизм по умолчанию, который называется connections.

image
image

В веб-интерфейсе Airflow мы можем заводить эти учётные данные, причём это сделано достаточно защищённо: в системе используется «Femet key», который есть только в рантайме. Он шифрует данные перед тем, как они попадают в базу.

image
image

Но этот ключ — общий на экземпляр, а у нас команды изолированы, и мы не хотели, чтобы какие-то соседние команды использовали секреты друг друга. Защита, построенная на том, что кто-то просто не знает имя connection’а, — это плохая защита. Поэтому мы решили поступить следующим образом: мы создали Kubernetes-secret’ы в каждом неймспейсе у каждой команды, и разработчики DAG’ов могут монтировать их в коде DAG’а, если возникнет такая необходимость.

image
image

Разделение прав пользователей

Airflow легко интегрируется с LDAP/Active Directory. Но в Airflow — своя ролевая модель. Есть способ сделать «mapping» групп active derectory в группы Airflow, которые мы нарезали для команд.

image
image

Но здесь опять вышел побочный эффект: разработчики должны были прописывать права на DАG’и в коде для своей команды.

image
image

Это мы тоже собирались контролировать на уровне приёмо-сдаточных испытаний.

Организация разработки

Я думаю, понятно, что особенности работы kubernetes executor и подход к работе с секретными данными очень сильно мешают использовать Airflow локально. Поэтому нам, кроме прода, нужен был отдельный стенд разработки. Мы его сделали в изолированном дев-контуре. Кроме того, мы нарезали git-репозитории для DAG’ов в каждой команде(отдельный для dev и для prod). Мы точно так же не хотели, чтобы команды могли смотреть/изменять/переиспользовать/ломать код друг друга.

Для деплоя мы сделали один большой репозиторий. Процесс доставки кода происходил следующим образом: аналитики пушили свои DАG’и в git-репозитории своих команд. Teamcity (наша основная CI/CD-система) видела это и запускала пайплайн, который обновлял git submodules (которыми были подключены эти репозитории) в основном репозитории, и уже этот репозиторий пулил git-sync в Airflow.

image
image

Мы запустились в этой конфигурации, и на начальном этапе всё было хорошо. А потом пришли проблемы роста.

Проблемы «коммунального» Airflow и их решение

image
image

Airflow control plane получился очень громоздким и высоконагруженным. Было реально страшно что-то делать, т. к. боялись уронить всю вот эту конструкцию.

Сложно переносить изменения. В каждой команде был репозиторий для дев, репозиторий для прод, и, поскольку это многопользовательские репозитории, история комитов в них состояла из fix-fix-fix, когда разработчики просто забрасывали туда какие-то маленькие изменения для тестирования. Собрать всё это воедино для переноса на продакшен часто было нетривиальной задачей. И у нас из-за этого начал накапливаться environment drift, когда дев-репозиторий сильно отличался от прод-репозитория.

Общая хрупкость этой схемы: git submodules — это очень хрупкий механизм, который легко ломается. Например, были случаи, когда разработчики в свои git-репозитории для DAG’ов пытались подключить какие-то другие сабмодули, и эти транзитивные сабмодули легко ломали основной репозиторий.

Кроме того, Airflow оказался тоже достаточно хрупкой системой, которую можно положить неправильно написанным кодом или каким-то левым файликом. Допустим, аналитики приносили в репозиторий pkl-файлы весов для моделей, какие-то excel-файлы как шаблоны для отправки по почте. И это ломало sheduler, и это было очень сложно отдебажить.

Airflow на команду

И мы решили: а давайте распилим этот монолит. То есть уйдём от концепции, когда у нас есть общий коммунальный Аirflow. Взамен мы сделаем выделенный экземпляр Airflow для каждой команды. Разработчикам не нужно будет прописывать неймспейс и права в DAG’ах, неправильным комитом они смогут сломать только свой экземпляр. В итоге каждый экземпляр по отдельности будет более лёгким и стабильным.

image
image

У нас были следующие задачи по распилу монолита:

1. Общее хранилище логов и данных. В парадигме коммунального Airflow у нас был общий NFS-волюм, который был подключён на все ноды. Мы решили, что не будем изобретать велосипед, пошли в сторону выделения своего волюма для каждой команды и решили сделать это через механизм CSI-плагинов.

image
image

Мы нашли Kubernetes NFS Subdir External Provisioner, который из одного NFS-диска, автоматически нарезая в нём папочки, может создавать и монтировать PV в pod’ы. В дальнейшем нам это очень сильно помогло. Сейчас мы делаем новую платформу данных, в которой сетевые диски будут на базе longhorn. И переписывать код Helm Chart’а с таким подходом нам не пришлось.

2. Доставка кода DAG’ов. Git-репозиторий в прошлой парадигме, когда в каждом pod’е есть свой git-sync, стал единой точкой отказа. Если git лежит, то DAG’и не запускаются. Поэтому мы решили использовать существующий общий NFS: сделали один выделенный git-sync, который скачивал бы туда код DAG’ов в определённую папку, куда смотрели бы все запускающиеся pod’ы.

3. Универсальный Helm Chart. Нам нужно было переписать Helm Chart, который деплоил наш коммунальный Airflow на создание выделенного экземпляра на команду. В этом плане нам очень сильно помог подход с созданием собственного чарта на основе yaml-файлов, сгенерированных из официального: не нужно было бороться с кучей лишнего кода, покрывающего ненужный нам функционал.

4. Типовой деплой.

image
image

У нас в качестве CI-системы — TeamCity, и мы используем её в парадигме экранных форм, то есть «накликиваем» наши пайплайны.

image
image

Нам хотелось бы не «накликивать» их каждый раз, а сделать какую-то единую точку, из которой мы могли бы унифицированно создавать и управлять изменениями во всех пайплайнах. В TeamCity есть такой механизм, как build configuration template.

image
image

Это шаблон, создав пайплайн по которому мы можем практически только переопределить имя инстанса, который хотим задеплоить в этом пайплайне, а всё остальное сделается «магией» Хелма.

image
image

Проблемы схемы Airflow на команду

Для разработки мы сделали схему, когда у нас один репозиторий: в dev у нас идёт ветка develop, а в prod идёт main.

image
image

На самом деле это несильно решило проблему истории комитов, потому что всё равно при многопользовательской разработке история в ветке develop — это фикс-фикс-фикс. Сложно переносить, накапливается environment drift. В итоге у нас возникла следующая идея: а что если нам устанавливать Airflow не на команду, а на задачу?

Согласно нашей задумке разработчик получает задачу в Jira запилить какую-то фичу, форкает репозитории с DAG’ами в фича-ветку, и на эту фича-ветку поднимается свой экземпляр Airflow. Любой другой разработчик со своим таском форкается, поднимается другой экземпляр, таким образом они друг другу не мешают.

Они смогут спокойно всё отладить, сделать внятный pull-реквест, отослать его тимлиду на ревью. А когда тимлид примет PR, мы просто сносим экземпляр, соответствующий задаче, которая сделана, и на освободившихся ресурсах поднимается экземпляр, от которого зависит уже последующая задача.

Реализация схемы Airflow на задачу

Прежде всего нужно было сделать выкат feature-окружений, и на удивление изменений для этого случилось достаточно мало. Все нужные переменные уже содержатся в свойствах пайплайна. Фактически нужно просто определить веточку, куда форкнулся наш разработчик, на основе её имени сформировать namespace, куда мы деплоим наш экземпляр, и в git-sync поправить веточку, с которой он будет брать изменения.

image
image

Следующий момент — создание connections и variables. Поскольку мы распилили наш Airflow, проблема использования connections соседними командами ушла. Connections — это очень удобный вариант для разработчиков. Поэтому мы сделали следующим образом: connections и variables в Airflow очень легко экспортируются

image
image

Мы просто сделали какой-то эталонный файлик с connections, который зашифровали, положили в гит и накатываем в пайплайне при создании нового окружения.

И последнее — удаление окружения после мержа. У нас для хранения кода используется Bitbucket. К сожалению TeamCity не может понимать, что ветка в Bitbucket удалена или пул-реквест смержен. После вдумчивого чтения документации в Bitbucket был найден механизм вебхуков.

image
image

То есть при появлении нужного события Bitbucket сам может триггернуть наш TeamCity, чтобы она что-то сделала. В TeamCity приходит сообщение в формате json, которое достаточно просто распарсить, посмотреть имя ветки, действия, которое над ней совершается, и если это удаление, то просто снести наше окружение, которое создано по этой ветке. Достаточно тривиальная операция.

Выводы

Прежде всего мы сделали правильно, пойдя с Airflow в Kubernеtes. То есть «из коробки» мы уже получили все преимущества по управлению инфраструктурой, которую даёт эта платформа.

На основе механизмов нативной интеграции Airflow с Kubernеtes и на основе внутренних механизмов Airflow мы сделали достаточно хорошую коммунальную платформу для наших разработчиков, которая в принципе удовлетворяла наши потребности, но была недостаточно гибкой и предполагала следование определённым стандартам при написании DAG’ов, которые было сложно контролировать (человеческий фактор).

Но, спустившись чуть ниже — на уровень платформы Kubernetes, на уровень DevOps-инструментов и тулинга вокруг экосистемы Kubernetes, — мы смогли создать гораздо более удобную, гибкую и безопасную платформу для наших аналитиков.

image
image

Комментарии (3)


  1. evgeny_vasyuk
    16.08.2024 12:02

    Здравствуйте,
    Сспасибо за статью. Я не совсем понял как вы решаете вопрос установки новых Python модулей. Через обновление имеджей или нашли более удобный способ?


    1. seasadm Автор
      16.08.2024 12:02

      Самый правильный способ обновлять модули это обновление имиджей. К сожалению, так получается не всегда и некоторые слишком большие библиотеки типа хадуповских парселей мы всё-таки располагаем на нодах и монтируем по hostpath.


  1. Loggus66
    16.08.2024 12:02
    +1

    Вы не рассказали про опыт. Кроме одного патча, который вы подменяете в Airflow, всё стандартно. Как насчёт проблем?

    1) Поды падают во время выполнения, перерабатывая текст в csv мегов на 20 (OOMKilled). Разработчик клянётся, что там нечему жрать память. Поднимаю память, запуская процесс снова и снова. На трёх гигах ОЗУ Pandas наконец-то начинает шевелиться и отрабатывает. Наказаний за такой жрущий код нет, подразумевается "если такой умный - покажи, как надо". Хотя перерабатывается в CSV по сути pg dump, надо всего лишь заглянуть в доку и написать copy to stdout format CSV, например.

    2) Некоторые операторы - самописные и декларация ресурсов в DAG не делает ничего, внутри них просто нет кода для обработки этого, но при этом внутри шага с оператором запускается тот же Pandas. Стискиваю зубы и поднимаю в worker template лимиты до 2ГБ вообще для всех.

    3) Поды начинают падать до начала выполнения, но пишут лог в s3. Оказывается, запускать сразу много подов в полночь чревато, они начинают толпиться и некоторые умирают в давке, потому что узлы не могут сразу принять все поды, когда они требуют по 2ГБ на каждую задачу внутри DAG. Поднимаю worker_pods_pending_timeout до трёх часов, чтобы когда-нибудь да выполнились.

    4) Поды вроде бы падают неизвестно когда, но ничего не пишут в s3. Airflow отображает "falling back to local log" и 404 - пода уже нет. О, это самый интересный случай, нетривиальный.

    Стал собирать вывод подов в лог каждую минуту, посмотреть на статус. OutOfMemory. Не OOMKilled, а OutOfMemory. Оказывается, ошибка характерна для k8s 1.22, и это наша версия, обновить нельзя, legacy на последнем издыхании, съезжать тоже - свои причины. За неделю удалось уговорить проверить идею не запускать все поды в одну секунду. Помогло.

    5) Поды падают с ошибкой upstream_failed. Оказалось, что если родительская задача падает, то в k8s executor следующая задача прекращает свою работу, когда срабатывает worker_pods_pending_timeout для неё, потому что момент начала работы (видимо) начинается с ожидания успешного выполнения родительской задачи.

    https://github.com/apache/airflow/discussions/18395

    https://stackoverflow.com/questions/73852012/airflow-task-improperly-has-an-upstream-failed-status-after-previous-task-succ

    Но тут я не уверен, самый неисследованный случай, который просто перестал происходить, когда разобрались с историями 3 и 4.

    Съел я на нём уже пуд соли, очень хочу обратно на Celery Executor на ВМ. Там и памяти завались, и процессы запускаются без контейнеров и вышеперечисленных органичений. А на бумаге хорошо, да.