Меня зовут Артем Шнайдер, и я занимаюсь DataScience в Бланке. Сегодня я хочу рассказать вам о том, как можно интегрировать два мощных инструмента – Dagster и Great Expectations.

Great Expectations позволяет определить так называемые ожидания от ваших данных, то есть задать правила и условия, которым данные должны соответствовать. 

Dagster, с другой стороны, это платформа с открытым исходным кодом для управления данными, которая позволяет создавать, тестировать и развертывать пайплайны данных. Написан на python, что позволяет пользователям гибко настраивать и расширять его функциональность.

Исходный код к этой статье на GitHub.

Давайте начнем?)

Подготовка

Нам понадобятся какие-нибудь данные, которые мы загрузим в базу данных. Я решил взять набор данных транзакций с Kaggle.

Собрал для вас готовый репозиторий, который вы можете клонировать по этой ссылке

Перейдем в директорию /simple‑postgres‑container и запустим нашу базу данных:

docker compose -f "docker-compose.yaml" up -d --build

Теперь у нас есть локальный экземпляр базы данных Postgres с табличкой transactions

Идем дальше)

1. Great Expectations шаг за шагом

В этой части мы установим пакет great-expectations, подключимся к нашей базе данных  и создадим простой набор ожиданий.

Выполним установку great-expectationsи инициализируем проект:

pip install great-expectations
great_expectations init

Команда создаст директорию со следующей структурой:

great_expectations/
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│ └── custom_data_docs
├── profilers
└── uncommitted
├── config_variables.yml
├── data_docs
└── validations

Дальнейшие команды будем выполнять из директории /great_expectations:

cd great_expectations

Great Expectations предлагает  использовать интерфейс командной строки для автоматического создания предварительно настроенных блокнотов Jupyter, которые проведут через конкретные этапы работы с Great Expectations.

1.1 Настроим источник данных

Выполним команду:

great_expectations datasource new

GE предложит варианты подключения к данным, выбираем Relational database (SQL):

P.S. Может потребоваться установка дополнительных пакетов - их GE установит сам в процессе.

Далее выбираем нашу базу данных:

GE сгенерирует Jupyter ноутбук, в котором мы должны заполнить учетные данные для подключения:

Больше ничего менять не потребуется, выполняем ноутбук до конца, закрываем и прерываем команду в терминале. Конфигурацию нашего источника данных можно увидеть в файле great_expectations.yaml.

great_expectations.yaml
# Welcome to Great Expectations! Always know what to expect from your data.
#
# Here you can define datasources, batch kwargs generators, integrations and
# more. This file is intended to be committed to your repo. For help with
# configuration please:
#   - Read our docs: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview/#2-configure-your-datasource
#   - Join our slack channel: http://greatexpectations.io/slack

# config_version refers to the syntactic version of this config file, and is used in maintaining backwards compatibility
# It is auto-generated and usually does not need to be changed.
config_version: 3.0

# Datasources tell Great Expectations where your data lives and how to get it.
# You can use the CLI command `great_expectations datasource new` to help you
# add a new datasource. Read more at https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview
datasources:
  my_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    data_connectors:
      default_runtime_data_connector_name:
        batch_identifiers:
          - default_identifier_name
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
      default_inferred_data_connector_name:
        include_schema_name: true
        class_name: InferredAssetSqlDataConnector
        module_name: great_expectations.datasource.data_connector
        introspection_directives:
          schema_name: public
      default_configured_data_connector_name:
        class_name: ConfiguredAssetSqlDataConnector
        assets:
          transactions:
            class_name: Asset
            schema_name: public
            module_name: great_expectations.datasource.data_connector.asset
        module_name: great_expectations.datasource.data_connector
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      credentials:
        host: localhost
        port: '5432'
        username: postgres
        password: postgres
        database: postgres
        drivername: postgresql
      module_name: great_expectations.execution_engine

# This config file supports variable substitution which enables: 1) keeping
# secrets out of source control & 2) environment-based configuration changes
# such as staging vs prod.
#
# When GX encounters substitution syntax (like `my_key: ${my_value}` or
# `my_key: $my_value`) in the great_expectations.yml file, it will attempt
# to replace the value of `my_key` with the value from an environment
# variable `my_value` or a corresponding key read from this config file,
# which is defined through the `config_variables_file_path`.
# Environment variables take precedence over variables defined here.
#
# Substitution values defined here can be a simple (non-nested) value,
# nested value such as a dictionary, or an environment variable (i.e. ${ENV_VAR})
#
#
# https://docs.greatexpectations.io/docs/guides/setup/configuring_data_contexts/how_to_configure_credentials


config_variables_file_path: uncommitted/config_variables.yml

# The plugins_directory will be added to your python path for custom modules
# used to override and extend Great Expectations.
plugins_directory: plugins/

