В первой части статьи мы подключили DataHub к базе данных Oracle, во второй части рассмотрим подключение Great Expectations к DBMS Oracle, сделаем ряд проверок качества данных, а также отправим результаты проверок в DataHub.

Глоссарий

  • Профиль данных (профилирование) – исследование данных для выяснения статистических характеристик данных. Профилирование выполняют перед разработкой процедур ETL;

  • Метаданные – описание структуры данных. Какие типы объектов хранятся в базе данных, описание зависимостей между объектами и т.д.;

  • Jdbc – стандарт взаимодействия Java-приложений с различными СУБД. JDBC основан на концепции так называемых драйверов, позволяющих получать соединение с базой данных по специально описанному URL;

  • DQ (DataQuality) – контроль качества данных;

  • GE – Great Expectations;

  • DataSource – инструкция по подключению к БД в GE;

  • Asset – объект проведения проверок качества данных внутри БД. Является частью DataSource;

  • Expectation Suite – список проверок, которые будут проведены по отношению к Asset;

  • Checkpoint – список действий, которые будет делать GE.

2.1 Что такое Great Expectations

Как отмечалось в первой части статьи, в самом DataHub нет никакого интерфейса для проведения DQ проверок данных. Однако существует возможность интеграции DataHub с DQ инструментом GE. Означает это то, что GE отправляет результаты своих проверок в DataHub, в котором мы можем наблюдать, какие проверки данные прошли, а какие – нет. Это позволяет расширить и без того широкий спектр возможностей: мы можем в одном интерфейсе наблюдать не только профиль данных, которые собирает DataHub из «коробки», но и дополнить его проверками на качество данных и дополнительным профилированием данных, которые проводит GE.

Что вообще такое GE? GE - это инструмент для проверки, документирования и профилирования данных для поддержания качества. Поэтому, он является достаточно полезным для поддержания согласованности данных, хранящихся в различных источниках. Преимуществом GE является то, что он имеет возможность подключаться к любому источнику с помощью jdbc. Официально GE поддерживает не так много источников (и Oracle не входит в их число), однако есть возможность подключиться к любому источнику по строке подключения SQLAlchemy. Инструкции по подключению к различным источникам можно получить в документации. Также GE имеет большую библиотеку доступных вариантов проверок качества данных и возможность относительно просто создать свой вариант проверки данных, если это необходимо, что является чрезвычайно удобно и гибко.

2.2 Установка Great Expectations

Как и DataHub, GE является Python-библиотекой. Для установки требуется запустить команду из консоли:

pip install great_expectations

Для интеграции DataHub с GE также требуется установить дополнение DataHub в той же среде, где установлен GE:

pip install 'acryl-datahub[great-expectations]'

Дальнейшая работа с GE производится из консоли или с помощью Python-скриптов.

2.3 Проведение DQ в Great Expectations

В GE существуют три основные сущности, с которыми придется взаимодействовать: это DataSource (asset), Expectation Suite и Checkpoint. Каждая представляет собой отдельное звено в цепочке действий для DQ. Разберем их последовательно:

В отличие от DataHub, мы установили GE на OC Windows. Однако работа с ней будет одинаковая в любой ОС. Для начала работы с GE создадим проект с помощью консольной команды:

great_expectations init

В результате будет создана папка great_expectations, в которой находится наш проект. Во время создания структура будет показана в консоли:

great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
    |-- config_variables.yml
    |-- data_docs
    |-- validations
  • great_expectations.yml –  файл для получения данных из БД (список DataSource с Asset);

  • .gitignore –файл со списком игнорируемых файлов (изначально только папка uncommitted);

  • Uncommitted – папка, в которой хранятся результаты валидации, отчет о проверках в виде html страницы. Также здесь создаются шаблоны формата .jpynb;

  • Plugins – здесь хранятся скрипты .py, которые дополняют и расширяют компоненты или функциональность GE. Эту папку использовать мы не будем, почитать подробнее про нее можно здесь;

  • Expectations –  здесь хранятся инструкции для expectation suite;

  • Сheckpoints – здесь хранятся инструкции для Сheckpoint.

Дальнейшую работу будем выполнять в консоли в корне созданного проекта:

cd great_expectations

2.3.1 Datasource

