Всем привет, на связи IT-сообщество Газпромбанка, и меня зовут Павел. Я, у нас в банке, занимаюсь разработкой на языке программирования Python, и уже больше года, создаю загрузчики внешних данных для нашей DataFactory – внутрибанковской платформы больших данных.

DataFactory это большое количество разнообразных данных, которые использует бизнес чтобы делать всем хорошо. А мы, нашим департаментом, отвечаем за их своевременное наполнение, поддержание в жизнеспособном состоянии, доведение данных до требуемого состояния и регулярную полноценную доступность.

До того, как я пришел в Газпромбанк, доставка данных осуществлялась полноценным зоопарком разномастных загрузчиков. Часть написана на Java, часть на голом Python. Поддерживать и развивать такой зоо-загрузчико-парк становилось всё сложнее и руководство устремило свой взгляд в сторону Apache Airflow. В итоге, мы засучили рукава, и как Урфин Джюс боролся с сорняками, приступили к построению новой реальности…

Теперь все новые загрузки мы пишем на Python под Airflow. В данный момент используем версию 2.6.1. Мы следим за выходами новых версий Airflow и стараемся регулярно обновлять свои стенды.

Наши загрузчики, как пронырливые ласки, должны уметь лазать в разные норы и доставать, и доставлять, необходимое. Мы ходим за готовыми файлами, по необходимости, извлекаем файлы из любых архивов, мы забираемся в почтовые ящики и выковыриваем файлы из вложений писем, берем файлы из SAMBA-папок. Ходим в REST API за полезными JSON и XML. Морально готовы при необходимости пойти/полезть в общем-то куда угодно… Конечно туда, куда нам разрешит наш Департамент информационной безопасности.

Загрузка данных через Apache Airflow DAG

Основная структурная единица в Airflow это DAG – направленный ацикличный граф, который позволяет собирать отдельные задачи в полноценные процессы. Задачи можно выполнять последовательно, устанавливая разные правила перехода между ними. Можно запускать задачи параллельно, особенно удобно делать это внутри кластера Kubernetes, когда каждая параллельная задача оформляется в виде отдельного k8s PODа. К слову, и сам Airflow, у нас крутится тоже внутри кубер-подов.

Каждая загрузка, в общем-то, уникальна, но есть общие правила. Например, каждой из них необходим набор конфигурационных параметров. Такие параметры мы храним в Airflow Variables – эта структура позволяет сохранить произвольный JSON и получить к нему доступ, как к python-словарю внутри любой задачи работающей загрузки. Название Airflow Variables, в нашем случае, совпадает с DAG_ID – так мы фиксируем соответствие конфигурации DAGу.

Удобство Airflow Variables состоит в том, что интерфейс Airflow имеет встроенный редактор для таких JSON. Неудобство же, в том, что чтобы изменить какой-то параметр, с целью специфического запуска загрузки, приходится править JSON – но главное, не забыть все потом вернуть назад. А еще, такое полученное значение нужно отдельно обрабатывать внутри кода задач(-и) DAGa. Мы так не делаем.

Если единожды требуется запустить DAG со специфическими параметрами, то в Airflow предусмотрена возможность «Запуск DAGа с параметрами». По сути, мы получаем возможность, передать такой же JSON при отдельном ручном запуске. В ходе работы DAGа, каждая его задача, может получить доступ к этим значениям как к словарю и корректировать своё поведение.

Выглядит этот так:

Откроется новый экран, в котором

Практично, удобно, хорошо… Но как и во всём – есть нюанс…

По сути, словарь полученный при ручном запуске DAGа и словарь полученный из соответствующей Airflow Variable это какие-то настройки нашего DAGa – по крайней мере, мы их используем для этих целей.

Словарь из Airflow Variable содержит какие-то постоянные значения, которые, как правило, один раз бетонируются, и потом, меняются довольно редко. Своего рода setup.ini для реализованного процесса. Словарь из ручного запуска используется несколько иначе. Это возможность оперативного вмешательства в работу процесса. Например, большинство загрузок данных выполняются регулярно, как правило, раз в сутки в своё время. При запуске, загрузка обращается источнику данных, и поскольку её режим работы каждодневный, берёт данные, например, за вчера.

