Привет! Меня зовут Василий, я работаю в роли Data Engineer в подразделении Big Data Solutions компании Neoflex. Так сложилось, что уже на двух проектах за прошедший год мне довелось заниматься разработкой систем, помогающих контролировать качество данных в Data Lakes. В ходе работы над проектами было изучено и реализовано довольно много идей, поэтому хотелось бы поделиться этим опытом с вами. Буду признателен, если в комментариях вы выскажите свои замечания или предложите более удачные варианты реализации.

Прежде чем приступить к основной части моего повествования, кратко обозначу, что же стоит за понятием «качество данных» (Data Quality, DQ) в контексте данной статьи.

«Качество данных» можно рассматривать как одну из функции управления данными (Data Governance). Эта тема довольно хорошо раскрыта в своде знаний DMBoK Международной ассоциации управления данными (DAMA). Данное руководство стало для меня основным источником получения теоретических знаний по этой теме. С конспектом DMBoK на русском языке вы можете ознакомиться подробнее здесь.

Рис. 1. Качество данных как одна из функций управления данными согласно DAMA DMBoK
Рис. 1. Качество данных как одна из функций управления данными согласно DAMA DMBoK

По поводу важности обеспечения высокого качества данных хотел бы отметить лишь несколько основных, на мой взгляд, моментов:

  • необходимо тесно работать с конечными потребителями данных, чтобы определить их потребности и совместно установить ключевые характеристики качества данных;

  • если вы не смогли обеспечить высокое качество данных, то все усилия на сбор, хранение и защиту данных были потрачены напрасно;

  • нужно всегда помнить, что некачественные данные могут привести к неверным решениям и большим потерям;

  • необходимо контролировать качество данных на всех этапах их жизненного цикла. На начальных этапах загрузки – простые технические проверки, на последующих – более сложные проверки логической целостности и соблюдения бизнес-логики;

  • без должного уровня контроля качества данных можно очень быстро превратить самое прекрасное «озеро» данных в никому не нужное «болото».

На проектах в компании Neoflex мы строили Data Lakes на Hadoop, загружали и обрабатывали данные из различных источников. Также одной из наших задач было обеспечение требуемого уровня качества данных в хранилище на постоянной основе. Для этого мы разрабатывали необходимые инструменты автоматизации с использованием средств, имеющихся в распоряжении у заказчика.

На первом проекте мы не рассматривали готовые системы для контроля DQ, т. к. задача была – разработать инструмент контроля с использованием Spark, Oozie, Hive. На втором проекте был сделан выбор в пользу системы, разработанной собственными силами, уже осознанно. Основным доводом за разработку стало то, что рассматриваемые инструменты, в частности, Great Expectations, предполагали работу с конфигурационными файлами для настройки новых проверок. Если учесть, что в хранилище будут сотни таблиц с сотнями полей в каждой, то управлять всеми проверками с помощью конфигурационных файлов нам показалось не самым удобным. Необходимо было сделать настройку, запуск и остановку проверок максимально быстрой и простой. Также для нас было важно обеспечить сбор детальных результатов по всем проверкам в одном месте для расчета общих метрик и вывода их на dashboard.

К началу разработки системы DQ у нас уже было довольно много работающих ETL-потоков под управлением Oozie. Качество загруженных в хранилище данных проверялось в отдельных потоках, которые никак друг от друга не зависели и запускались каждый по своему расписанию. Поэтому мы выбрали подход, когда задачи по проверке DQ управляются централизованно и запускаются группами с частотой один раз в день, отдельно от самих потоков обработки данных. Основная часть проверок выполнялась в часы наименьшей загрузки кластера Hadoop.

Для управления всеми задачами по контролю DQ был разработан на Python «управляющий поток»: скрипт, который запускался каждый час по расписанию в Oozie. При запуске с его
помощью выполнялись следующие контроли:

  • какие проверки уже отработали за текущий день и с какими результатами они завершились;

  • работают ли какие-то проверки на данный момент и как долго, чтобы завершить те из них, которые не уложились в отведенное время или, вдруг, зависли;

  • не было ли проверок, которые запустились, но завершились с ошибками из-за неправильных настроек или сбоев, чтобы перезапустить их еще раз;

  • какие проверки необходимо запустить на текущий час в соответствии с настроенными параметрами запуска для каждой группы.