DataSource – это объект, вокруг которого строится подключение к вашим данным. Он содержит в себе информацию для подключения к базе данных. Внутри DataSource существует такой объект, как Asset – это ссылка на определенную таблицу. В одном Datasource может быть несколько asset, однако для интеграции с DataHub требуется, чтобы в одном DataSource был только один asset.

GE – это Python-библиотека, поэтому, как отмечали ранее, основное взаимодействие с инструментом реализуется через Python-скрипты. Сам GE предлагает взаимодействовать с собой через Jupyter Notebook, создавая шаблоны разрешением .jpynb. Шаблон для создания DataSource можно создать с помощью консольной команды, ответив при этом на заданные в выводе вопросы:

great_expectations datasource new

Нам нужно создать отдельный DataSource с одним Asset для каждой таблицы в исследуемой схеме. Используемый для этого скрипт представлен ниже:

Hidden text
import great_expectations as gx
from great_expectations.core.yaml_handler import YAMLHandler
from sqlalchemy import create_engine
import pandas as pd

yaml = YAMLHandler()
data_context = gx.get_context()

connection_string = "oracle+cx_oracle://HR:HR@localhost:1521/?service_name=XEPDB1"
schema_name = "HR"

engine = create_engine(connection_string)
table_df = pd.read_sql_query("SELECT table_name from all_tables where owner = 'HR'", engine)

for table in table_df['table_name'].iloc:
    datasource_config: dict = {
        "name": f"oracle.hr.{table.lower()}" ,
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "class_name": "SqlAlchemyExecutionEngine",
            "module_name": "great_expectations.execution_engine",
            "connection_string": connection_string,
            "create_temp_table": False,

        },
        "data_connectors": {
            "connector_CASDC": {
                "class_name": "ConfiguredAssetSqlDataConnector",
                "assets": {
                    table.lower():{
                        "table_name": table,
                        "schema_name": "HR",
                    },
                },
            },
        },
    }
    data_context.add_datasource(**datasource_config)

Проверить, что скрипт успешно отработал, можно консольной командой, которая выведет список всех объектов DataSource:

great_expectations datasource list

Примечание: GE для своих собственных нужд создает временные таблицы в исследуемой базе данных. Однако Oracle не поддерживается полностью, поэтому он не может самостоятельно их подчищать. Этого можно избежать, добавив необязательный ключ create_temp_table в словаре execution_engine со значением False: в этом случае GE не будет создавать временные таблицы, и чистить их не потребуется.

2.3.2 Expectation suite (или просто suite)

Такой объект – не что иное, как инструкция у тому – какие проверки нужно делать. Они хранятся в json формате в папке Expectations. Мы берем один Asset и прописываем проверки, которые он проходит. Они могут быть двух типов: на уровне таблицы (например, количество столбцов) или на уровне колонки (например, количество уникальных значений в колонке). Число всех вариантов проверок поражает (с ними можно ознакомиться в Expectations Store). Если же по каким-то причинам там нет проверки, которая вам нужна, можно создать свою. В документации для этого приведена подробная инструкция. Мы же не будем менять стандартные проверки и воспользуемся теми, которые заданы в GE по умолчанию.

Hidden text
import great_expectations as gx
from sqlalchemy import create_engine
from great_expectations.core.batch import BatchRequest
from great_expectations.core.yaml_handler import YAMLHandler
import pandas as pd

yaml = YAMLHandler()
data_context: gx.DataContext = gx.get_context()

connection_string = "oracle+cx_oracle://HR:HR@localhost:1521/?service_name=XEPDB1"

engine = create_engine(connection_string)
table_df = pd.read_sql_query("SELECT table_name from all_tables where owner = 'HR'", engine)

for asset_name in table_df['table_name'].iloc:

    multi_batch: BatchRequest = BatchRequest(
        datasource_name=f"oracle.hr.{asset_name.lower()}",
        data_connector_name="connector_CASDC",
        data_asset_name=f"{asset_name}".lower(),
    )
    expectation_suite_name = f"oracle.hr.{asset_name.lower()}.suite"
    expectation_suite = data_context.add_or_update_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )

    column_df = pd.read_sql_query(f"SELECT column_name FROM USER_TAB_COLUMNS WHERE table_name = '{asset_name}'", engine)

    exclude_column_names = column_df['column_name'].to_list()

    data_assistant_result = data_context.assistants.onboarding.run(
        batch_request=multi_batch,
        exclude_column_names=exclude_column_names,
    )

    expectation_suite = data_assistant_result.get_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )

    data_context.add_or_update_expectation_suite(expectation_suite=expectation_suite)

