Предыдущие: ч.1 Основы и расписания, ч.2 Операторы и датчики ч.3 Проектирование DAG

Оригинальная статья

Обзор

Создание шаблонов - это мощная концепция в Airflow для передачи динамической информации в экземпляры задач во время выполнения. Например, предположим, что вы хотите выводить в терминал день недели каждый раз, когда выполняете задачу:

BashOperator(
    task_id="print_day_of_week",
    bash_command="echo Today is {{ execution_date.format('dddd') }}",
)

В этом примере значение в двойных фигурных скобках {{ }} - это наш templated (т.е. шаблонный) код, который будет вычислен во время выполнения. Если мы выполним этот код в среду, BashOperator выведет “Сегодня среда”. Создание шаблонов важно во многих случаях. Например, мы можем использовать шаблоны для создания нового каталога, названного по дате выполнения задачи, для хранения ежедневных данных (например, /data/path/20210824), или выбрать определенный раздел (например, /data/path/гггг =2021/мм =08/дд =24), чтобы читать соответствующие данные для каждой заданной даты.

Airflow использует в качестве движка шаблонов Jinja, - фреймворк шаблонов на Python, В этом руководстве мы рассмотрим, как применять шаблоны Jinja в вашем коде, в том числе:

  • Какие переменные и функции доступны при создании шаблонов

  • Какие поля оператора могут быть шаблонными, а какие нет

  • Как проверять (валидировать) шаблоны

  • Как применять пользовательские переменные и функции при создании шаблонов

  • Как преобразовать шаблоны в строки и в код Python

Переменные среды выполнения в Airflow

Создание шаблонов в Airflow работает точно так же, как создание шаблонов с помощью Jinja в Python: разместите свой код, подлежащий вычислению, между двойными фигурными скобками, и выражение будет вычислено во время выполнения. Как мы видели в предыдущем фрагменте кода, execution_date - это переменная, доступная во время выполнения.

Airflow включает в себя множество переменных, которые можно использовать для создания шаблонов. Вот некоторые из наиболее часто используемых:

  • execution_date - Дата и время (datetime) начала интервала запуска DAG

  • ds - execution_date в формате “2021-08-27”

  • ds_nodash - execution_date, отформатирована как “20210827”

  • next_ds - execution_date следующего выполнения (= конец текущего интервала) даты и времени datetime

Для получения полного списка всех доступных переменных обратитесь к документации Apache Air flow.

Поля и скрипты шаблонов

Шаблоны не могут быть применены ко всем аргументам оператора. Два атрибута в BaseOperator определяют ограничения на создание шаблонов:

  • template_fields: Определяет, какие поля являются шаблонными

  • template_ext: Определяет, какие расширения файлов могут быть шаблонизированы

Давайте рассмотрим простой вариант запуска BashOperator::

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')  # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env: None,
        output_encoding: 'utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

template_fields содержит список атрибутов, которые могут быть шаблонизированы. Вы также можете найти этот список в документации Airflow или в пользовательском интерфейсе Airflow, если у вас запущена задача, в разделе Instance Details —> template_fields::

template_ext содержит список расширений файлов, которые могут быть прочитаны и шаблонизированы во время выполнения. Например, вместо предоставления команды Bash для bash_command вы могли бы предоставить скрипт .sh::

run_this = BashOperator(
    task_id="run_this",
    bash_command="script.sh",  # .sh extension can be read and templated
)

Эта задача считывает содержимое script.sh , создает его шаблон и выполняет его:

# script.sh
echo "Today is {{ execution_date.format('dddd') }}"

Создание шаблонов из файлов упрощает разработку (особенно когда ваши скрипты становятся все больше), поскольку среда IDE может применять к скрипту подсветку синтаксиса, зависящую от конкретного языка. Это было бы невозможно, если бы ваш скрипт был определен как большая строка в коде Airflow.