В ходе эксплуатации, особенно на этапе запуска, а особенно отладки, возникает необходимость скорректировать глубину захвата данных. Изменить «вчера» на «позавчера» или на «10 дней от сегодня».

Вариантов несколько, самый жуткий, это лезть в хард-код и хард-кодить потея пальцами. Можно вынести нужный параметр в конфигурационную Airflow Variable, а потом не забыть убрать из конфига, и может и из кода. А если такой параметр не предусмотрен внутри Airflow Variable, то для его обработки еще придется дописать код. Мы примерно так и делали, до одного момента…

date_from, date_to = vars_from(now, None)

Когда вот это вот всё стало удручать своей безысходностью, запустилось фоновое размышление, которое в итоге выдвинуло такую гипотезу. Вот бы, было бы, хорошо уметь менять значения переменных внутри работающего кода простым способом.

Значение переменной внутри блока кода должно быть назначено по следующему сценарию: сначала посмотреть, а нет ли переменной с таким именем внутри словаря из ручного запуска DAGa – если есть, взять значение, если нет – то посмотреть, а нет ли переменной с таким именем внутри Airflow Variable – и снова по уже озвученному сценарию. И наконец, если нигде такой переменной не обнаружилось, то присвоить значение из заданных в коде параметров.

Сначала казалось, что этого не сделать без продажи душ и черной магии. Души удалось сохранить, но без магии не обошлось. В итоге появилась возможность использовать такие конструкции:

POLLS_COUNT_ON_PAGE: int = 50
POLLS_DATE_FROM_DEFAULT: str = "1d"

count_on_page, date_from = vars_from(POLLS_COUNT_ON_PAGE, POLLS_DATE_FROM_DEFAULT/

Работает это следующим образом, при вызове функции vars_from, сначала будет произведен поиск ключа “count_on_page” внутри словаря ручного запуска DAGa - если таковой имеется, затем наличие ключа будет просмотрено внутри Airflow Variable соответствующей DAGу - если таковая имеется. Что значит «ключа “count_on_page”» - названием ключа, в случае использования vars_from становится название переменной или переменных, которые расположены слева от её вызова.

В приведенном примере будет производится поиск 2х ключей, их названия будут соответственно “count_on_page” и “date_from”. Но это еще не всё. Мы должны иметь возможность задать значения переменных, в случае их отсутствия во всех просматриваемых сущностях. Для этого нам поможет *args функции vars_from. Именно в них, мы и задаём значения по умолчанию. В примере выше, этими значениями станут значения констант POLLS_COUNT_ON_PAGE и POLLS_DATE_FROM_DEFAULT соответственно.

Теперь мы готовы проследить весь процесс. Итак, при вызове vars_from из примера, произойдет следующее: ищем ключ “count_on_page” внутри словаря ручного запуска DAGa – ключ найден, его значение присваивается значению переменной “count_on_page” в коде. Переходим к проверкам следующих ключей, если они есть.

Если ключа не оказалось внутри словаря ручного запуска DAGa или словарь пуст (по умолчанию, если не задан при запуске DAGa), то проверка ключа производится для Airflow Variable соответствующей DAGу.

Если и тут его нет, или такой словарь пуст, то переменной присваивается значение из POLLS_COUNT_ON_PAGE – то есть, из значения по умолчанию для переменной count_on_page, которое задано при вызове функции.

Мы получаем простую возможность, влиять извне на работу задач DAGa, не меняя ничего в коде этих задач… По-моему, это гениально!.. :о) О, боги, я не хочу внезапно проснуться

Осталось добавить, что проверка вхождения ключей реализована 3 возможными вариантами. Первый, в строгом соответствии названию переменной. Второй, по её CamelCase интерпретации, и третий через псевдоним. Что касается CamelCase варианта и возможности добавить псевдоним, то это видимо желание «подстелить соломки» и продемонстрировать «что могу» :о) Я в обычной эксплуатации эти возможности не использую, но помню про них.

Хотим еще примеров!

Рассмотрим еще несколько примеров использования:

date_to = vars_from()

Или мы где-то найдем “date_to” (либо его CamelCase вариант “dateTo”) то присвоим его значение переменной, либо date_to станет… равен.. чему?.. Конечно же None – все молодцы, что догадались!

