Dust-n-Rust by Spiritofdarkness

Команда разработки Cloud Big Data от VK Cloud Solution перевела статью с советами, которые касаются общих понятий работы с пайплайнами. Неважно, какую систему управления рабочими процессами вы используете, эти идеи можно применять везде. Сам автор пользуется Apache Airflow и приводит примеры кода на ее основе.

Эта статья будет полезна не только дата-инженерам, но и дата-сайентистам, так как хороший дата-сайентист тоже понимает принципы работы пайплайнов данных.

Подберите подходящие триггеры для разных заданий


В традиционной работе с пайплайнами есть базовые задания ETL/ELT — для их запуска всегда нужны триггеры, которые бывают двух типов: на основе расписания или алгоритма.

Триггер на основе расписания. Планировщики есть в любой системе управления рабочими процессами. С их помощью можно запускать задания регулярно: раз в минуту, в час, в день или в неделю. Такие триггеры полезны для обязательных заданий — тех, которые нужно запустить независимо от условий. Обычно это задания, которые не сбоят ни при каких обстоятельствах.

Триггеры на основе алгоритмов. Обычно такие триггеры зависят от других задач и запускают задание при определенных условиях. Например:

  • файл импортирован в папку;
  • строка добавлена в таблицу;
  • завершено несколько зависимых процессов.

Триггеры на основе алгоритмов полезны для заданий, выполнение которых зависит от соблюдения конкретных условий.

Тип триггера выбирают в зависимости от характера задания. Спросите себя:

  • Нужно ли выполнять задание каждый день?
  • Тратится ли на задание больше ресурсов, чем необходимо?
  • Как часто нужно выполнять задание?
  • Зависит ли успешное выполнение задания от нескольких условий?

При этом помните: мы живем не в идеальном мире, где все задания выполняются безупречно. Используйте возникающие ошибки и сбои для обучения и улучшения ваших пайплайнов.

Выберите подходящую систему оповещений


Пайплайны ломаются, но это еще не конец света. В момент поломки нам нужно получить уведомление об ошибке, и базовых оповещений для этого недостаточно. Понадобятся еще настраиваемые оповещения.

Электронная почта — базовый минимум. Письмо приходит на почту всем членам команды и информирует об ошибке при выполнении задания. 
Это не самое эффективное решение, поскольку письмо легко пропустить — не все из нас регулярно проверяют почту.

Вот так выглядит базовая форма оповещения по электронной почте в Airflow:

from airflow.utils.email import send_email

email_to = 'receivers@email.com'
email_subject = '(FAILED) ' + jobname + date
email_content = 'Your job has failed.'
send_email(email_to, email_subject, email_content)

Slack. Если вы работаете в технологической компании, то наверняка используете для общения Slack. Можно создать отдельный канал для оповещений, в который ваши пайплайны будут отправлять сообщения об ошибках. Ваша команда сразу узнает, что что-то пошло не так.

Вот так выглядит простая форма оповещения в Slack на базе его API.

from slack import WebClient
client = WebClient(token = 'your token here')
response = client.chat_postMessage(
                channel = slack_cannel,
                text = message
                )

Настраивайте логирование


В программировании «логировать» значит подробно регистрировать все, что вы делаете. В работе с пайплайнами есть два типа логов: задач и заданий.

Журналы задач. В них хранится вся важная информация, которая появляется в ходе работы над задачей. Конечно, переносить каждую строчку выполненного кода не хочется, но подробные записи очень важны. Вот пример журнала:

Import PostgresHook

#Extraction Job
def ExtractFromSource(query):
    query_to_run = query
    logging.info("Query : %" query_to_run)
    
    cursor = PostgresHook(connection).get_conn().cursor()
    logging.info("Connecting to Postgres Connection %" connection)
cursor.execute(query_to_run)
    result = cursor.fetchall()

Логи выполняют роль контрольной точки для отладки. В случае сбоя в них можно найти источник ошибки.

Например, в приведенном выше задании по извлечению данных мы логируем запрос и подключение. Лог выполняет две функции:

  • сообщает, что задача выполняется нормально вплоть до каждой точки;
  • показывает передаваемую переменную запроса и соединения.

Если что-то пойдет не так, мы сможем точно определить, на каком этапе произошла ошибка. Плюс можно проверить, нет ли ошибки в переменной запроса или подключения.

Логи заданий — записи запущенных заданий. В них регистрируют время начала и завершения задания и его статус — это набор-минимум. Даже если  в вашей системе управления рабочими процессами не настроено логирование, эта информация наверняка где-то у вас хранится, например в хранилище данных.

Вот пример лога заданий:



Чтобы журнал был полезнее, создайте таблицу аудита в хранилище данных. Добавляйте в эту таблицу запись на каждое задание с параметрами, которые мы привели выше в примере. Дата-сайентистам и аналитикам будет нетрудно пройтись по этой таблице и проверить, какое задание выполнили последним и было ли оно успешным.

Настройте проверку дубликатов


Если вы извлекаете или преобразуете данные, то наверняка сталкивались с дублями. У их появления несколько причин, в том числе:

  • ошибки в запросе, например в join-операции;
  • задание было запущено дважды;
  • некорректные данные.

Чтобы исправить ситуацию, в пайплайн нужно добавить этап «проверки на дубли». Для этого можно проверить количество записей по первичным ключам.

Select max(count) from (
Select primary_key, count(*) from table group by primary_key)

Если значение больше «1», значит, есть дубликаты.
Если у ваших данных нет первичных ключей, их можно присвоить: 

alter table TABLENAME add primary key (ID,NAME,PMID) 

Этих четырех хитростей достаточно, чтобы усовершенствовать ваши пайплайны по обработке данных. А закончить позвольте цитатой Даниэля Тункеланга: «Наша работа как дата-сайентистов заключается в том, чтобы извлечь сигнал из шума». 

Команда VK Cloud Solutions развивает экосистему для построения Big-Data-решений и Machine Learning. Будет здорово, если вы протестируете наши решения и дадите обратную связь. Для тестирования пользователям при регистрации начисляем 3000 бонусных рублей.

Что почитать по теме:

  1. Как и зачем разворачивать приложение на Apache Spark в Kubernetes.
  2. Форматы файлов в больших данных: краткий ликбез.

Комментарии (2)


  1. StasTukalo
    20.04.2022 11:26
    +3

    Статья ради статьи.. ((


  1. alexey_girin
    20.04.2022 18:21

    Не освещены моменты обработки озёр данных для получения доносов для товарища майора ради отправки граждан в тюрячку. Очень широкая и интересная тема.
    Сколько доносов получается с гигабайта юзер данных? Какой коофициент конверсии доносов в посадки? Какой стек используется? Аудируются ли доносы перед посылкой или люди совсем не вовлекаются
    Так много вопросов и так мало ответов.