stores:
# Stores are configurable places to store things like Expectations, Validations
# Data Docs, and more. These are for advanced users only - most users can simply
# leave this section alone.
#
# Three stores are required: expectations, validations, and
# evaluation_parameters, and must exist with a valid store entry. Additional
# stores can be configured for uses such as data_docs, etc.
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/

  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/

  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: checkpoints/

  profiler_store:
    class_name: ProfilerStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: profilers/

expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store

data_docs_sites:
  # Data Docs make it simple to visualize data quality in your project. These
  # include Expectations, Validations & Profiles. The are built for all
  # Datasources from JSON artifacts in the local repo including validations &
  # profiles from the uncommitted directory. Read more at https://docs.greatexpectations.io/docs/terms/data_docs
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder

anonymous_usage_statistics:
  enabled: true
  data_context_id: ffd790a7-4454-4b53-b9b1-428bfcfb4e64
notebooks:
include_rendered_content:
  globally: false
  expectation_validation_result: false
  expectation_suite: false

1.2 Создадим набор ожиданий

Возвращаемся в командую строку и выполняем:

great_expectations suite new

GE на выбор предоставит несколько способов создания наборов проверок. В документации рекомендуется автоматическое создание с использованием Data Assistant — предварительно настроенной утилиты, упрощающей создание ожиданий.

Также GE предлагает три типа DataConnector классов. Про выбор коннекторов можно почитать здесь, а нам нужен default_configured_data_connector_name:

Выбираем ассет данных и задаем имя нашему набору проверок:

Наборы ожиданий, созданные с помощью ноутбука, не предназначены для использования в производственной среде. Мы можем отредактировать сгенерированный файл/expectations/transactions/suite.json и добавить туда еще пару ожиданий: проверим, что все транзакции имеют ID, а количество купленных товаров больше 0.

Итоговый suite.json
{
  "data_asset_type": null,
  "expectation_suite_name": "transactions.suite",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {
        "max_value": 2000000,
        "min_value": 0
      },
      "meta": {
        "profiler_details": {
          "metric_configuration": {
            "domain_kwargs": {},
            "metric_name": "table.row_count",
            "metric_value_kwargs": null
          },
          "num_batches": 1
        }
      }
    },
    {
      "expectation_type": "expect_table_columns_to_match_set",
      "kwargs": {
        "column_set": [
          "ItemCode",
          "UserId",
          "NumberOfItemsPurchased",
          "Country",
          "TransactionTime",
          "ItemDescription",
          "TransactionId",
          "CostPerItem"
        ],
        "exact_match": null
      },
      "meta": {
        "profiler_details": {
          "success_ratio": 1.0
        }
      }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "TransactionId"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "NumberOfItemsPurchased",
        "min_value": 1
      }
    }
  ],
  "ge_cloud_id": null,
  "meta": {
    "citations": [
      {
        "citation_date": "2023-07-06T00:08:01.886955Z",
        "comment": "Created by effective Rule-Based Profiler of OnboardingDataAssistant with the configuration included.\n"
      }
    ],
    "great_expectations_version": "0.17.2"
  }
}

С множеством доступных ожиданий можно ознакомиться в Expectations Store.

Таким образом, у нас есть две проверки на уровне таблицы, и еще две — на уровне колонок. Идем дальше.

1.3 Создание Checkpoint

Checkpoint (контрольная точка) — это основное средство проверки данных при развертывании Great Expectations в производственной среде. Выполним команду:

great_expectations checkpoint new my_checkpoint

После выполнения всех шагов в ноутбуке в директории /checkpoints появится файл my_checkpoint.yaml:

my_checkpoint.yaml
name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
expectation_suite_name:
batch_request: {}
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
      site_names: []
evaluation_parameters: {}
runtime_configuration: {}
validations:
  - batch_request:
      datasource_name: my_datasource
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: public.transactions
      data_connector_query:
        index: -1
    expectation_suite_name: transactions.suite
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:

Если в ноутбуке вы не проверяли работоспособность чекпоинта, это можно сделать в терминале:

great_expectations checkpoint run my_checkpoint

Мы должны увидеть результаты проверки:

В терминале не будет отображаться информация о том, какая именно проверка не прошла, однако имеется возможность автоматически сгенерировать документацию в виде файлов HTML, что позволит посмотреть результаты в любом браузере. Подробнее об использовании Data Docs можно почитать здесь.

great_expectations docs build

Команда автоматически откроет окно в браузере, чтобы мы смогли увидеть результаты валидации:

Набор ожиданий готов, а значит, мы готовы запускать наши чекпоинты в Dagster!

2. Работа с Dagster

Прежде, чем мы создадим наш проект, поговорим об основных концепциях Dagster.

Asset (активы) — это объект, который фиксирует результат выполнения некоторого этапа в пайплайне dagster. Ассеты могут быть любого типа, например, файлами, таблицами базы данных или моделью машинного обучения.

