Первая часть - Все, что вам нужно знать об Airflow DAGs — Основы и расписания
Добро пожаловать в полное руководство по Apache Airflow DAG, представленное командой Astronomer. Эта электронная книга охватывает все, что вам нужно знать для работы с
DAG, от строительных блоков, из которых они состоят, до рекомендаций по
их написанию, динамической генерации, тестированию, отладке и многому
другому. Это руководство, написанное практикующими для практикующих.
Операторы
Операторы являются основными строительными блоками DAG Airflow. Это классы, которые содержат логику выполнения единичной работы.
Вы можете использовать операторы в Airflow, создав их экземпляры в задачах. Задача определяет работу, выполняемую оператором в контексте DAG.
Чтобы просмотреть и выполнить поиск по всем доступным операторам в Airflow, посетите Astronomer Registry. Ниже приведены примеры операторов, которые часто используются в проектах Airflow.
BashOperetor
BashOperator используется для запуска простых bash комманд и выводов на экран типа “Hello World”
.
Github: BashOperator Code
from airflow.operators.bash_operator import BashOperator
t1 = BashOperator(
task_id='bash_hello_world',
dag=dag,
bash_command='echo “Hello World”'
)
Из документации:
Необходимо передавать операторы через env kwargs и использовать двойные кавычки внутри bash_command
, как показано ниже:
bash_task = BashOperator(
task_id="bash_task",
bash_command="echo \"here is the message: '$message'\"",
env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
PythonOperetor
PythonOperator вызывает функцию python, определенную ранее в нашем коде. Вы можете передать параметры функции через параметр op_kwargs
. При выполнении этой задачи будет выведено сообщение “Привет от Airflow”
.
Github: Код PythonOperator
from airflow.operators.python import PythonOperator
def hello(**kwargs):
print('Hello from {kw}'.format(kw=kwargs['my_keyword']))
t2 = PythonOperator(
task_id='python_hello',
dag=dag,
python_callable=hello,
op_kwargs={'my_keyword': 'Airflow'}
)
PostgresOperator
Этот оператор выдает инструкцию SQL для базы данных Postgres. Учетные данные для базы данных хранятся в соединении Airflow с именем my_postgres_connection. Если вы посмотрите на код оператора Postgres, он использует PostgresHook для взаимодействия с базой данных.
Github: PostgresOperetor
from airflow.providers.postgres.operators.postgres import PostgresOperator
t3 = PostgresOperator(
task_id='PythonOperator',
sql='CREATE TABLE my_table (my_column varchar(10));',
postgres_conn_id='my_postgres_connection',
autocommit=False
)
Из документации:
При запуске вашего вызываемого объекта Airflow передаст набор аргументов (ключевых слов), которые могут быть использованы в вашей функции. Этот набор kwargs в точности соответствует тому, что вы можете использовать в своих шаблонах jinja. Чтобы это сработало, вам нужно определить **kwargs
в заголовке вашей функции, либо вы можете напрямую добавить аргументы, которые вы хотели бы получить.
Например, с помощью приведенного ниже кода ваш вызываемый объект получит значения контекстных переменных ti
и next_ds
.
С явными аргументами:
from airflow.operators.python import PythonOperator
def my_python_callable(ti, next_ds):
pass
Через kwargs:
from airflow.operators.python import PythonOperator
def my_python_callable(**kwargs):
ti = kwargs["ti"]
next_ds = kwargs["next_ds"]
SSH Operator
Как и BashOperator
, SSHOperator позволяет запускать команды bash, но имеет встроенную поддержку подключения SSH к удаленной машине для выполнения там команд.
Закрытый ключ для аутентификации на удаленном сервере хранится в канале Air flow как my_ssh_connection
. На этот ключ можно ссылаться во всех DAG, поэтому самому оператору нужна только та команда, которую вы хотите выполнить. Этот оператор использует SSHHook для установления ssh-соединения и выполнения команды.
from airflow.providers.ssh.operators.ssh import SSHOperator
t4 = SSHOperator(
task_id='SSHOperator',
ssh_conn_id='my_ssh_connection',
command='echo “Hello from SSH Operator”'
)
S3 To RedShifht Operator
S3ToRedshiftOperator загружает данные из S3 в Redshift через команду COPY Redshift-а. Этот оператор является “Оператором передачи” (“Transfer Operator”), он представляет собой тип оператора, предназначенного для перемещения данных из одной системы в другую. В этом случае мы перемещаем данные из S3 в Redshift, используя два отдельных соединения Airflow: одно для S3 и одно для Redshift.
При этом также используется другая концепция - макросы и шаблоны. В параметре s3_key
нотация шаблона Jinja используется для передачи даты выполнения для этого запуска Dag и отформатирована как «строка без тире» (ds_nodash
— предопределенный макрос в Airflow). Он будет искать ключ, в формате my_s3_bucket/20190711/my_file.csv
, с меткой времени, зависящей от времени запуска файла.
Шаблоны можно использовать для установки параметров среды выполнения (например, диапазона данных для вызова API), а также для того, чтобы сделать ваш код идемпотентным (каждый промежуточный файл называется по диапазону данных, который он содержит).`
Github: S3ToRedshiftOperator Code
Хуки (Hooks)
Обзор
Хуки (Hooks) являются одним из основных строительных блоков Airflow. На высоком уровне хук - это абстракция определенного API, который позволяет Airflow взаимодействовать с внешней системой. Хуки встроены во многие операторы, но их также можно использовать непосредственно в коде DAG.
В этом руководстве мы рассмотрим основы использования хуков в Airflow и когда их следует использовать непосредственно в коде DAG. Мы также рассмотрим пример реализации двух разных хуков в DAG.
Примечание: В настоящее время в реестре Astronomer зарегистрировано более 200 хуков. Если для вашего случая подходящего хука еще нет, вы можете написать свой собственный и поделиться им с сообществом!
Основы Hooks
Хуки охватывают API-интерфейсы и предоставляют методы для взаимодействия с различными внешними системами. Поскольку хуки стандартизируют способ взаимодействия с внешними системами, их использование делает ваш код DAG более чистым, легким для чтения и менее подверженным ошибкам.
Чтобы использовать хук, вам обычно требуется только ID подключения для подключения к внешней системе. Более подробную информацию о том, как настроить подключения, можно найти в разделе Управление подключениями в Apache Airflow или в разделе примеров ниже.
Все перехватчики наследуются от класса класса BaseHook, который содержит логику для настройки внешнего соединения с заданным идентификатором соединения. Помимо подключения к внешней системе, каждый хук может содержать дополнительные методы для выполнения различных действий внутри этой системы. Эти методы могут задействовать различные библиотеки Python для этих взаимодействий.
Например, S3Hook
, который является одним из наиболее широко используемых хуков, использует библиотеку boto3
для управления своим соединением с S3.
S3Hook
содержит более 20 методов для взаимодействия с сегментами S3, включая такие методы, как:
check_for_bucket
: Проверяет, существует ли корзина с определенным именем.list_prefixes
: Список префиксов в корзине в соответствии с указанными параметрами.list_keys
: Список ключей в корзине в соответствии с заданными параметрами.load_file
: Загружает локальный файл в S3.download_file
: Загружает файл из папки S3 в локальную файловую систему.
Когда использовать Hooks
Поскольку хуки являются строительными блоками операторов, их использование в Airflow часто абстрагируется от самого DAG. Однако есть некоторые случаи, когда вы должны использовать хуки непосредственно в функции Python в вашем DAG. Ниже приведены общие рекомендации при использовании хуков в Airflow:
Хуки всегда следует использовать поверх ручного взаимодействия с API для подключения к внешним системам.
Если вы пишете пользовательский оператор для взаимодействия с внешней системой, он должен использовать для этого хук.
Если для вашего конкретного случая использования существует оператор со встроенными хуками, то лучше всего использовать оператор вместо настройки хуков вручную.
Если вам регулярно требуется подключиться к API, для которого еще не существует хук, подумайте о том, чтобы написать свой собственный и поделиться им с сообществом!
Пример реализации:
В следующем примере показано, как можно использовать два хука (S3 Hook и Slack-Hook), чтобы извлечь значения из файлов в корзине S3, выполнить их проверку, опубликовать результат проверки в Slack и зарегистрировать ответ Slack API.
В этом случае мы используем хуки непосредственно в наших функциях Python, потому что ни один из существующих операторов S3 не может считывать данные из нескольких файлов в корзине S3. Аналогично, ни один из существующих операторов Slack не может возвращать ответ на вызов Slack API, который вы, возможно, захотите зарегистрировать для целей мониторинга.
Полный исходный код используемых хуков можно найти здесь:
Перед запуском примера DAG убедитесь, что у вас установлены необходимые Airflow провайдеры. Если вы используете Astro CLI, вы можете сделать это, добавив следующие пакеты в свой requirements.txt
:
apache-airflow-providers-amazon
apache-airflow-providers-slack
Далее вам нужно будет настроить подключения к корзине S3 и Slack в пользовательском интерфейсе Airflow.
Перейдите в раздел Admin -> Connections и нажмите на знак «плюс», чтобы добавить новое подключение.
Выберите Amazon S3 в качестве типа подключения для корзины S3 (если тип подключения не отображается, дважды проверьте правильность установки поставщика) и укажите для подключения ID ключа доступа AWS в качестве логина и секретный ключ доступа AWS в качестве пароля (см. Документацию AWS для как получить ваш идентификатор ключа доступа AWS и секретный ключ доступа AWS).
Создайте новое соединение. Выберите Slack Webhook в качестве типа подключения и укажите свой Bot User OAuth Token в качестве пароля. Этот токен можно получить, перейдя в Features > OAuth & Permissions и разрешения на
api.slack.com/apps
.
Приведенный ниже DAG использует Airflow Decorators для определения задач и XCom для передачи информации между ними. Имя корзины S3 и имена файлов, которые считывает первая задача, сохраняются в качестве переменных среды в целях безопасности.
# importing necessary packages
import os
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# import environmental variables for privacy (set in Dockerfile)
S3BUCKET_NAME = os.environ.get('S3BUCKET_NAME')
S3_EXAMPLE_FILE_NAME_1 = os.environ.get('S3_EXAMPLE_FILE_NAME_1')
S3_EXAMPLE_FILE_NAME_2 = os.environ.get('S3_EXAMPLE_FILE_NAME_2')
S3_EXAMPLE_FILE_NAME_3 = os.environ.get('S3_EXAMPLE_FILE_NAME_3')
# task to read 3 keys from your S3 bucket
@task.python
def read_keys_form_s3():
s3_hook = S3Hook(aws_conn_id='hook_tutorial_s3_conn')
response_file_1 = s3_hook.read_key(key=S3_EXAMPLE_FILE_NAME_1, bucket_name=S3BUCKET_NAME)
response_file_2 = s3_hook.read_key(key=S3_EXAMPLE_FILE_NAME_2, bucket_name=S3BUCKET_NAME)
response_file_3 = s3_hook.read_key(key=S3_EXAMPLE_FILE_NAME_3, bucket_name=S3BUCKET_NAME)
response = {'num1' : int(response_file_1), 'num2' : int(response_file_2), 'num3' : int(response_file_3)}
return response
# task running a check on the data retrieved from your S3 bucket
@task.python
def run_sum_check(response):
if response['num1'] + response['num2'] == response['num3']:
return (True, response[‘num3’])
# task posting to slack depending on the outcome of the above check
# and returning the server response
@task.python
def post_to_slack(sum_check_result):
slack_hook = SlackHook(slack_conn_id='hook_tutorial_slack_conn')
if sum_check_result[0] == True:
server_response = slack_hook.call(api_method='chat.post-Message',
json={"channel": "#test-airflow",
"text": f"""All is well in your bucket! Correct sum: {sum_check_result[1]}!"""})
else:
server_response = slack_hook.call(api_method='chat.post-Message',
json={"channel": "#test-airflow",
"text": f"""A test on your bucket contents failed! Target sum not reached: {sum_check_re-sult[1]}""""})
# return the response of the API call (for logging or use downstream)
return server_response
# implementing the DAG
with DAG(dag_id='hook_tutorial',
start_date=datetime(2022,5,20),
schedule_interval='@daily',
catchup=False,
) as dag:
# the dependencies are automatically set by XCom
response = read_keys_form_s3()
sum_check_result = run_sum_check(response)
post_to_slack(sum_check_result)
Приведенный выше DAG выполняет следующие шаги:
Использует декорированный PythonOperator с реализованным вручную
S3Hook
для чтения трех определенных ключей из S3 с помощью методаread_key
. Возвращает словарь с содержимым файла, преобразованным в целые числа.
Используя результаты (1), с помощью второго декорированного PythonOperator для выполняет простую проверку суммы.
Отправляет результат проверки в канал Slack, используя
call
метод Slackbook, и возвращает ответ из Slack API.
Датчики (Sensors)
Датчики (Sensors)
Датчики - это особый вид операторов. Когда они выполняются, они проверяют, выполняется ли определенный критерий, прежде чем разрешить выполнение последующих задач. Это отличный способ заставить ожидать часть задач вашего DAG завершения какого-либо внешнего процесса или выполнения условия.
Чтобы просмотреть и выполнить поиск по всем доступным датчикам Airflow, посетите реестр Astronomer. Возьмем в качестве примера следующий датчик:
S3KeySensor
S3KeySensor проверяет наличие указанного ключа в S3 каждые несколько секунд, пока не найдет его или не истечет время ожидания. Если он найдет ключ, он будет отмечен как успешный и позволит выполнять последующие задачи. Если время ожидания истечет, он завершится сбоем и предотвратит выполнение последующих задач.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
s1 = S3KeySensor(
task_id='s3_key_sensor',
bucket_key='{{ ds_nodash }}/my_file.csv',
bucket_name='my_s3_bucket',
aws_conn_id='my_aws_connection',
)
Параметры датчика
Существуют датчики для многих вариантов использования, например, для проверки базы данных на наличие определенной строки, ожидания определенного времени суток или перехода в режим ожидания в течение определенного промежутка времени. Все датчики наследуются от BaseSensorOperator и имеют 4 параметра, которые вы можете установить для любого датчика:
soft_fail: Установите значение true, чтобы пометить задачу, как пропускаемую (SKIPPED) при сбое.
poke_interval: Время в секундах, в течение которого происходит ожидание между каждой попыткой. Интервал ожидания должен составлять более одной минуты, чтобы исключить слишком большую нагрузку на планировщик.
timeout: время в секундах до истечения времени ожидания задачи и фиксации сбоя.
mode: Как работает датчик. Параметры
{ poke | reschedule }
, по умолчаниюpoke
. При установке наpoke
датчик будет занимать рабочий слот в течение всего времени выполнения (даже между нажатиями). Используйте этот режим, если ожидаемое время работы датчика короткое или требуется короткий интервал между нажатиями. Обратите внимание, что датчик будет удерживаться в рабочем слоте и слоте пула в течение всего времени работы датчика в этом режиме. Если задано значениеreschedule
, задача датчика освобождает рабочий слот, когда критерии еще не выполнены, и она переносится на более позднее время. Используйте этот режим, если ожидается, что время до выполнения критериев будет довольно продолжительным. poke_interval должен составлять более одной минуты, чтобы предотвратить слишком большую нагрузку на планировщик.
Отложенные операторы (Deferrable Operators)
До версии Airflow 2.2 все выполнение задач выполнялось в пределах ваших рабочих ресурсов. Для задач, работа которых происходила вне Airflow (например, задание Spark), ваши задачи простаивали бы в ожидании сигнала об успехе или сбое. Эти незанятые задачи будут занимать рабочие слоты на все время их выполнения, потенциально ставя в очередь другие задачи и задерживая время их запуска.
С выпуском Airflow 2.2 в Airflow представлен новый способ выполнения задач в вашей среде: отложенные операторы. Эти операторы используют библиотеку asyncio Python для эффективного выполнения задач, ожидающих внешнего ресурса для завершения. Это освобождает ваших работников, позволяя вам более эффективно использовать эти ресурсы. В этом руководстве мы рассмотрим концепции отложенных операторов, а также новые компоненты, введенные в Airflow, связанные с этой функцией.
Концепция отложенных операторов
Существуют некоторые термины и понятия, которые важно понимать при обсуждении отложенных операторов:
asyncio: Эта библиотека Python используется в качестве основы для нескольких асинхронных фреймворков. Библиотека является основой функциональности отложенных операторов и используется при написании триггеров.
Триггеры: Это небольшие асинхронные фрагменты кода Python. Из-за их асинхронной природы они эффективно сосуществуют в одном процессе, известном как инициатор (triggerer).
Triggerer: Это новая служба airflow (как планировщик или рабочая функция), которая запускает цикл событий asyncio (asyncio event loop) в вашей среде Airflow. Запуск триггера необходим для использования отложенных операторов.
Deferred: (Отложено) Это новое состояние задачи воздушного потока (средненасыщенный фиолетовый цвет), введенное для указания того, что задача приостановила свое выполнение, освободила рабочий слот и отправила триггер, который должен быть принят triggerer process.time. Все датчики наследуются от BaseSensorOperator и имеют 4 параметра, которые вы можете установить на любом датчике.
Примечание: Термины “отложенный” и “async” или “асинхронный” часто используются как взаимозаменяемые. В данном контексте они означают одно и то же.
С помощью традиционных операторов задача может отправить задание во внешнюю систему (например, в кластер Spark), а затем опрашивать статус этого задания до его завершения. Даже при том, что задача, возможно, не выполняет значительной работы, она все равно будет занимать рабочее место во время процесса опроса. По мере того как рабочие слоты заполняются, задачи могут ставиться в очередь, что приводит к задержке времени запуска. Схематически это представлено ниже:
С помощью операторов с возможностью отсрочки рабочие слоты могут быть освобождены во время опроса статуса задания. Когда задача откладывается (приостанавливается, suspended), процесс опроса выгружается в качестве триггера (trigger) для инициатора (triggerer), освобождая рабочее место. Инициатор может выполнять множество асинхронных задач опроса одновременно, не позволяя этой работе занимать ваши рабочие ресурсы. Когда получен статус терминала для задания, задача возобновляется, занимая рабочий слот во время ее завершения. Схематически это представлено ниже:
Когда и почему следует использовать отложенные операторы
В общем случае, отложенные операторы следует использовать всякий раз, когда у вас есть задачи, которые занимают рабочий слот во время опроса условия во внешней системе.
Например, использование отложенных операторов для задач датчиков (таких, как поиск файла на сервере SFTP) может привести к повышению эффективности и снижению эксплуатационных затрат. В частности, если вы в настоящее время работаете с умными датчиками, вам следует рассмотреть возможность использования вместо них отложенных операторов. По сравнению с умными датчиками, которые устарели в версии 2.2.4 Airflow, отложенные операторы более гибкие и лучше поддерживаются Airflow.
В настоящее время в Airflow доступны следующие отложенные операторы:
Однако этот список будет быстро расти по мере того, как сообщество Airflow будет вкладывать больше усилий в разработку этих операторов. Тем временем вы также можете создать свой собственный (подробнее об этом в последнем разделе этого руководства). Кроме того, Astronomer поддерживает некоторые отложенные операторы, доступные только в Astro Runtime.
Использование операторов с отсрочкой имеет множество преимуществ. Из них наиболее заметные:
Снижение потребления ресурсов: В зависимости от доступных ресурсов и рабочей нагрузки ваших триггеров вы можете запускать от сотен до тысяч отложенных задач в одном процессе-инициаторе. Это может привести к сокращению числа рабочих модулей, необходимых для выполнения задач в периоды, когда есть много параллельных задач. Соответственно, вы можете сократить базовую инфраструктуру вашей среды Airflow.
Устойчивость к перезапускам: Триггеры по своей конструкции не имеют состояния. Это означает, что ваши отложенные задачи не будут переведены в состояние сбоя, если инициатору (triggerer) потребуется перезапуск из-за проблемы с развертыванием или инфраструктурой. Как только triggerer будет восстановлен и запущен в вашей среде, ваши отложенные задачи возобновятся.
Возможность реализации DAG на основе событий: наличие
asyncio
в ядре Airflow является потенциальной основой для создания DAG, инициируемых событиями.
Пример рабочего процесса с использованием отложенных операторов
Допустим, у нас есть DAG, который должен запускать датчик каждую минуту, где каждая задача может занимать до 20 минут. Используя настройки по умолчанию с 1 рабочим модулем, мы видим, что через 20 минут у нас выполняется 16 задач, каждая из которых занимает рабочее место:
Поскольку рабочие слоты удерживаются во время выполнения задачи, нам потребуется не менее 20 рабочих слотов, доступных для этого DAG, чтобы гарантировать, что будущие запуски не будут отложены. Чтобы увеличить число параллельно выполняемых задач, нам нужно было бы добавить дополнительные ресурсы в нашу инфраструктуру Airflow (например, другой рабочий модуль).
Используя отложенного оператора для этого датчика, мы можем добиться полного параллелизма, позволяя нашему процессу выполнять дополнительную работу в нашей среде Airflow. С помощью нашего обновленного DAG ниже мы видим, что все 20 задач перешли в состояние отложенных, что указывает на то, что эти задания на основе датчиков (триггеры) были зарегистрированы для запуска в triggerer-процессе.
from datetime import datetime
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensor
with DAG("sync_dag",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 19),
schedule_interval="* * * * *",
catchup=True,
max_active_runs=32,
max_active_tasks=32
) as dag:
sync_sensor= DateTimeSensor(
task_id="sync_task",
target_time="""{{ macros.datetime.utcnow() + macros.time - delta(minutes=20) }}""",
)
from datetime import datetime
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync
with DAG("async_dag",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 19),
schedule_interval="* * * * *",
catchup=True,
max_active_runs=32,
max_active_tasks=32
) as dag:
async_sensor = DateTimeSensorAsync(
task_id="async_task",
target_time="""{{ macros.datetime.utcnow() + macros.time -delta(minutes=20) }}""",
)
Выполнение отложенных задач в среде вашего Airflow
Чтобы запустить запущенный процесс, запустите airflow triggerer
в среде Airflow. Вы должны увидеть результат, аналогичный приведенному ниже изображению.
Обратите внимание, что если вы используете Airflow на Astro, триггер запускается автоматически, если вы используете Astro Runtime 4.0+. Если вы используете Astronomer Software 0.26+, вы можете добавить триггер к развертыванию Airflow 2.2+ во вкладке Deployment Settings. В этом руководстве подробно описаны шаги по настройке этой функции на платформе.
Когда задачи переводятся в отложенное состояние, триггеры регистрируются в triggerer. Вы можете задать количество одновременных триггеров, которые могут выполняться в одном процессе-инициаторе (triggerer process), с помощью параметра конфигурации default_capacity
в Airflow. Это также можно установить с помощью переменной среды AIRFLOW__TRIGGERER__DEFAULT_CAPACITY
. По умолчанию значение этой переменной равно 1000
.
Высокая доступность
Обратите внимание, что триггеры разработаны таким образом, чтобы быть высокодоступными. Вы можете реализовать это, запустив несколько процессов-инициаторов. Подобно HA scheduler, представленному в Airflow 2.0, Airflow гарантирует, что они сосуществуют с правильной блокировкой и HA. Вы можете обратиться к документации Airflow для получения дополнительной информации по этому вопросу.
Создание собственного Отложенного оператора
Если у вас есть оператор, который выиграл бы от асинхронности, но еще не существует в OSS Airflow или Astro Runtime, вы можете создать свой собственный. В документации Airflow есть отличные инструкции, которые помогут вам начать работу.
Следующая часть - Проектирование DAG