Здесь мы создали проверки для всех таблиц и для всех колонок в этих таблицах. Каждому Asset соответствует свой Expectation Suite. Вы можете поменять варианты проверок командой консоли, в результате которой появится шаблон блокнота:

great_expectations suite edit NAME_OF_YOUR_SUITE_HERE

Изменяя этот блокнот, можно добавлять или удалять проверки, которые не нужны. Проверить корректность всех созданных Expectation Suite можно командой, которая выведет список их полный список:

great_expectations suite list

2.3.3 Checkpoint

Сheckpoint в GE – это совокупность действий, которые нужно сделать. Основная единица чекпоинта – это action. Например, мы будем использовать action для сохранения данных валидации на диске (StoreValidationResultAction), сохранение параметров валидации (StoreEvaluationParametersAction), обновление отчета (UpdateDataDocsAction) и отправку результатов в работающий экземпляр DataHub (DataHubValidationAction). С возможными action можно ознакомиться в документации.

Hidden text
import great_expectations as gx
from great_expectations.core.yaml_handler import YAMLHandler
from sqlalchemy import create_engine
import pandas as pd
import re

yaml = YAMLHandler()
data_context: gx.DataContext = gx.get_context()

connection_string = "oracle+cx_oracle://HR:HR@localhost:1521/?service_name=XEPDB1"

engine = create_engine(connection_string)
table_df = pd.read_sql_query("SELECT table_name from all_tables where owner = 'HR'", engine)

for asset_name in table_df['table_name'].iloc:
    config = f"""
    name: oracle.hr.{asset_name.lower()}.checkpoint
    config_version: 1
    class_name: Checkpoint
    validations:
      - batch_request:
          datasource_name: oracle.hr.{asset_name.lower()}
          data_connector_name: connector_CASDC
          data_asset_name: {asset_name.lower()}
          data_connector_query:
            index: -1
        expectation_suite_name: oracle.hr.{asset_name.lower()}.suite
        action_list:
          - name: store_validation_result
            action:
              class_name: StoreValidationResultAction
          - name: store_evaluation_params
            action:
              class_name: StoreEvaluationParametersAction
          - name: update_data_docs
            action:
              class_name: UpdateDataDocsAction
          - name: datahub_action
            action:
              module_name: datahub.integrations.great_expectations.action
              class_name: DataHubValidationAction
              server_url: http://127.0.0.1:8080
              env: PROD
              platform_instance_map:
                oracle.hr.{asset_name.lower()}: ''
              graceful_exceptions: false
              convert_urns_to_lowercase: true
              """
    data_context.add_or_update_checkpoint(**yaml.load(config))
    data_context.run_checkpoint(checkpoint_name = f'oracle.hr.{asset_name.lower()}.checkpoint')

Аналогично с Expectation Suite и DataSource в качестве проверки успешного выполнения скрипта можно запустить команду:

great_expectations checkpoint list

После этого запускать скрипты повторно не нужно. Они служат для создания необходимых объектов в GE. Для последующего запуска проверок можно воспользоваться консольной командой:

great_expectations checkpoint run my_checkpoint

Либо же Python-скриптом:

import great_expectations as gx

data_context: gx.DataContext = gx.get_context()
data_context.run_checkpoint(checkpoint_name=my_checkpoint)

После успешного выполнения checkpoint’ов в интерфейсе DataHub активируется вкладка Validations, в котором наблюдаем результаты проверок, которые были проведены в GE:

2.4 Чистка того, что создали

Возможно, вы решили не отключать создание временных таблиц, которое было сделано на этапе создания dataset’ов. Как упоминалось ранее, в этом случае GE будет создавать временные таблицы, которые он не чистит за собой:

Иногда дополнительно приходилось удалять все объекты GE. Через командную строку это делать долго, поэтому был написан ноутбук, который удаляет все объекты в GE, а заодно – и все временные таблицы, которые он не почистил за собой в исследуемой БД. 

Hidden text
import great_expectations as gx
from sqlalchemy import create_engine
import pandas as pd
import re

data_context: gx.DataContext = gx.get_context()