Ops (операции) — являются основной единицей вычисления в Dagster, и должны выполнять относительно простые задачи, например, какие-нибудь вычисления, обращения к базе данных, вызовы API и т.д.

Таким образом, операции — это действия, которые выполняются в пайплайне, а активы — это данные, которые обрабатываются в процессе выполнения этих операций. 

Jobs (задания) - основная единица исполнения и мониторинга в Dagster. Задание может материализовать набор активов, или выполнить граф операций.

Resources (ресурсы) - это объекты, которые являются общими для нескольких активов, операций, расписаний или датчиков. Например, ресурсом может быть подключение к базе данных или сервису.

2.1 Создание проекта

Самый простой способ начать работу с Dagster — использовать шаблон проекта по умолчанию с помощью интерфейса командной строки:

pip install dagster
dagster project scaffold --name great-expectations-dagster

Команда dagster project создаст необходимую структуру папок:

great-expectations-dagster/
├── great_expectations_dagster
│ ├── assets.py
│ └── init.py
├── great_expectations_dagster_tests/
├── pyproject.toml
├── README.md
├── setup.cfg
└── setup.py

Файлы, с которыми мы будем работать:

  • __init__.py — содержит все определения, созданные в нашем проекте. Это могут активы, задания, расписания, датчики и ресурсы;

  • assets.py — модуль Python, содержащий программно-определяемые активы;

  • setup.py — содержит сценарий сборки, используем его для указания зависимостей для нашего проекта.

Подробнее о структуре проекта Dagster можно прочитать здесь.

2.2 Создание программно-определяемого актива

Мы создадим ассет, который провалидилирует наши данные и запишет результаты проверки в постоянное хранилище.

P.S. В Dagster есть возможность использовать фабрику ge_validation_op_factory из пакета dagster-ge для создания операций Dagster, которые интегрируются с Great Expectations. Однако это потребует создания операции, которая выгрузит нам наш набор данных в виде датафрейма и передаст его в качестве входных данных операции, созданной с помощью ge_validation_op_factory.

Отредактируем файл assets.py:

from dagster import MetadataValue, Output, asset
from dagster_ge.factory import GEContextResource
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView


@asset
def validate_data(data_context: GEContextResource):

    # предоставим контекст данных GE
    context = data_context.get_data_context()

    # запустим проверку на соответствие набору ожиданий
    results = context.run_checkpoint('my_checkpoint')

    # визуазлиция результатов проверки в Dagster требует вывода метаданных  
    validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)

    rendered_document_content_list = (
        validation_results_page_renderer.render_validation_operator_result(results)
    )
    md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))

    return Output(
        value=results['success'],
        metadata={
            "Expectation Results": MetadataValue.md(md_str)
    })

Далее идем в __init__.py и редактируем наше определение:

from dagster import (Definitions, define_asset_job, file_relative_path,
                     load_assets_from_modules)
from dagster_ge.factory import GEContextResource
from great_expectations_dagster.assets import validate_data



defs = Definitions(
    assets=[
        validate_data
    ],
    jobs=[
        # задание, которое запустит материализацию нашего ассета
        define_asset_job(
            name='validate_data_job',
            selection=[validate_data]
        )
    ],
    resources={
        # для настройки контекста GE необходимо указать корень каталога GE
        # (путь к файлу great_exepctations.yaml)
        'data_context': GEContextResource(
            ge_root_dir=file_relative_path(__file__, '../../great_expectations')
        )
    }
)

Теперь нам останется поправить файл setup.py и добавить туда пакет dagster-ge:

from setuptools import find_packages, setup

setup(
    name="great_expectations_dagster",
    packages=find_packages(exclude=["great_expectations_dagster_tests"]),
    install_requires=[
        "dagster",
        "dagster-ge"
    ],
    extras_require={"dev": ["dagit", "pytest"]},
)

Теперь все готово, чтобы мы могли запустить наш маленький паплайн!

В корне директории great-expectations-dagster/ выполним следующие команды:

pip install -e “.[dev]”
dagit

Команда dagit распечатает URL-адрес, по которому мы сможем получить доступ к интерфейсу в браузере, обычно через порт 3000. Мы должны увидеть наше задание validate_data_job:

Теперь нам достаточно нажать на кнопку Materialize, подождать, пока задание материализует актив, и мы сможем увидеть метаданные нашей материализации:

Для того, чтобы увидеть панель справа, необходимо кликнуть на икону ассета
Для того, чтобы увидеть панель справа, необходимо кликнуть на икону ассета

На вкладке мы видим поле Expectation Results и ссылку [Show Markdown], нам сюда)

Вот и всё :-)

Интеграция между Dagster и Great Expectations представляет собой мощный инструмент для обеспечения качества данных в конвейерах данных, с возможностью проверки и визуализации ожиданий на каждом этапе контейнеров данных.

Надеюсь, этот туториал будет полезен :-)

Комментарии (0)