Рис. 2. Система DQ на основе «управляющего потока»
Рис. 2. Система DQ на основе «управляющего потока»

Таким образом, «управляющий поток» отвечал за то, какие проверки нужно было запустить или перезапустить. Для каждой из них мы генерировали workflow в Oozie по заранее определенному шаблону и запускали их параллельно. Чтобы ограничить число одновременно работающих workflow, мы разделили их на группы. Для каждой группы определялись:

  • заинтересованные лица в получении отчетов с результатами проверок;

  • время запуска проверок в группе;

  • число одновременно работающих проверок для данной группы.

Если вам требуется управлять данными более «проактивно», т. е. выявлять проблемы с данными до того, как они могут повлиять на конечных бизнес-пользователей, для этого можно выбрать другой подход к контролю DQ – встраивать проверки DQ в ETL-потоки обработки данных. При таком подходе отпадает потребность в отдельном «управляющем потоке». На замену ему можно разработать поток, который будет забирать результаты проверок из общей таблицы и отправлять их всем заинтересованным лицам на регулярной основе.

 В случаях, когда встроить проверки в основной поток нет возможности, например, когда идет
потоковая загрузка данных из Kafka, тесты также можно запускать в отдельных
потоках по своему собственному расписанию.

Рис. 3. Система DQ, как часть ETL-потоков
Рис. 3. Система DQ, как часть ETL-потоков

Предлагаю теперь подробнее рассмотреть основные функции cистемы контроля качества данных:

  • хранение описания логики по всем доступным проверкам;

  • хранение описания логики по всем доступным метрикам для выполнения профилирования данных;

  • назначение задач по профилированию и выполнению проверок для требуемых датасетов (наборов данных или таблиц) в Data Lake;

  • хранение информации о результатах всех выполненных проверок;

  • массовый импорт списка запланированных проверок и их параметров из csv-файла;

  • профилирование данных для использования в проверках на выявление аномалий;

  • выполнение всех запланированных проверок для всех необходимых датасетов по расписанию или сразу после загрузки данных в хранилище;

  • рассылка отчетов о выполненных проверках с результатами заинтересованным лицам.

Для реализации перечисленных выше функций вам потребуются:

  • СУБД (любая) – для хранения метаданных о проверках;

  • Python – для написания логики и запуска проверок;

  • PySpark (опционально) – для проверок, проводимых на большом объеме данных. Если данных, требующих проверку, немного (до 10 млн записей), то можно, например, обойтись только средствами пакета Pandas;

  • оркестратор (любой, например, Airflow, Oozie) – для запуска проверок по расписанию, если проверки будут запускаться отдельно от ETL-потоков;

  • BI-система (опционально) – для построения dashboard-а по всем датасетам в хранилище с результатами всех проверок и метриками.

Теперь углубимся в детали реализации.

Мне показалось логичным разделить систему контроля качества данных на пять модулей:

  • all_metrics.py – модуль, содержащий логику всех метрик для профилирования данных;

  • all_tests.py – модуль, содержащий логику всех проверок качества данных;

  • run_profiling.py – модуль, предназначенный для запуска задач по профилированию данных;

  • run_tests.py – модуль, предназначенный для запуска задач по проверке данных;

  • utils.py – модуль, содержащий вспомогательные функции по работе с данными.

Скрипты модулей запуска задач по профилированию и проверкам вызываются из ETL-потоков через spark-submit. Они одинаковые для всех датасетов, поэтому их легко можно встроить там, где требуется провести профилирование и последующие проверки DQ. Даже если проверки пока еще не назначены для данного датасета, то задачи будут просто пропущены. При этом на работе основного потока это никак не скажется. Как только ответственные за DQ добавят проверки в настроечную таблицу dq_assertions, они начнут запускаться при каждом вызове run_tests.py.  

Такой подход позволит вам предусмотреть запуск проверок по мере их готовности, не будет задерживать запуск потоков и не потребует менять код основного ETL-потока в будущем.

В качестве единственного параметра для скрипта запуска проверок необходимо указать название проверяемого датасета, который создается в результате выполнения задач потока. Для проверок на выявление аномалий в загрузках данных предварительно требуется провести профилирование данных. Для этого нужно запланировать сбор необходимых метрик с помощью настроечной таблице dq_profiling.