recipients = vars_from(recipients="sql_injection_report_mail")

Здесь пример с псевдонимом. Все примеры, кстати, привожу из реально действующих DAGов наших внешних загрузок. Здесь псевдоним маршрутизирует значение из ключа “sql_injection_report_mail” в переменную “recipients”. А вот более расширенный, и чуть посложнее, пример:

dag_run_conf = {"sqlInjectionReportMail": "blabla", "recipients": "123"}
dag_variable = {"sql_injection_report_mail": "gluk-gluk"}

recipients = vars_from(recipients="sql_injection_report_mail") # assert recipients == "gluk-gluk"

Значение, в итоге, было взято из Airflow Variable соответствующей DAGу, потому что snake_case имеет больший приоритет!

Как мы выяснили ранее, имеется определенный порядок, в соответствии с которым исследуются сущности на предмет наличия ключа. То есть имеет место, приоритет этих сущностей. Приоритет есть и у стиля написания ключа. Если в словаре ручного запуска DAGa или Airflow Variable соответствующей DAGу есть одинаковые значения, то... Рассмотрим пример:

dag_run_conf = {"dateFrom": "blabla"}
dag_variable = {"date_from": "gluk-gluk"}

В этом случае применится значение в snake_case(значение из dag_variable) поскольку, опять же, приоритет snake_case больше. Сейчас смотрю повторно на это всё и начинаю думать, что это какие-то лишние грабли. Если считаете так же, дайте знать в комментариях.

Вот пример чуть заковыристее, угнал его из docstring функции

dag_run_conf = {"dateFrom": "blabla", "date_to": 23}
dag_variable = {"date_from": "gluk-gluk", "b": 16, "date_to": 32}

# внутри другой функции, которая является Airflow Task
a = 13

date_from, date_to, a, b, c = vars_from()

assert date_from == "gluk-gluk" # взят из dag_variable, потомучто snake_case приоритетнее
assert date_to == 23 # потому что dag_run_conf приоритетнее
assert a == 13 # был взят из значения установленного внутри функции вызвавшей vars_from
assert b == 16
assert c is None # не задан в функции до вызова vars_from и не задан в dag_run_conf или dag_variable

«А у вас дым из под капота…»

В этом месте, пытливому читателю уже хочется сказать «парашют-ап и вот мои деньги!» или скорее «довольно болтовни – покажите код». Самая волнительная часть. Пришло время заявленной черной магии.

Да, без неё не обошлось, поэтому функция имеет ровно одну внешнюю зависимость. Для определения названий переменных, вызвавших функцию, используются возможности библиотеки sorcery, которая находится здесь.

Вся магия сводится к следующему участку кода.

frame_info, defaults = defaults[0], defaults[1:]
# кортеж названий переменных, которые стоят слева от вызова этой функции,
# например:
# a, b, c = vars_from() # assert varnames == ('a', 'b', 'c')
varnames: t.Tuple[str] = frame_info.assigned_names(allow_one=True)[0]
# локальные переменные функции вызвавшей эту - f_back 2 раза потому что обращаемся через декоратор
caller_locals: t.MutableMapping[str, t.Any] = inspect.currentframe().f_back.f_back.f_locals

Чтобы стало более понятно, что происходит в мэджик-коде приведу сигнатуру функции vars_from.

@spell
def vars_from(*defaults, **aliases):

Собственно это всё что в ней есть. Кортеж defaults это аналог *args только в другой руке, а декоратор spell помещает в нулевой элемент *defaults ссылку на frame-обьект функции vars_from.

Это самая остросюжетная часть функции vars_from, остальное обыденно и скучно. Текст функции привести полностью, к сожалению, не дозволяет наше ИЬ.

Желаю вам более эластичных DAGов Airflow. Спасибо за внимание!

Комментарии (1)


  1. Taragolis
    08.09.2023 20:59

    "Ничего не понятно, но очень интересно"

    Правильно ли я понял, что вы в условной первой таске в DAGe выполняете всю свою магию и потом передаете в остальные через XCom/XСomArg?

    Если единожды требуется запустить DAG со специфическими параметрами, то в Airflow предусмотрена возможность «Запуск DAGа с параметрами».

    Тогда Вас может неприятно удивить Airflow 2.7+