connection_string = "oracle+cx_oracle://HR:HR@localhost:1521/?service_name=XEPDB1"
engine = create_engine(connection_string)
table_df = pd.read_sql_query("SELECT table_name from all_tables where owner = 'HR'", engine)
pattern = re.compile(r'G[EX]_*')

datasource_list = list(data_context.datasources.keys())[:]
for datasource in datasource_list:
    data_context.delete_datasource(datasource)

suite_list = data_context.list_expectation_suite_names()[:]
for suite in suite_list:
    data_context.delete_expectation_suite(suite)

checkpoint_list = data_context.checkpoint_store.list_checkpoints()[:]
for checkpoint in checkpoint_list:
    data_context.delete_checkpoint(checkpoint)

conn = engine.connect()
for asset_name in (i for i in table_df['table_name'].iloc if pattern.match(i)):
    try:
        conn.execute(f'DROP TABLE {asset_name}')
    except:
        pass
conn.close()

Примечание: результаты валидации таким образом удалить нельзя. Это можно сделать, удалив внутренности папки по пути из корня проекта ./uncommited/validations (за исключением файла .ge_store_backend_id) и ./uncommited/data_docs/local_site/validations. Потом пересоберите отчет, чтобы эти изменения отразились, командой консоли:

great_expectations docs builds

2.5 Исправление ошибок

Во время выполнения проверок Сheckpoint мы столкнулись с ошибкой. Большинство проверок, которые проходили в таблицах, которые имеют поля типа date, падали с ошибкой ORA-01861: literal does not match format string. Связана она с несоответствием строки, которую нужно преобразовать в дату с шаблоном NLS_DATE_FORMAT.

Простого решения этой ошибки мы не нашли. Возможно, можно решить простым методом, нежели это сделали мы (конечно, можно просто поменять глобально NLS_DATE_FORMAT, но такой вариант решения может повлиять на работу остальных инструментов, взаимодействующих с Oracle).

Для решения проблемы мы решили все-таки поменять NLS_DATE_FORMAT, но не глобально, а в пределах сессии. Для этого нужно изменить скрипт:

$PYTHON_HOME/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py

В этом файле найти строку:

for query in queries.values():

(Это будет метод resolve_metric_bundle)

Прямо перед циклом добавить следующее условие:

if self.connection_string.split("+")[0] == 'oracle':
  self.engine.execute("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD\"T\"HH24:MI:SS'")

После таких изменений проверки выполняются без каких-либо ошибок.

Следует обратить внимание, что обновление библиотеки удалит это изменение, а также оно возможно повлияет на другие проекты GE. Впрочем, этого можно избежать, создав для этого проекта виртуальную среду Python.

Заключение

Совместное использование DataHub и GE может иметь широкое применение для использования в коммерческих целях. DataHub является очень гибким и универсальным инструментом для сбора метаданных и проведения профилирования объектов данных. Он может работать с огромным числом источников, собирает большое число статистик, и к тому же работает достаточно быстро за счет перекладывания основных расчетов на сервер, где расположен источник данных. Скорость профилирования объектов данных в БД Oracle может достигать 870 тыс. строк в минуту на не самом мощном сервере. Это OpenSource инструмент, который позволяет создать собственное решение под свои потребности.

В дополнение к этому, он может принимать результаты DQ проверок, проводимых GE. Это расширяет и без того широкий функционал DataHub.

Однако есть и недостатки. Во-первых, недостаточная популярность инструментов, из-за чего решение возникшей проблемы может затянутся. В случае DataHub существует официальное сообщество в Slack, что частично решает эту проблему – в случае чего, можно обратится к разработчикам для совместного поиска решения. У GE такой возможности нет. Также инструменты не совсем подходят для работы с БД Oracle. У DataHub эта проблема связана с профилированием численных полей. В случае GE существует проблема с полями типа date – решить ее возможно только изменив исходный код библиотеки.

Хоть связка инструментов и является полезной, для использования с СУБД Oracle она не подходит. Исправить возникающие ошибки в GE возможно, но вот баг с DataHub решить своими силами не представляется возможным (по крайней мере, на момент написания статьи). Если вы используете другую популярную СУБД, например, PostgreSQL, как источник данных, то эти инструменты должны показать себя очень хорошо. Удобство использования, гибкость и универсальность подталкивают к их использованию и изучению.

Комментарии (0)