Ниже приведена выдержка из кода для генерации задач по проверке данных внутри DAG-а Airflow для нескольких таблиц, где:

  • VENV_PATH – путь к виртуальному окружению Python;

  • LIBS_PATH – путь к папке, где лежат дополнительные jar-файлы, расширяющие возможности Spark, в частности, postgresql-42.3.6.jar –для подключения к СУБД PostgreSQL с помощью Spark;

  • DQ_RUN_TESTS_SCRIPT – название модуля со скриптом запуска проверок;

  • table – название проверяемой таблицы (в моем случае – полное имя совпадает с путем в HDFS);

  • {{ logical_date }} – дата запуска задачи в Airflow, передаваемая в качестве параметра.

test_tasks = []
for table in all_table_full_names:
    test_task = BashOperator(
        task_id=f"run_dq_tests_for_{table.replace('.', '_')}",
        bash_command=f"/opt/dev/spark/spark-3.2.1/bin/spark-submit \
        --driver-memory 2g \
        --executor-memory 2g \
        --executor-cores 2 \
        --conf spark.dynamicAllocation.maxExecutors=4 \
        --conf spark.dynamicAllocation.initialExecutors=2 \
        --conf spark.sql.shuffle.partitions=8 \
        --conf spark.sql.sources.partitionOverwriteMode=dynamic \
        --conf spark.pyspark.virtualenv.enabled=true \
        --conf spark.pyspark.virtualenv.type=native \
        --jars {LIBS_PATH}/postgresql-42.3.6.jar \
        --conf spark.pyspark.python={VENV_PATH}/bin/python3.9 \
        --conf spark.pyspark.virtualenv.bin.path={VENV_PATH}/bin/ \
        {SCRIPTS_PATH}/{RUN_TESTS_SCRIPT} {table}" + " {{ logical_date }}",
        dag=dag,
        trigger_rule=TriggerRule.ALL_DONE
    )
    test_tasks.append(test_task)

При каждом вызове скрипта запуска делается запрос к базе с метаинформацией для получения списка назначенных проверок для указанного датасета (таблицы) и всех необходимых параметров для их выполнения. Поиск в хранилище метаинформации производится по имени датасета. Все найденные проверки для запуска будут выполняться последовательно. Можно при необходимости сделать параллельный запуск, например, формируя отдельные задачи на каждую проверку в Airflow. Поскольку проверки реализованы на PySpark и используется кеширование данных проверяемого датасета, всё работает достаточно быстро.

По окончании работы всех проверок – краткая сводка результатов. Если все критичные проверки отработали успешно, то ETL-поток продолжит обработку данных, а при выявлении проблем обработка данных – останавливается. Ответственные получают уведомления, отправляемые штатными средствами Airflow. В конце скрипта запуска вызываются функции для сохранения результатов в хранилище в таблицу Hive.  

Модуль all_tests.py является основным. Это репозиторий всех проверок, где реализована логика их выполнения. По аналогии с unit-тестами все проверки являются методами класса DQTest. Все тесты построены по одному принципу: подсчет количества записей, которые не удовлетворяют каким-либо требованиям, например, по типу данных, содержанию, диапазону значений, количеству и т. п. Если записи, не удовлетворяющие требованиям, удалось найти, значит проверка не пройдена. В этом случае инициируется процедура устранения найденных дефектов совместно с аналитиками и разработчиками ETL-потока.  

По большому счету, всё, что нужно сделать для начала работы – реализовать всего один метод запуска проверок с помощью произвольных SQL-запросов, которые будут запускаться через Spark SQL API на временных представлениях, созданных на основе Spark DataFrame. Таким образом, вы дадите вашим ответственным за DQ очень гибкий механизм и сможете быстро запустить основные контроли качества данных, в том числе благодаря тому, что многие хорошо знакомы с языком SQL. 

Ниже пример кода метода для запуска ключевой проверки для PySpark и Spark SQL API с использованием заранее подготовленного SQL-запроса:

def check_for_custom_query(self):
	"""Check for table with custom SQL-query"""
	sql_query_prepared = self.sql_query\
		.replace('{check_date}', self.check_date)\
		.replace('{table_name}', self.table_name) \
		.replace('{column_name}', self.column_name)
	print('\n', sql_query_prepared, '\n')
	result_df, self.count_rows = \
		get_data_from_parquet(spark_session=self.spark_session,
				       hdfs_location=self.hdfs_location,
				       sql_query=sql_query_prepared,
				       table_name=self.table_name,
				       load_date=self.filter_datekey)
	try:
		self.count_rows_with_errors = result_df.collect()[0][0]
		self.status = False if self.count_rows_with_errors else True
	except IndexError:
		self.count_rows_with_errors = 0
		self.status = True
	except AttributeError:
		self.count_rows_with_errors = None

