Всем привет! Я недавно выступал с докладом на VK Kubernetes Conf 2024 про нашу историю изменения подходов к эксплуатации Kubernetes Airflow и хочу поделиться им с сообществом.
Что такое airflow
Apache Airflow — это один из самых популярных ETL-шедулеров. ETL-процессы — это когда мы выбираем интересующие нас данные, приводим их к агрегированному виду и сохраняем для дальнейшего использования. И это присутствует везде, где есть необходимость анализа данных. Соответственно, Airflow предназначен для того, чтобы запускать пайплайны обработки данных.
И в рамках Airflow эти пайплайны называются DAG — Directed Acyclic Graph (направленные циклические графы), где узлы графов — это задачи по обработке данных (task), а связи — это условия перехода.
И под капотом там — Python. То есть Airflow — это фактически специализированный ETL-фреймворк с возможностью позапускать свои программки по расписанию.
Классическая архитектура
В принципе у Airflow есть очень много вариантов запуска. Вы, например, при помощи local executor можете запустить Airflow в тетрадке jypyter notebook.
Но когда мы говорим про действительно большие, многонодовые инсталляции, предназначенные для перемалывания большого количества данных, то в классическом варианте с выделенными серверами используется celery executor.
В Airflow — два основных компонента: веб-сервер для обеспечения пользовательского интерфейса и шедулер для запуска наших пайплайнов. Кстати, шедулеров может быть много, они очень легко скейлятся.
Далее: у нас есть воркеры, мы расставляем их по нашим вычислительным нодам, именно они делают всю загрузку, всю обработку данных. И для того чтобы эти компоненты общались друг с другом, нам нужен PostgreSQL.
Следующий момент: Celery — это высокопроизводительная очередь на Python, и для её работы нужен Redis. Кроме того, опционально мы можем поднять Celery UI, чтобы контролировать заполнение очереди. Scheduler помещает туда задание, воркеры их оттуда пулят, подхватывают, выполняют и отчитываются через PostgreSQL о том, что они сделали.
Поскольку у нас система распределённая, таски DAG’ов должны обмениваться друг с другом информацией и сохранять логи для их демонстрации в веб-интерфейсе. Для этого мы должны обеспечить общее сетевое хранилище (например, на базе NFS).
Поскольку Airflow — это Python, нам часто требуется устанавливать дополнительные библиотеки, набор которых должен быть синхронизирован на всех нодах нашей инсталляции.
Далее мы должны разложить код наших DAG’ов по нодам и можем запускаться.
Проблемы Airflow bare metal celery executor
Какие проблемы у этой схемы?
Поддержка среды: в baremetall-серверах configuration drift, кажется, начинает накапливаться уже на этапе инсталляции сервера в стойку. То есть поддерживать версии библиотек бывает очень непросто, особенно если не сформировалась культура этой поддержки.
Обновления: эту схему без простоя обновить практически нереально. Нет единого стандарта доставки кода. Я видел статейку на медиуме, где аналитики сбрасывали на FTP’шку код DAG’ов, а оттуда rsync’ом оно разливалось по всем нодам. Гигантский простор для костыльного велосипедостроения.
Изоляция DAG’ов. Любой таск любого DAG’а можно запустить на любой ноде, и они могут, например, помешать друг другу, взять какие-то данные друг от друга, которые могут быть секретными.
Управление ресурсами. В рамках этой схемы мы не можем ограничить ресурсы у какого-то таска, какого-то DAG’а, он может съесть ресурсы всей ноды. Плюс какой-то DAG может наплодить тысячу и один таск и поставить нашу систему в нерабочее состояние.
Мониторинг. В рамках этой схемы мы фактически можем следить только за длительностью выполнения task’ов и за длительностью выполнения нашего DAG’а. Заглянуть внутрь, определить, сколько ресурсов кто ест, очень и очень сложно.
Проблемы Airflow bare metal celery executor
Мы решили поступить другим образом: развернуть Airflow в Kubernetes. Причём у Airflow есть kubernetes executor — нативная интеграция с этой платформой «из коробки».
Мы спроектировали нашу систему таким образом: в Kubernetes установили Airflow control plane и нарезали неймспейсы для наших команд. Мы изначально пошли по пути создания multitenant-системы — одного большого коммунального Airflow.
По нашей задумке DAG’и команд запускаются в выделенных namespace’ах и таким образом изолированы друг от друга (на каждый таск DAG’а запускается свой под).
Особенности процессов в банке
Прежде чем обсуждать возникшие перед нами задачи, я хотел бы дать немного контекста внутренних процессов банка. Мы банк, и мы работаем с такими данными, которые подпадают под множество законодательных актов. В общем случае потерять данные для нас предпочтительнее, чем их скомпрометировать.
Поэтому у нас есть несколько строго изолированных друг от друга контуров для разработки и эксплуатации. И мы можем передавать артефакты строго однонаправленно между этими контурами.
Причём процедура передачи сопровождается набором мероприятий, которые называются приёмо-сдаточные испытания, в ходе которых мы подтверждаем, что наше решение действительно соответствует стандартам банка. Эта схема повлияла на то, что мы делали.
Задачи внедрения Airflow kubernetes executor
Установка Airflow
Мы пошли в Интернет и нашли два самых популярных Helm Chart: чат от разработчиков Airflow и community-чарт. И, заглянув внутрь, мы нашли там всё то, за что мы так не любим Helm. Куча какого-то спагетти-yaml-тимплейтинга.
Мы пошли по следующему пути: на основе официального чарта сгенерировали yaml под наш случай и на их основе по-быстрому сделали свой чарт, который подходил конкретно под наши условия. Это подход, который в будущем сэкономил нам очень много сил и очень много времени.
Разделение ресурсов
Здесь мы нашли несколько возможностей:
1. Template pod’ов. То есть в kubernetes executor есть шаблон, на основе которого Airflow создаёт поды task’ов, и в нём мы указали реквесты и лимиты по умолчанию.
По нашей задумке разработчики в своих DAG’aх могли бы переопределять это на уровне кода, если потребуется.
И следующий момент. Поскольку мы делали multitennant-систему, то развели наши команды по разным namespace’ам и описали в них по два объекта: resourceQuota, чтобы ограничить общее потребление ресурсов какой-то конкретной командой, и LimitRange, который не позволил бы разработчикам сбросить реквесты и лимиты или выставить их в рискованные значения.
К сожалению, эта схема имела свой сайд-эффект: разработчики должны явно указывать namespace своей команды в коде DAG’а. Эту вещь мы хотели контролировать на этапе приёмо-сдаточных испытаний.
Мы собрали нашу схему, задеплоились, и она не заработала. Мы пошли внутрь кода Airflow, благо это опенсорс, и увидели там вот такую строчку.
То есть изменения, которые мы передаём в DAG’ах, имеют более низкий приоритет, чем остальные, поэтому изменения в коде DAG’ов не работали. Мы пропачили эту строчку, отправили PR в github Airflow, его там около двух лет не принимали, в итоге пару лет с каждой новой версией мы патчили этот файлик и заменяли его в базовом образе.
Мониторинг
Прекрасный мониторинг мы получили «из коробки»: на каждый таск каждого DAG’а запускается свой pod, рабочие параметры которого (потребление памяти и процессора, например) мы можем видеть на мониторинге без дополнительных усилий. Кроме того, мы можем делать разные интересные вещи типа подсадить APM внутрь контейнера и смотреть попроцедурно, где наши аналитики написали неоптимальный код.
Доставка кода DAG’ов
Код DAG’ов фактически должен находиться в каждом из наших контейнеров, и здесь было два пути. Первый путь — «запечь» в контейнеры наши DAG’и. Этот подход показался нам достаточно инертным и негибким.
Второй путь — доставить код в контейнеры отдельным процессом. В официальном Helm-чарте для этих целей используется git-sync, который периодически может скачивать с git’а код и отдавать его в поды Airflow.
git-sync может работать в двух вариантах: для долгоживущих подов по Airflow Scheduler может работать sidecar-контейнером, для подов тасков DAG’ов он может работать init-контейнером и запускаться перед тем, когда стартует основной процесс.
Общее хранилище данных
Как говорилось выше, для совместной работы нужен сетевой диск, который должен быть смонтирован в каждый pod.
Мы решили не мудрствовать лукаво, а пошли и заказали в нашей корпоративной хранилке NFS-диск, смонтировали его на каждую ноду и прописали в pod’ах hostPath.
Обеспечение безопасности
Airflow работает неавтономно, он работает с данными, и источники этих данных — фактически всё, что у вас есть в рамках вашего предприятия. Для того же, чтобы иметь туда доступ, нам нужны учётные данные.
В Airflow есть механизм по умолчанию, который называется connections.
В веб-интерфейсе Airflow мы можем заводить эти учётные данные, причём это сделано достаточно защищённо: в системе используется «Femet key», который есть только в рантайме. Он шифрует данные перед тем, как они попадают в базу.
Но этот ключ — общий на экземпляр, а у нас команды изолированы, и мы не хотели, чтобы какие-то соседние команды использовали секреты друг друга. Защита, построенная на том, что кто-то просто не знает имя connection’а, — это плохая защита. Поэтому мы решили поступить следующим образом: мы создали Kubernetes-secret’ы в каждом неймспейсе у каждой команды, и разработчики DAG’ов могут монтировать их в коде DAG’а, если возникнет такая необходимость.
Разделение прав пользователей
Airflow легко интегрируется с LDAP/Active Directory. Но в Airflow — своя ролевая модель. Есть способ сделать «mapping» групп active derectory в группы Airflow, которые мы нарезали для команд.
Но здесь опять вышел побочный эффект: разработчики должны были прописывать права на DАG’и в коде для своей команды.
Это мы тоже собирались контролировать на уровне приёмо-сдаточных испытаний.
Организация разработки
Я думаю, понятно, что особенности работы kubernetes executor и подход к работе с секретными данными очень сильно мешают использовать Airflow локально. Поэтому нам, кроме прода, нужен был отдельный стенд разработки. Мы его сделали в изолированном дев-контуре. Кроме того, мы нарезали git-репозитории для DAG’ов в каждой команде(отдельный для dev и для prod). Мы точно так же не хотели, чтобы команды могли смотреть/изменять/переиспользовать/ломать код друг друга.
Для деплоя мы сделали один большой репозиторий. Процесс доставки кода происходил следующим образом: аналитики пушили свои DАG’и в git-репозитории своих команд. Teamcity (наша основная CI/CD-система) видела это и запускала пайплайн, который обновлял git submodules (которыми были подключены эти репозитории) в основном репозитории, и уже этот репозиторий пулил git-sync в Airflow.
Мы запустились в этой конфигурации, и на начальном этапе всё было хорошо. А потом пришли проблемы роста.
Проблемы «коммунального» Airflow и их решение
Airflow control plane получился очень громоздким и высоконагруженным. Было реально страшно что-то делать, т. к. боялись уронить всю вот эту конструкцию.
Сложно переносить изменения. В каждой команде был репозиторий для дев, репозиторий для прод, и, поскольку это многопользовательские репозитории, история комитов в них состояла из fix-fix-fix, когда разработчики просто забрасывали туда какие-то маленькие изменения для тестирования. Собрать всё это воедино для переноса на продакшен часто было нетривиальной задачей. И у нас из-за этого начал накапливаться environment drift, когда дев-репозиторий сильно отличался от прод-репозитория.
Общая хрупкость этой схемы: git submodules — это очень хрупкий механизм, который легко ломается. Например, были случаи, когда разработчики в свои git-репозитории для DAG’ов пытались подключить какие-то другие сабмодули, и эти транзитивные сабмодули легко ломали основной репозиторий.
Кроме того, Airflow оказался тоже достаточно хрупкой системой, которую можно положить неправильно написанным кодом или каким-то левым файликом. Допустим, аналитики приносили в репозиторий pkl-файлы весов для моделей, какие-то excel-файлы как шаблоны для отправки по почте. И это ломало sheduler, и это было очень сложно отдебажить.
Airflow на команду
И мы решили: а давайте распилим этот монолит. То есть уйдём от концепции, когда у нас есть общий коммунальный Аirflow. Взамен мы сделаем выделенный экземпляр Airflow для каждой команды. Разработчикам не нужно будет прописывать неймспейс и права в DAG’ах, неправильным комитом они смогут сломать только свой экземпляр. В итоге каждый экземпляр по отдельности будет более лёгким и стабильным.
У нас были следующие задачи по распилу монолита:
1. Общее хранилище логов и данных. В парадигме коммунального Airflow у нас был общий NFS-волюм, который был подключён на все ноды. Мы решили, что не будем изобретать велосипед, пошли в сторону выделения своего волюма для каждой команды и решили сделать это через механизм CSI-плагинов.
Мы нашли Kubernetes NFS Subdir External Provisioner, который из одного NFS-диска, автоматически нарезая в нём папочки, может создавать и монтировать PV в pod’ы. В дальнейшем нам это очень сильно помогло. Сейчас мы делаем новую платформу данных, в которой сетевые диски будут на базе longhorn. И переписывать код Helm Chart’а с таким подходом нам не пришлось.
2. Доставка кода DAG’ов. Git-репозиторий в прошлой парадигме, когда в каждом pod’е есть свой git-sync, стал единой точкой отказа. Если git лежит, то DAG’и не запускаются. Поэтому мы решили использовать существующий общий NFS: сделали один выделенный git-sync, который скачивал бы туда код DAG’ов в определённую папку, куда смотрели бы все запускающиеся pod’ы.
3. Универсальный Helm Chart. Нам нужно было переписать Helm Chart, который деплоил наш коммунальный Airflow на создание выделенного экземпляра на команду. В этом плане нам очень сильно помог подход с созданием собственного чарта на основе yaml-файлов, сгенерированных из официального: не нужно было бороться с кучей лишнего кода, покрывающего ненужный нам функционал.
4. Типовой деплой.
У нас в качестве CI-системы — TeamCity, и мы используем её в парадигме экранных форм, то есть «накликиваем» наши пайплайны.
Нам хотелось бы не «накликивать» их каждый раз, а сделать какую-то единую точку, из которой мы могли бы унифицированно создавать и управлять изменениями во всех пайплайнах. В TeamCity есть такой механизм, как build configuration template.
Это шаблон, создав пайплайн по которому мы можем практически только переопределить имя инстанса, который хотим задеплоить в этом пайплайне, а всё остальное сделается «магией» Хелма.
Проблемы схемы Airflow на команду
Для разработки мы сделали схему, когда у нас один репозиторий: в dev у нас идёт ветка develop, а в prod идёт main.
На самом деле это несильно решило проблему истории комитов, потому что всё равно при многопользовательской разработке история в ветке develop — это фикс-фикс-фикс. Сложно переносить, накапливается environment drift. В итоге у нас возникла следующая идея: а что если нам устанавливать Airflow не на команду, а на задачу?
Согласно нашей задумке разработчик получает задачу в Jira запилить какую-то фичу, форкает репозитории с DAG’ами в фича-ветку, и на эту фича-ветку поднимается свой экземпляр Airflow. Любой другой разработчик со своим таском форкается, поднимается другой экземпляр, таким образом они друг другу не мешают.
Они смогут спокойно всё отладить, сделать внятный pull-реквест, отослать его тимлиду на ревью. А когда тимлид примет PR, мы просто сносим экземпляр, соответствующий задаче, которая сделана, и на освободившихся ресурсах поднимается экземпляр, от которого зависит уже последующая задача.
Реализация схемы Airflow на задачу
Прежде всего нужно было сделать выкат feature-окружений, и на удивление изменений для этого случилось достаточно мало. Все нужные переменные уже содержатся в свойствах пайплайна. Фактически нужно просто определить веточку, куда форкнулся наш разработчик, на основе её имени сформировать namespace, куда мы деплоим наш экземпляр, и в git-sync поправить веточку, с которой он будет брать изменения.
Следующий момент — создание connections и variables. Поскольку мы распилили наш Airflow, проблема использования connections соседними командами ушла. Connections — это очень удобный вариант для разработчиков. Поэтому мы сделали следующим образом: connections и variables в Airflow очень легко экспортируются
Мы просто сделали какой-то эталонный файлик с connections, который зашифровали, положили в гит и накатываем в пайплайне при создании нового окружения.
И последнее — удаление окружения после мержа. У нас для хранения кода используется Bitbucket. К сожалению TeamCity не может понимать, что ветка в Bitbucket удалена или пул-реквест смержен. После вдумчивого чтения документации в Bitbucket был найден механизм вебхуков.
То есть при появлении нужного события Bitbucket сам может триггернуть наш TeamCity, чтобы она что-то сделала. В TeamCity приходит сообщение в формате json, которое достаточно просто распарсить, посмотреть имя ветки, действия, которое над ней совершается, и если это удаление, то просто снести наше окружение, которое создано по этой ветке. Достаточно тривиальная операция.
Выводы
Прежде всего мы сделали правильно, пойдя с Airflow в Kubernеtes. То есть «из коробки» мы уже получили все преимущества по управлению инфраструктурой, которую даёт эта платформа.
На основе механизмов нативной интеграции Airflow с Kubernеtes и на основе внутренних механизмов Airflow мы сделали достаточно хорошую коммунальную платформу для наших разработчиков, которая в принципе удовлетворяла наши потребности, но была недостаточно гибкой и предполагала следование определённым стандартам при написании DAG’ов, которые было сложно контролировать (человеческий фактор).
Но, спустившись чуть ниже — на уровень платформы Kubernetes, на уровень DevOps-инструментов и тулинга вокруг экосистемы Kubernetes, — мы смогли создать гораздо более удобную, гибкую и безопасную платформу для наших аналитиков.
Комментарии (3)
Loggus66
16.08.2024 12:02+1Вы не рассказали про опыт. Кроме одного патча, который вы подменяете в Airflow, всё стандартно. Как насчёт проблем?
1) Поды падают во время выполнения, перерабатывая текст в csv мегов на 20 (OOMKilled). Разработчик клянётся, что там нечему жрать память. Поднимаю память, запуская процесс снова и снова. На трёх гигах ОЗУ Pandas наконец-то начинает шевелиться и отрабатывает. Наказаний за такой жрущий код нет, подразумевается "если такой умный - покажи, как надо". Хотя перерабатывается в CSV по сути pg dump, надо всего лишь заглянуть в доку и написать copy to stdout format CSV, например.
2) Некоторые операторы - самописные и декларация ресурсов в DAG не делает ничего, внутри них просто нет кода для обработки этого, но при этом внутри шага с оператором запускается тот же Pandas. Стискиваю зубы и поднимаю в worker template лимиты до 2ГБ вообще для всех.
3) Поды начинают падать до начала выполнения, но пишут лог в s3. Оказывается, запускать сразу много подов в полночь чревато, они начинают толпиться и некоторые умирают в давке, потому что узлы не могут сразу принять все поды, когда они требуют по 2ГБ на каждую задачу внутри DAG. Поднимаю worker_pods_pending_timeout до трёх часов, чтобы когда-нибудь да выполнились.
4) Поды вроде бы падают неизвестно когда, но ничего не пишут в s3. Airflow отображает "falling back to local log" и 404 - пода уже нет. О, это самый интересный случай, нетривиальный.
Стал собирать вывод подов в лог каждую минуту, посмотреть на статус. OutOfMemory. Не OOMKilled, а OutOfMemory. Оказывается, ошибка характерна для k8s 1.22, и это наша версия, обновить нельзя, legacy на последнем издыхании, съезжать тоже - свои причины. За неделю удалось уговорить проверить идею не запускать все поды в одну секунду. Помогло.
5) Поды падают с ошибкой upstream_failed. Оказалось, что если родительская задача падает, то в k8s executor следующая задача прекращает свою работу, когда срабатывает worker_pods_pending_timeout для неё, потому что момент начала работы (видимо) начинается с ожидания успешного выполнения родительской задачи.
https://github.com/apache/airflow/discussions/18395
https://stackoverflow.com/questions/73852012/airflow-task-improperly-has-an-upstream-failed-status-after-previous-task-succ
Но тут я не уверен, самый неисследованный случай, который просто перестал происходить, когда разобрались с историями 3 и 4.
Съел я на нём уже пуд соли, очень хочу обратно на Celery Executor на ВМ. Там и памяти завались, и процессы запускаются без контейнеров и вышеперечисленных органичений. А на бумаге хорошо, да.
evgeny_vasyuk
Здравствуйте,
Сспасибо за статью. Я не совсем понял как вы решаете вопрос установки новых Python модулей. Через обновление имеджей или нашли более удобный способ?
seasadm Автор
Самый правильный способ обновлять модули это обновление имиджей. К сожалению, так получается не всегда и некоторые слишком большие библиотеки типа хадуповских парселей мы всё-таки располагаем на нодах и монтируем по hostpath.