По умолчанию Airflow выполняет поиск script.sh относительно каталога, в котором определен файл DAG. Если ваш DAG хранится в /path/to/dag.py , а ваш сценарий - в /path/to/scripts/script.sh , нужно установить значение bash_command равным scripts/script.sh .

При желании, дополнительными “путями поиска” можно управлять на уровне DAG с помощью аргумента template_searchpath:

with DAG(..., template_searchpath="/tmp") as dag:
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")

Код выше определяет, что вы можете сохранить свой скрипт Bash в /tmp, и Airflow найдет его.

Проверка (валидация) шаблонов

Выходные данные шаблонов можно проверить как в пользовательском интерфейсе Airflow, так и в CLI. Одним из преимуществ CLI является то, что вам не нужно запускать какие-либо задачи, прежде чем увидеть результат.

Команда Airflow CLI airflow tasks render отображает все шаблонные атрибуты данной задачи. Получая dag_id, task_id и условную execution_date,, команда выводит что-то вроде этого:

$ airflow tasks render example_dag run_this 2021-01-01

# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "Today is Friday"

# ----------------------------------------------------------
# property: env
# ----------------------------------------------------------
None

Чтобы эта команда сработала, Airflow должен иметь доступ к специальному хранилищу (metastore). Для достижения этой цели вы можете быстро настроить локальное хранилище на основе SQLite:

cd [your project dir]
export AIRFLOW_HOME=$(pwd)
airflow db init  # generates airflow.db, airflow.cfg, and webserver_config.py in your project dir

# airflow tasks render [dag_id] [task_id] [execution_date]

Для большинства шаблонов этого будет достаточно. Однако, если в логике создания шаблонов доступны какие-либо внешние системы (например, переменная в вашем хранилище), у вас должно быть подключение к этим системам.

В пользовательском интерфейсе Air flow вы можете просмотреть результат атрибутов шаблона после выполнения задачи. Нажмите в экземпляре задачи на кнопку "Rendered", чтобы увидеть результат:

При нажатии на эту кнопку отображается вид шаблона и выходные данные атрибутов шаблона:

Использование пользовательских функций и переменных в шаблонах

Как показано выше, у нас есть несколько переменных (например, execution_date и ds), доступных во время создания шаблонов. По разным причинам, например, с точки зрения безопасности, среда Jinja отличается от среды выполнения Airflow. Вы можете рассматривать среду Jinja как очень урезанную среду Python. Это, среди прочего, означает, что модули не могут быть импортированы. Например, этот код не работает в шаблоне Jinja:

from datetime import datetime

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ datetime.now() }}",  # raises jinja2.exceptions.UndefinedError: 'datetime' is undefined
)

Тем не менее, можно внедрить функции в вашу среду Jinja. В Airflow для создания шаблонов имеется несколько стандартных модулей Python под названием “макросы”. Например, приведенный выше ошибочный код можно исправить с помощью macros.datetime::

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}",  # It is currently 2021-08-30 13:51:55.820299
)

Для получения полного списка всех доступных макросов обратитесь к документации Apache Airflow.

Помимо предоставленных функций, вы также можете использовать самостоятельно определенные переменные и функции в своих шаблонах. Airflow обеспечивает удобный способ введения их в среду Jinja. Допустим, в нашем DAG мы хотим вывести количество дней с 1 мая 2015 года и написали для этого удобную функцию:

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days

Чтобы использовать это внутри шаблона Jinja, вы можете передать словарь в user_defined_macros в DAG:

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days


with DAG(
    dag_id="demo_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_macros={
        "starting_date": datetime(2015, 5, 1),  # Macro can be a variable
        "days_to_now": days_to_now,  # Macro can also be a function
    },
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}",  # Call user defined macros
    )
    # Days since 2015-05-01 00:00:00 is 2313

Также можно внедрять функции в качестве фильтров Jinja, используя user_defined_filters. Вы можете использовать фильтры в качестве операций конвейера (pipe). В следующем примере выполняется та же работа, что и в предыдущем примере, только на этот раз с использованием фильтров:

with DAG(
    dag_id="bash_script_template",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    user_defined_filters={"days_to_now": days_to_now},  # Set user_defined_filters to use function as pipe-operation
    user_defined_macros={"starting_date": datetime(2015, 5, 1)},
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",  # Pipe value to function
    )
    # Days since 2015-05-01 00:00:00 is 2313

Функции, внедряемые с помощью user_defined_filters и user_defined_macros доступны для использования в среде Jinja. Хотя они дают тот же результат, мы рекомендуем использовать фильтры, когда вам нужно импортировать несколько пользовательских функций, поскольку формат фильтра улучшает читаемость вашего кода. Вы можете убедиться в этом, сравнивая два метода:

"{{ name | striptags | title }}"  # chained filters are read naturally from left to right
"{{ title(striptags(name)) }}"  # multiple functions are more difficult to interpret because reading right to left

Рендеринг нативного кода Python

По умолчанию шаблоны Jinja всегда отображаются в виде строк Python. Это нормально почти во всех ситуациях в Airflow, но иногда желательно отображать шаблоны в нативном коде Python. Если код, который вы вызываете, не работает со строками, у вас появляются проблемы. Давайте рассмотрим пример:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total

sum_numbers(1, 2, 3)  # returns 6
sum_numbers("1", "2", "3")  # TypeError: unsupported operand type(s) for +=: 'int' and 'str'

Рассмотрим сценарий, в котором вы передаете список значений этой функции, вызывая DAG с конфигурацией, содержащей некоторые числа:

with DAG(dag_id="failing_template", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

Мы хотим запустить DAG со следующим JSON для конфигурации запуска DAG:

{"numbers": [1,2,3]}

Отображаемое значение будет представлять собой строку. Поскольку функция sum_numbers распаковывает заданную строку, она в конечном итоге пытается сложить каждый символ в строке:

('[', '1', ',', ' ', '2', ',', ' ', '3', ']')

Это не сработает, поэтому мы должны сказать Jinja вернуть собственно список Python вместо строки. Jinja поддерживает это с помощью окружения. Среда Jinja по умолчанию выводит строки, но мы можем настроить нативную среду, которая отображает шаблоны как нативный код Python.

Поддержка Jinja NativeEnvironment была добавлена в Airflow 2.1.0 с помощью аргумента render_template_as_native_obj в классе DAG. Этот аргумент принимает логическое значение, которое определяет, следует ли отображать шаблоны в среде Jinja по умолчанию или NativeEnvironment. Например:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


with DAG(
    dag_id="native_templating",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # Render templates using Jinja NativeEnvironment
) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

Передача той же конфигурации JSON {"numbers": [1,2,3]} теперь отображает список целых чисел, которые функция sum_numbers обрабатывает правильно:

[2021-08-26 11:53:12,872] {python.py:151} INFO - Done. Returned value was: 6

В заключение отметим, что среда Jinja должна быть настроена на уровне DAG. Это означает, что все задачи в DAG визуализируются либо с использованием среды Jinja по умолчанию, либо с использованием NativeEnvironment.

В завершение

В итоге, вы можете использовать шаблоны для придания вашим задачам определенного поведения путем вычисления шаблонных выражений во время выполнения. Есть несколько моментов, которые следует помнить при создании шаблонов в Airflow:

Airflow предоставляет несколько удобных настроек для Jinja, движка шаблонов Python
Во время выполнения доступно несколько переменных, которые можно использовать при создании шаблонов (см. документацию)

  • template_fields и template_ext определяют, что является шаблонами для оператора

  • В среду Jinja внедрено несколько библиотек Python (см. документацию).

  • Пользовательские переменные и функции могут быть введены с помощью user_defined_filters и user_defined_macros

  • По умолчанию шаблон Jinja возвращает строки, но нативный код Python можно вернуть, установив render_template_as_native_obj=True в вашем DAG

  • Для получения информации обо всех функциях создания шаблонов Jinja обратитесь к документации Jinja

Комментарии (0)