SQL-запросы для данной проверки нужно писать и проверять заранее, например, в Jupyter Hub и заносить в конфигурационную таблицу dq_assertions как параметр для последующего регулярного запуска. 

По мере роста уровня зрелости в области управления DQ вы можете заниматься реализацией своего набора «стандартных» проверок, чтобы не писать SQL-запросы для каждой из них, а лишь задавать необходимые параметры:

  • название проверяемого датасета (которое одновременно является путем в HDFS);

  • список проверяемых колонок;

  • специфические параметры для проверки (минимальное значение, время задержки при загрузке данных, шаблон регулярного выражения и т.п.).

Ниже пример метода для запуска одной из стандартных проверок с помощью регулярного выражения, сохраненного в dq_assertions в виде одного из параметров:

def check_for_column_values_to_match_regexp_pattern(self):
        """Check for column values to match regular expression pattern"""
        where_condition = '' if not self.where_condition else '\nAND ' + self.where_condition
        # WARNING: do not use f-string with regular expressions (self.regexp)
        sql_query = f"""
            SELECT COUNT(*)        
            FROM {self.table_name} 
            WHERE {self.column_name} NOT RLIKE""" + ' "' + rf"{self.regexp}" + '" ' + where_condition
        print(sql_query)
        result_df, self.count_rows = get_data_from_parquet(spark_session=self.spark_session,
                                                           hdfs_location=self.hdfs_location,
                                                           sql_query=sql_query,
                                                           table_name=self.table_name,
                                                           load_date=self.filter_datekey)
        self.sql_query = sql_query
        collected_df = result_df.collect() if result_df else []
        if collected_df:
            self.count_rows_with_errors = collected_df[0][0]
            self.status = False if self.count_rows_with_errors else True

Из интересных находок при реализации логики проверок можно отметить следующие:

  • для всех проверок можно добавить параметр «where condition», с помощью которого сразу фильтровать лишние записи на этапе загрузки данных. Это могут быть, например, записи, помеченные на удаление, и проверять их не нужно;

  • если нужно проверять каждый раз не все данные в датасете, а лишь те, что были добавлены на дату проверки, то можно добавить признак «check_all_rows». Если он будет равен False, то при загрузке данных мы берем только партицию с датой загрузки, которая равна дате проверки. Можно также использовать для целей фильтрации условие where с переменной «дата проверки», подставлять его в шаблон запроса;

  • поскольку даты в разных источниках могут храниться в разных форматах, можно добавить специальное поле «date_formatter». В нем задавать выражение для приведения даты к стандартному виду «YYYY-MM-DD». Это выражение также подставляется в шаблон SQL-запроса для последующего сравнения с заданным минимальным или максимальным значением даты. Например, в этом поле может содержаться следующее выражение: «SUBSTR(STRING({column_name}), 1, 10)»;

  • если для проверки указать сразу массив проверяемых полей, а не одно поле, то подключение новых проверок значительно ускоряется. При запуске самих проверок мы итерируемся по списку полей и запускаем тест для каждого поля по отдельности.

Ниже выдержка из кода, где показано, как запускаются проверки с параметрами, полученными предварительно из таблицы dq_assertions в список tests, состоящий из словарей:

for test in tests:        
        dq_test = DQTest(**test)  # передаем параметры в экземпляр теста
        dq_test.log_time = dt.now().strftime('%Y-%m-%d %H:%M:%S')
        dq_test.check_date = str((dt.strptime(EXECUTION_DATE, '%Y-%m-%d') -
                                  timedelta(days=int(test.get('delay_days')))).date())  # дата проверки смещена относительно даты запуска назад
        dq_test.load_date = LOAD_DATE  # это просто дата загрузки результатов проверок в хранилище
        dq_test.filter_date_load = None if dq_test.check_all_rows else dq_test.check_date.replace('-', '')  # проверяем все данные или только инкремент
        dq_test.metrics_table = METRICS_HIVE_TABLE  # имя таблицы, где будут собираться результаты профилирования данных
        dq_test.metrics_hdfs_path = METRICS_HDFS_PATH
        dq_test.schema_name = TABLE_FULL_NAME.split('.')[2]  # выделяем из имени таблицы схему, где она расположена
        dq_test.spark_session = spark
        if not dq_test.test_columns or dq_test.test_columns == 'None':
            dq_test.test_columns = [None]
        for column in dq_test.test_columns:
            try:
                dq_test.status = None
                dq_test.count_rows_with_errors = None
                dq_test.percent_errors = None
                dq_test.clear_query = None
                dq_test.column_name = column
                run_data_quality_test(dq_test)  # запускаем тест по каждой колонке по отдельности

