Меня зовут Артем Шнайдер, и я занимаюсь DataScience в Бланке. Сегодня я хочу рассказать вам о том, как можно интегрировать два мощных инструмента – Dagster и Great Expectations.
Great Expectations позволяет определить так называемые ожидания от ваших данных, то есть задать правила и условия, которым данные должны соответствовать.
Dagster, с другой стороны, это платформа с открытым исходным кодом для управления данными, которая позволяет создавать, тестировать и развертывать пайплайны данных. Написан на python, что позволяет пользователям гибко настраивать и расширять его функциональность.
Исходный код к этой статье на GitHub.
Давайте начнем?)
Подготовка
Нам понадобятся какие-нибудь данные, которые мы загрузим в базу данных. Я решил взять набор данных транзакций с Kaggle.
Собрал для вас готовый репозиторий, который вы можете клонировать по этой ссылке.
Перейдем в директорию /simple‑postgres‑container
и запустим нашу базу данных:
docker compose -f "docker-compose.yaml" up -d --build
Теперь у нас есть локальный экземпляр базы данных Postgres с табличкой transactions.
Идем дальше)
1. Great Expectations шаг за шагом
В этой части мы установим пакет great-expectations
, подключимся к нашей базе данных и создадим простой набор ожиданий.
Выполним установку great-expectations
и инициализируем проект:
pip install great-expectations
great_expectations init
Команда создаст директорию со следующей структурой:
great_expectations/
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│ └── custom_data_docs
├── profilers
└── uncommitted
├── config_variables.yml
├── data_docs
└── validations
Дальнейшие команды будем выполнять из директории /great_expectations
:
cd great_expectations
Great Expectations предлагает использовать интерфейс командной строки для автоматического создания предварительно настроенных блокнотов Jupyter, которые проведут через конкретные этапы работы с Great Expectations.
1.1 Настроим источник данных
Выполним команду:
great_expectations datasource new
GE предложит варианты подключения к данным, выбираем Relational database (SQL)
:
P.S. Может потребоваться установка дополнительных пакетов - их GE установит сам в процессе.
Далее выбираем нашу базу данных:
GE сгенерирует Jupyter ноутбук, в котором мы должны заполнить учетные данные для подключения:
Больше ничего менять не потребуется, выполняем ноутбук до конца, закрываем и прерываем команду в терминале. Конфигурацию нашего источника данных можно увидеть в файле great_expectations.yaml.
great_expectations.yaml
# Welcome to Great Expectations! Always know what to expect from your data.
#
# Here you can define datasources, batch kwargs generators, integrations and
# more. This file is intended to be committed to your repo. For help with
# configuration please:
# - Read our docs: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview/#2-configure-your-datasource
# - Join our slack channel: http://greatexpectations.io/slack
# config_version refers to the syntactic version of this config file, and is used in maintaining backwards compatibility
# It is auto-generated and usually does not need to be changed.
config_version: 3.0
# Datasources tell Great Expectations where your data lives and how to get it.
# You can use the CLI command `great_expectations datasource new` to help you
# add a new datasource. Read more at https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview
datasources:
my_datasource:
class_name: Datasource
module_name: great_expectations.datasource
data_connectors:
default_runtime_data_connector_name:
batch_identifiers:
- default_identifier_name
class_name: RuntimeDataConnector
module_name: great_expectations.datasource.data_connector
default_inferred_data_connector_name:
include_schema_name: true
class_name: InferredAssetSqlDataConnector
module_name: great_expectations.datasource.data_connector
introspection_directives:
schema_name: public
default_configured_data_connector_name:
class_name: ConfiguredAssetSqlDataConnector
assets:
transactions:
class_name: Asset
schema_name: public
module_name: great_expectations.datasource.data_connector.asset
module_name: great_expectations.datasource.data_connector
execution_engine:
class_name: SqlAlchemyExecutionEngine
credentials:
host: localhost
port: '5432'
username: postgres
password: postgres
database: postgres
drivername: postgresql
module_name: great_expectations.execution_engine
# This config file supports variable substitution which enables: 1) keeping
# secrets out of source control & 2) environment-based configuration changes
# such as staging vs prod.
#
# When GX encounters substitution syntax (like `my_key: ${my_value}` or
# `my_key: $my_value`) in the great_expectations.yml file, it will attempt
# to replace the value of `my_key` with the value from an environment
# variable `my_value` or a corresponding key read from this config file,
# which is defined through the `config_variables_file_path`.
# Environment variables take precedence over variables defined here.
#
# Substitution values defined here can be a simple (non-nested) value,
# nested value such as a dictionary, or an environment variable (i.e. ${ENV_VAR})
#
#
# https://docs.greatexpectations.io/docs/guides/setup/configuring_data_contexts/how_to_configure_credentials
config_variables_file_path: uncommitted/config_variables.yml
# The plugins_directory will be added to your python path for custom modules
# used to override and extend Great Expectations.
plugins_directory: plugins/
stores:
# Stores are configurable places to store things like Expectations, Validations
# Data Docs, and more. These are for advanced users only - most users can simply
# leave this section alone.
#
# Three stores are required: expectations, validations, and
# evaluation_parameters, and must exist with a valid store entry. Additional
# stores can be configured for uses such as data_docs, etc.
expectations_store:
class_name: ExpectationsStore
store_backend:
class_name: TupleFilesystemStoreBackend
base_directory: expectations/
validations_store:
class_name: ValidationsStore
store_backend:
class_name: TupleFilesystemStoreBackend
base_directory: uncommitted/validations/
evaluation_parameter_store:
class_name: EvaluationParameterStore
checkpoint_store:
class_name: CheckpointStore
store_backend:
class_name: TupleFilesystemStoreBackend
suppress_store_backend_id: true
base_directory: checkpoints/
profiler_store:
class_name: ProfilerStore
store_backend:
class_name: TupleFilesystemStoreBackend
suppress_store_backend_id: true
base_directory: profilers/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store
data_docs_sites:
# Data Docs make it simple to visualize data quality in your project. These
# include Expectations, Validations & Profiles. The are built for all
# Datasources from JSON artifacts in the local repo including validations &
# profiles from the uncommitted directory. Read more at https://docs.greatexpectations.io/docs/terms/data_docs
local_site:
class_name: SiteBuilder
show_how_to_buttons: true
store_backend:
class_name: TupleFilesystemStoreBackend
base_directory: uncommitted/data_docs/local_site/
site_index_builder:
class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
enabled: true
data_context_id: ffd790a7-4454-4b53-b9b1-428bfcfb4e64
notebooks:
include_rendered_content:
globally: false
expectation_validation_result: false
expectation_suite: false
1.2 Создадим набор ожиданий
Возвращаемся в командую строку и выполняем:
great_expectations suite new
GE на выбор предоставит несколько способов создания наборов проверок. В документации рекомендуется автоматическое создание с использованием Data Assistant — предварительно настроенной утилиты, упрощающей создание ожиданий.
Также GE предлагает три типа DataConnector
классов. Про выбор коннекторов можно почитать здесь, а нам нужен default_configured_data_connector_name
:
Выбираем ассет данных и задаем имя нашему набору проверок:
Наборы ожиданий, созданные с помощью ноутбука, не предназначены для использования в производственной среде. Мы можем отредактировать сгенерированный файл/expectations/transactions/suite.json
и добавить туда еще пару ожиданий: проверим, что все транзакции имеют ID, а количество купленных товаров больше 0.
Итоговый suite.json
{
"data_asset_type": null,
"expectation_suite_name": "transactions.suite",
"expectations": [
{
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {
"max_value": 2000000,
"min_value": 0
},
"meta": {
"profiler_details": {
"metric_configuration": {
"domain_kwargs": {},
"metric_name": "table.row_count",
"metric_value_kwargs": null
},
"num_batches": 1
}
}
},
{
"expectation_type": "expect_table_columns_to_match_set",
"kwargs": {
"column_set": [
"ItemCode",
"UserId",
"NumberOfItemsPurchased",
"Country",
"TransactionTime",
"ItemDescription",
"TransactionId",
"CostPerItem"
],
"exact_match": null
},
"meta": {
"profiler_details": {
"success_ratio": 1.0
}
}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "TransactionId"
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "NumberOfItemsPurchased",
"min_value": 1
}
}
],
"ge_cloud_id": null,
"meta": {
"citations": [
{
"citation_date": "2023-07-06T00:08:01.886955Z",
"comment": "Created by effective Rule-Based Profiler of OnboardingDataAssistant with the configuration included.\n"
}
],
"great_expectations_version": "0.17.2"
}
}
С множеством доступных ожиданий можно ознакомиться в Expectations Store.
Таким образом, у нас есть две проверки на уровне таблицы, и еще две — на уровне колонок. Идем дальше.
1.3 Создание Checkpoint
Checkpoint (контрольная точка) — это основное средство проверки данных при развертывании Great Expectations в производственной среде. Выполним команду:
great_expectations checkpoint new my_checkpoint
После выполнения всех шагов в ноутбуке в директории /checkpoints
появится файл my_checkpoint.yaml:
my_checkpoint.yaml
name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
expectation_suite_name:
batch_request: {}
action_list:
- name: store_validation_result
action:
class_name: StoreValidationResultAction
- name: store_evaluation_params
action:
class_name: StoreEvaluationParametersAction
- name: update_data_docs
action:
class_name: UpdateDataDocsAction
site_names: []
evaluation_parameters: {}
runtime_configuration: {}
validations:
- batch_request:
datasource_name: my_datasource
data_connector_name: default_inferred_data_connector_name
data_asset_name: public.transactions
data_connector_query:
index: -1
expectation_suite_name: transactions.suite
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:
Если в ноутбуке вы не проверяли работоспособность чекпоинта, это можно сделать в терминале:
great_expectations checkpoint run my_checkpoint
Мы должны увидеть результаты проверки:
В терминале не будет отображаться информация о том, какая именно проверка не прошла, однако имеется возможность автоматически сгенерировать документацию в виде файлов HTML, что позволит посмотреть результаты в любом браузере. Подробнее об использовании Data Docs можно почитать здесь.
great_expectations docs build
Команда автоматически откроет окно в браузере, чтобы мы смогли увидеть результаты валидации:
Набор ожиданий готов, а значит, мы готовы запускать наши чекпоинты в Dagster!
2. Работа с Dagster
Прежде, чем мы создадим наш проект, поговорим об основных концепциях Dagster.
Asset (активы) — это объект, который фиксирует результат выполнения некоторого этапа в пайплайне dagster. Ассеты могут быть любого типа, например, файлами, таблицами базы данных или моделью машинного обучения.
Ops (операции) — являются основной единицей вычисления в Dagster, и должны выполнять относительно простые задачи, например, какие-нибудь вычисления, обращения к базе данных, вызовы API и т.д.
Таким образом, операции — это действия, которые выполняются в пайплайне, а активы — это данные, которые обрабатываются в процессе выполнения этих операций.
Jobs (задания) - основная единица исполнения и мониторинга в Dagster. Задание может материализовать набор активов, или выполнить граф операций.
Resources (ресурсы) - это объекты, которые являются общими для нескольких активов, операций, расписаний или датчиков. Например, ресурсом может быть подключение к базе данных или сервису.
2.1 Создание проекта
Самый простой способ начать работу с Dagster — использовать шаблон проекта по умолчанию с помощью интерфейса командной строки:
pip install dagster
dagster project scaffold --name great-expectations-dagster
Команда dagster project
создаст необходимую структуру папок:
great-expectations-dagster/
├── great_expectations_dagster
│ ├── assets.py
│ └── init.py
├── great_expectations_dagster_tests/
├── pyproject.toml
├── README.md
├── setup.cfg
└── setup.py
Файлы, с которыми мы будем работать:
__init__.py
— содержит все определения, созданные в нашем проекте. Это могут активы, задания, расписания, датчики и ресурсы;assets.py
— модуль Python, содержащий программно-определяемые активы;setup.py
— содержит сценарий сборки, используем его для указания зависимостей для нашего проекта.
Подробнее о структуре проекта Dagster можно прочитать здесь.
2.2 Создание программно-определяемого актива
Мы создадим ассет, который провалидилирует наши данные и запишет результаты проверки в постоянное хранилище.
P.S. В Dagster есть возможность использовать фабрику ge_validation_op_factory
из пакета dagster-ge для создания операций Dagster, которые интегрируются с Great Expectations. Однако это потребует создания операции, которая выгрузит нам наш набор данных в виде датафрейма и передаст его в качестве входных данных операции, созданной с помощью ge_validation_op_factory.
Отредактируем файл assets.py:
from dagster import MetadataValue, Output, asset
from dagster_ge.factory import GEContextResource
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView
@asset
def validate_data(data_context: GEContextResource):
# предоставим контекст данных GE
context = data_context.get_data_context()
# запустим проверку на соответствие набору ожиданий
results = context.run_checkpoint('my_checkpoint')
# визуазлиция результатов проверки в Dagster требует вывода метаданных
validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
rendered_document_content_list = (
validation_results_page_renderer.render_validation_operator_result(results)
)
md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))
return Output(
value=results['success'],
metadata={
"Expectation Results": MetadataValue.md(md_str)
})
Далее идем в __init__.py
и редактируем наше определение:
from dagster import (Definitions, define_asset_job, file_relative_path,
load_assets_from_modules)
from dagster_ge.factory import GEContextResource
from great_expectations_dagster.assets import validate_data
defs = Definitions(
assets=[
validate_data
],
jobs=[
# задание, которое запустит материализацию нашего ассета
define_asset_job(
name='validate_data_job',
selection=[validate_data]
)
],
resources={
# для настройки контекста GE необходимо указать корень каталога GE
# (путь к файлу great_exepctations.yaml)
'data_context': GEContextResource(
ge_root_dir=file_relative_path(__file__, '../../great_expectations')
)
}
)
Теперь нам останется поправить файл setup.py
и добавить туда пакет dagster-ge
:
from setuptools import find_packages, setup
setup(
name="great_expectations_dagster",
packages=find_packages(exclude=["great_expectations_dagster_tests"]),
install_requires=[
"dagster",
"dagster-ge"
],
extras_require={"dev": ["dagit", "pytest"]},
)
Теперь все готово, чтобы мы могли запустить наш маленький паплайн!
В корне директории great-expectations-dagster/
выполним следующие команды:
pip install -e “.[dev]”
dagit
Команда dagit
распечатает URL-адрес, по которому мы сможем получить доступ к интерфейсу в браузере, обычно через порт 3000. Мы должны увидеть наше задание validate_data_job
:
Теперь нам достаточно нажать на кнопку Materialize, подождать, пока задание материализует актив, и мы сможем увидеть метаданные нашей материализации:
На вкладке мы видим поле Expectation Results и ссылку [Show Markdown], нам сюда)
Вот и всё :-)
Интеграция между Dagster и Great Expectations представляет собой мощный инструмент для обеспечения качества данных в конвейерах данных, с возможностью проверки и визуализации ожиданий на каждом этапе контейнеров данных.
Надеюсь, этот туториал будет полезен :-)