Для начала можно реализовать в виде отдельных методов самые базовые технические проверки на предмет:

  • отсутствия дублирующихся записей;

  • отсутствия пустых значений в полях, которые являются первичными или внешними ключами;

  • отсутствия значений, попадающих за пределы указанного диапазона или превышающие заданные пороговые значения;

  • отсутствия значений, которые не содержатся в некотором заранее определенном множестве (справочнике);

  • наличия записей на указанную дату, если есть отставание в загрузке данных в хранилище от источника;

  • совпадения количества загруженных записей в хранилище с количеством записей в источнике за период с момента предыдущей загрузки данных.

Далее, чтобы повысить уровень контроля над качеством данных и свою уверенность, что загрузка данных в хранилище происходит без сбоев, а также чтобы иметь возможность оперативно выявлять различные аномалии в данных и их количестве, можно начать собирать статистику по каждой загрузке и делать «профилирование» данных с помощью функционала в модулях all_metrics.py и run_profiling.py. 

Удобнее всего, на мой взгляд, организовать сбор всех необходимых метрик в одну общую таблицу. Расчет метрик можно производить сразу после завершения загрузки данных в хранилище или в отдельных специальных DAG-ах. Например, если данные забираются из источника по API один или несколько раз в день, то сначала мы получаем данные, затем делаем на них расчет метрик и сохраняем результаты в общую для всех таблицу с полями:

Название таблицы (путь в hdfs)

Название поля

Метрика

Значение метрики

Дата расчета

table_1

Количество записей

100500

2022-09-14

table_1

price

Минимальное значение

100

2022-09-14

table_1

price

Среднее значение

1000

2022-09-14

table_1

price

Сумма по колонке

10005000

2022-09-14

table_1

id

Количество пустых записей

0

2022-09-14

Имея на руках такую статистику по предыдущим загрузкам данных, можно выявлять различные аномалии (см. код проверок, в названии которых есть слово «anomalies» в all_tests.py):

  • отклонения в количестве загруженных или измененных записей по сравнению с предыдущими днями;

  • отклонения по количеству пустых значений;

  • отклонения по минимальным, максимальным, средним значениям в числовых полях или полях с датами.

Если пойти еще дальше, то можно аналогичное профилирование делать и на стороне СУБД некоторых источников. Затем эти расчеты также загружать в хранилище и сравнивать с расчетами, которые сделаны на ранее полученных данных. Это может добавить уверенности, что в процессе обработки данных на пути от источника до хранилища мы не внесли в них каких-либо искажений. Модуль utils.py содержит все вспомогательные функции, которые используются в первых двух модулях, например, получение метаданных о проверках, сохранение результатов, чтения данных из parquet-файлов в HDFS и др.

Как вы можете сами заметить, в разработке собственной системы контроля DQ нет ничего сложного. Можно развивать ее поэтапно по мере роста уровня зрелости процесса контроля качеством данных.

В заключении хотелось бы отметить, что контроль качества данных лучше всего
организовать как процесс, т. е. деятельность, которая должна быть хорошо
организована, выполняться регулярно и четко. На основе накопленных данных о
результатах проверок можно рассчитывать различные метрики по «измерениям»
качества данных, о которых подробнее написано в конспекте DMBoK. Результаты измерений качества данных можно визуализировать с помощью dashboard-ов на радость менеджерам разного уровня.

Пожалуй, это пока все. Надеюсь, данный материал помог вам почерпнуть полезное для использования на своих проектах. Пусть вас всегда сопровождают только качественные данные, а если это не так – теперь вы знаете немного больше о том, что с этим можно сделать.

Комментарии (1)


  1. boopiz
    21.11.2022 15:48

    NOT и LIKE в одном запросе. больше костов богу костов!!