Оглавление
Предистория
Здесь будет рассказано почему это всё получилось и коротко про бизнес-кейс, если не интересно, то переходи сразу к следующему разделу :)
Существуют такие метрики, которые выглядят просто, но очень хорошо отображают суть бизнеса в какой-то момент.
Такие метрики как: MAU, DAU, сумма продаж и прочее дают стейкхолдером хорошее понимание что сейчас с бизнесом.
Для дата-аналитика это выглядит так: дата, название метрики, значение.
Для дата-инженера это выглядит как семь кругов ада: извлечение из источника, очистка, обогащение, агрегация, расчет бизнес-логики, снова агрегация, опять очистка и только тогда метрика готова попасть "на стол". (количество кругов ада различается от команды к команде и от бизнес-задачи, но суть вы поняли :))
Я постоянно пытаюсь как-то автоматизировать свою работу и упростить жизнь и себе, и людям.
И тут появился хороший шанс это сделать.
Пришел бизнес с примерно таким запросом: "У нас существует более ста метрик, которые имеют вид: дата, название метрики, значение. Мы хотели бы это всё автоматизировать, потому что это собирается частично руками, частично какими-то скриптами и у этого всего разные владельцы. А мы хотим всё хранить в одном месте. Ну и самое главное – мы хотим версионировать все свои метрики, чтобы мы знали когда, зачем и почему изменилась та или иная метрика" – на самом этот запрос звучал так: "нам нужно хранить метрики и знать когда, что и почему с ними происходило" :)
И я начал думать.
На ум пришла сразу же идея – использовать SCD.
Сразу захотелось создать такую таблицу с типом "II", в которой будет метрика, логика, комментарии и прочее. У каждой строки будет дата создания, дата "удаления".
А ещё я очень хотел не только создать такую таблицу, но и иметь возможность генерировать DAG через эту таблицу
И я это сделал.
Об этом и пойдет дальше речь.
Если что, то весь код, все коммиты и прочее находятся в данном git-репозитории.
Используемые технологии:
-
MacBook Air (M1, 2020)
Оперативная память 16 Gb
macOS Monterey 12.6
-
Docker 4.22.0
-
Docker resources:
CPUs: 2
Memory: 4 Gb
Swap: 4 Gb
-
Python3.11
Подготовка стенда
Для начала скачаем себе docker-compose.yaml
с официального сайт Airflow:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
Если его запустить, то мы получим там Python3.7. Но хотелось бы что-то побыстрее, поэтому давайте сделаем себе Python3.11. Для этого нужно создать Dockerfile со следующей командой:
FROM apache/airflow:2.6.3-python3.11
Затем в docker-compose.yaml
изменяем стандартный образ на наш (строка 53):
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3}
build: .
Всё, готово. Теперь давайте соберем наш проект. Для этого выполним команду:
docker-compose up -d
* Если вы уже выполняли команду docker-compose up -d
, то необходимо сначала выполнить команду docker-compose build
и затем снова docker-compose up -d
.
Ждем, когда скачаются все образы и поднимутся все контейнеры. После сборки Airflow он будет доступен по адресу http://0.0.0.0:8080/home
Создание виртуального окружения
Теперь нужно создать виртуальное окружение, чтобы корректно работала подсветка синтаксиса и работали все подсказки.
Для этого нужно выполнить команду:
python3.11 -m venv venv && \ ~/_code/github/scd_dag_factory
source venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt
В наше виртуальное окружение установятся все зависимости, которые необходимы для работы данного проекта.
Создание сервиса с базой данных в docker-compose
Для нашего проекта понадобится база данных (БД). Я по классике выбрал PostgreSQL (в оригинальном проекте был развернут инстанс GreenPlum 6).
Для того, чтобы корректно работала БД в нашей сборке мы добавим в конец docker-compose.yaml
следующую команду:
test_db:
image: postgres
restart: always
ports:
- "1:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
После добавления этой команды необходимо заново перезапустить сборку командой:
docker-compose up -d
Теперь всё готово для работы над проектом
Для начала напишем простой DAG, который позволит проверить работу коннекта к нашей ранее созданной БД.
DAG, который проверяет коннект к БД в docker-сборке
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'check_pg'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')
# Описание возможных ключей для default_args
# https://github.com/apache/airflow/blob/343d38af380afad2b202838317a47a7b1687f14f/airflow/example_dags/tutorial.py#L39
args = {
'owner': OWNER,
'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
}
def check_pg_connect(**context):
""""""
pg = PostgresHook('test_db')
df = pg.get_pandas_df('SELECT 1 AS one')
if len(df) == 1:
print(True)
with DAG(
dag_id=DAG_ID,
schedule_interval='10 0 * * *',
default_args=args,
tags=['check_pg_connect', 'test'],
concurrency=1,
max_active_tasks=1,
max_active_runs=1,
) as dag:
start = EmptyOperator(
task_id='start',
)
check_pg_connect = PythonOperator(
task_id='check_pg_connect',
python_callable=check_pg_connect,
)
end = EmptyOperator(
task_id='end',
)
start >> check_pg_connect >> end
Запустим его и посмотрим на него. Если раны горят зеленым, то всё идет по плану.
Далее давайте напишем наш типовой DAG, который будет собирать метрики по типу: дата, метрика, значение.
Типовой DAG, который будет собирать какую-то метрику (simple_dag)
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'simple_dag'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')
# Названия коннекторов к PG
PG_CONNECT = 'test_db'
# Используемые таблицы в DAG
PG_TARGET_SCHEMA = 'dm'
PG_TARGET_TABLE = 'fct_sales'
PG_TMP_SCHEMA = 'stg'
PG_TMP_TABLE = f'tmp_{PG_TARGET_TABLE}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'
INDEX_KPI = 1
sql_query = '''
SELECT
('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
(random()*100)::int AS value,
1 AS kpi_id
'''
LONG_DESCRIPTION = '# LONG_DESCRIPTION'
SHORT_DESCRIPTION = 'SHORT_DESCRIPTION'
args = {
'owner': OWNER,
'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
'catchup': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
}
with DAG(
dag_id=DAG_ID,
schedule_interval='10 0 * * *',
default_args=args,
tags=['dm'],
description=SHORT_DESCRIPTION,
concurrency=1,
max_active_tasks=1,
max_active_runs=1,
) as dag:
dag.doc_md = LONG_DESCRIPTION
start = EmptyOperator(
task_id='start',
)
drop_tmp_before = PostgresOperator(
task_id='drop_tmp_before',
sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
postgres_conn_id=PG_CONNECT
)
create_tmp = PostgresOperator(
task_id='create_tmp',
sql=f'''
CREATE TABLE {PG_TMP_SCHEMA}.{PG_TMP_TABLE} AS
{
sql_query.format(
start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
)
};
''',
postgres_conn_id=PG_CONNECT
)
delete_from_target = PostgresOperator(
task_id='delete_from_target',
sql=f'''
DELETE FROM {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}
WHERE
date IN (
SELECT
date
FROM
{PG_TMP_SCHEMA}.{PG_TMP_TABLE}
)
''',
postgres_conn_id=PG_CONNECT
)
insert_from_tmp_to_target = PostgresOperator(
task_id='insert_from_tmp_to_target',
sql=f'''
INSERT INTO {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}("date", value, kpi_id)
SELECT
"date",
value,
kpi_id
FROM
{PG_TMP_SCHEMA}.{PG_TMP_TABLE}
''',
postgres_conn_id=PG_CONNECT
)
drop_tmp_after = PostgresOperator(
task_id='drop_tmp_after',
sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
postgres_conn_id=PG_CONNECT
)
end = EmptyOperator(
task_id='end',
)
start >> drop_tmp_before >> create_tmp >> delete_from_target >> insert_from_tmp_to_target >> drop_tmp_after >> end
В генерируемых значениях есть уже колонка kpi_id
. Пока не обращаем на неё внимание. О ней чуть позже.
Перед запуском DAG давайте создадим таблицу, в которую будут писаться метрики.
DDL код для создания таблицы фактов
CREATE SCHEMA dm;
CREATE SCHEMA stg;
DROP TABLE IF EXISTS dm.fct_sales;
CREATE TABLE dm.fct_sales (
id bigserial PRIMARY KEY,
"date" date NOT NULL,
kpi_id int2 NOT NULL,
value int4 NOT NULL
);
После этого мы можем запустить DAG и мы увидим как появляются данные в нашей таблице.
Уже хорошо. Мы, на текущий момент, имеем DAG, который собирает какую-то метрику (давайте примем за условность, что метрика реально собирается, хоть и значения рандомного генерируются).
Теперь мы можем перейти к созданию SCD-таблицы с метриками.
Создание SCD-таблицы
Немного повторюсь, я хочу в этой таблице хранить всю информацию, которая касается метрик. Соответсвенно я хотел бы иметь полную информацию о ней: название, бизнес-логику, комментарии аналитиков, комментарии дата-инженеров и прочее.
А самое главное – я хочу из этой информации генерировать DAG, поэтому нужно определить значения, которые не могут быть пустыми (NOT NULL
).
Давайте создадим такую таблицу следующим скриптом:
DDL код для SCD-таблицы
CREATE TABLE public.dim_kpi_dag_gen_config (
id serial4 NOT NULL,
kpi_id int4 NOT NULL,
dag_id varchar(255) NOT NULL,
"owner" varchar(30) NOT NULL,
start_date varchar(40) NOT NULL,
metric_name_en varchar(255) NOT NULL,
sql_query text NOT NULL,
short_description_md varchar(255) NULL,
long_description_md text NULL,
cron varchar(50) NOT NULL,
sensors varchar(255) NULL,
tags varchar(255) NOT NULL,
metric_line varchar(255) NULL,
"source" varchar(255) NULL,
bi_logic varchar(255) NULL,
comment_pa text NULL,
comment_de text NULL,
is_actual bool NULL DEFAULT TRUE,
created_at timestamptz NULL DEFAULT now(),
changed_at timestamptz NULL,
pg_environment varchar(10) NOT NULL DEFAULT 'prod'::character varying,
airflow_environment varchar(10) NOT NULL DEFAULT 'dev'::character varying,
CONSTRAINT dim_kpi_dag_gen_config_pkey PRIMARY KEY (id)
)
И давайте сразу же создадим таблицу, в которой будут храниться все наши собранные метрики:
DDL код для таблицы фактов (таблица для всех собираемых метрик)
CREATE TABLE public.fct_dm_kpi (
"date" date NULL,
value float8 NULL,
kpi_id int4 NULL
);
Создание инструмента для наполнения SCD-таблицы
Ниже я покажу MVP-вариант, который может заполнять SCD-таблицу. На своем проекте, с помощью другой команды, мы прикрутили к нему веб-интерфейс.
SCD-инструмент, который позволяет актуализировать и заполнять таблицу
from connectors_to_databases import PostgreSQL
pg = PostgreSQL(
port=1
)
TABLE = 'dim_kpi_dag_gen_config'
def gen_insert_sql_for_kpi_id(dict_kpi: dict = None) -> str:
"""
Генерирует скрипт для вставки данных в SCD.
Определяет есть ли такой ключ.
Если нет, то делает вставку с нужными данными, указанными в dict_kpi.
Если есть, то делает вставку с нужными данными, указанными в dict_kpi и дублирует информацию из прошлых строк.
@param dict_kpi: Словарь с описанием kpi.
@return: Строку для вставки значений в SCD.
"""
# Проверка наличия kpi_id в таблице
df_check = pg.execute_to_df(f'''
SELECT
kpi_id
FROM
{TABLE}
WHERE
kpi_id = {dict_kpi['kpi_id']}
''')
# Проверяем есть ли такой kpi_id в таблице
if len(df_check) >= 1:
# В запросе исключаем те поля, которые генерируется сами через `DEFAULT`
query = f'''
SELECT
column_name
FROM
information_schema.columns
WHERE
table_name = '{TABLE}'
AND column_name NOT IN (
'id', 'created_at', 'changed_at', 'is_actual',
{', '.join(f"'{i}'" for i in dict_kpi)}
)
'''
df = pg.execute_to_df(query) # noqa: PD901
insert_sql_column_current = ', '.join(value for value in df.column_name)
insert_sql_column_modified = insert_sql_column_current + f''', {', '.join(i for i in dict_kpi)}'''
list_values = []
for value in dict_kpi.values():
# Обработка одинарных кавычек в значениях. Они встречаются при указании дат.
if "'" in str(value):
value = value.replace("'", "''")
list_values.append(f"'{value}'")
elif value is None:
list_values.append('NULL')
else:
list_values.append(f"'{value}'")
insert_sql_column_values = insert_sql_column_current + f''', {', '.join(list_values)}'''
sql_insert = f'''
INSERT INTO {TABLE}
(
{insert_sql_column_modified}
)
SELECT
{insert_sql_column_values}
FROM
{TABLE}
WHERE
is_actual IS TRUE
AND kpi_id = {dict_kpi['kpi_id']};
'''
else:
# Если нет такого kpi_id в таблице, то генерируем вставку значений из словаря
columns = ', '.join(value for value in dict_kpi)
list_values = []
for value in dict_kpi.values():
if "'" in str(value):
value = value.replace("'", "''")
list_values.append(f"'{value}'")
elif value is None:
list_values.append('NULL')
else:
list_values.append(f"'{value}'")
values = ', '.join(list_values)
sql_insert = f'''
INSERT INTO {TABLE}({columns})
VALUES ({values});
'''
return sql_insert
def scd_dim_kpi(dict_kpi: dict = None) -> None:
"""
Основная функция, которая принимает на вход словарь с описанием kpi.
Каждый ключ – это поле в таблице SCD.
Каждое значение – это значение поля в таблице SCD.
@param dict_kpi: Словарь с описанием kpi по выбранным колонкам.
@return: Ничего не возвращает, выполняет SQL-скрипт на вставку данных в SCD.
"""
# Обновление changed_at в предыдущей актуальной записи
update_changed_at_for_kpi_id = f'''
UPDATE {TABLE}
SET
changed_at = NOW()
WHERE
kpi_id = {dict_kpi['kpi_id']}
AND is_actual IS TRUE;
'''
# Вставка новой записи с обновленными значениями полей
insert_new_values_for_kpi_id = gen_insert_sql_for_kpi_id(dict_kpi=dict_kpi)
# Обновление is_actual для каждого kpi_id
update_is_actual_for_kpi_id = f'''
UPDATE {TABLE}
SET
is_actual = false
WHERE
kpi_id = {dict_kpi['kpi_id']}
AND id <> (
SELECT MAX(id)
FROM {TABLE}
WHERE kpi_id = {dict_kpi['kpi_id']}
);
'''
# Собираем SQL-скрипт из разных кусков, чтобы он прошел в одной транзакции
sql_query = update_changed_at_for_kpi_id + insert_new_values_for_kpi_id + update_is_actual_for_kpi_id
print(sql_query) # noqa: T201
pg.execute_script(sql_query)
И теперь давайте вызовем нашу основную функцию scd_dim_kpi
, чтобы получить первую запись с метрикой в нашей таблице.
# Пример использования
new_values = {
'kpi_id': 1,
'dag_id': 'test_1',
'metric_name_en': 'test_1',
'owner': 'korsak0v',
'start_date': '2021-01-01',
'cron': '10 0 * * *',
'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
'sql_query': '''
SELECT
date,
count(values) AS value
FROM
fct_some_table_with_random_values
WHERE
date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY
1
''',
}
# Вызов функции
scd_dim_kpi(
dict_kpi=new_values
)
Вот что мы увидим в БД:
Отлично, метрика находится в БД и с этого момента можно сказать начинается её версионирование. Каждое изменение будет фиксироваться в БД отдельной строкой и мы сможем отследить все изменения.
Давайте для примера изменим поле comment_de
:
new_values = {
'kpi_id': 1,
'comment_de': 'Сделали так, потому что потому'
}
# Вызов функции
scd_dim_kpi(
dict_kpi=new_values
)
Получим в БД такую информацию: kpi_id
не изменился, изменилась информация в dim_kpi_dag_gen_config
и также видно, когда была изменена метрика, ну и самое главное – флаг is_actual
переместился на последнюю запись по данному kpi_id
.
Теперь мы готовы генерировать DAG из этой таблицы.
Генерация DAG через SCD-таблицу
Давайте оставим simple_dag без изменений, но возьмем его за основу и сделаем из него функцию, которая будет возвращать DAG на основании полученных атрибутов.
DAG генератор DAG
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
def create_kpi_of_dag(
owner: str = None,
dag_id: str = None,
pg_target_schema: str = None,
pg_target_table: str = None,
index_kpi: str = None,
pg_environment: str = None,
airflow_environment: str = None,
long_description: str = None,
short_description: str = None,
start_date: str = None,
cron_expression: str = None,
tags: str = None,
sensors: str = None,
sql_query: str = None,
) -> DAG:
"""
Функция, которая генерирует типовой DAG для получения метрики.
Вся логика описана и прокомментирована внутри функции.
Некоторые моменты обработаны исключительно для функции, чтобы обработать какие-то атрибуты и получить желаемый
эффект.
@param owner: Владелец DAG.
@param dag_id: Название DAG.
@param pg_target_schema: Целевая схема в PostgreSQL.
@param pg_target_table: Целевая таблица в PostgreSQL.
@param index_kpi: ID показателя.
@param pg_environment: Окружение PostgreSQL.
@param airflow_environment: Окружение Airflow.
@param long_description: Полное описание отчета.
@param short_description: Короткое описание отчета.
@param start_date: Дата начала работы DAG.
@param cron_expression: Cron.
@param tags: Tags.
@param sensors: Sensors.
@param sql_query: SQL-запрос для получения метрики.
@return: Возвращает DAG.
"""
# Конфигурация DAG
local_tz = pendulum.timezone('Europe/Moscow')
# Используемые таблицы в DAG
pg_target_schema = pg_target_schema
pg_target_table = pg_target_table
pg_tmp_schema = 'stg'
pg_tmp_table = f'tmp_{dag_id}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'
# Названия коннекторов к GP
pg_connect = 'test_db_dev' if pg_environment == 'dev' else 'test_db'
# Сделана заглушка атрибута. Это можно использовать для указания разных сценариев в зависимости от окружения
airflow_environment = airflow_environment
# Дата приходит в формате str и после парсинга, мы можем получить дату и любые элементы даты
parse_date = pendulum.parse(start_date)
args = {
'owner': owner,
'start_date': datetime(parse_date.year, parse_date.month, parse_date.day, tzinfo=local_tz),
'catchup': True,
'depends_on_past': True,
'retries': 3,
'retry_delay': timedelta(hours=1),
}
# Tags приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
raw_tags = list(tags.split(','))
tags_ = []
for i in raw_tags:
tags_.append( # noqa: PERF401
i.replace("'", "")
.replace(" ", '')
.replace("[", "")
.replace("]", "")
)
# Sensors приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
if sensors:
raw_sensors = list(sensors.split(','))
sensors_ = []
for i in raw_sensors:
sensors_.append( # noqa: PERF401
i.replace("'", "")
.replace(' ', '')
.replace("[", "")
.replace("]", "")
)
else:
sensors_ = None
with DAG(
dag_id=dag_id,
schedule_interval=cron_expression,
default_args=args,
tags=tags_,
description=short_description,
concurrency=1,
max_active_tasks=1,
max_active_runs=1,
) as dag:
dag.doc_md = long_description
start = EmptyOperator(
task_id='start',
)
# Если есть sensors, то мы создаем задачи с сенсорами, иначе создаем одну пустышку
if sensors_:
sensors_task = [
ExternalTaskSensor(
task_id=f'sensor_{dag}',
external_dag_id=dag,
allowed_states=['success'],
mode='reschedule',
timeout=360000, # длительность работы сенсора
poke_interval=600 # частота проверки
) for dag in sensors_
]
else:
sensors_task = [EmptyOperator(task_id=f'empty_{value}') for value in range(1)]
drop_tmp_before = PostgresOperator(
task_id='drop_tmp_before',
sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
postgres_conn_id=pg_connect
)
create_tmp = PostgresOperator(
task_id='create_tmp',
sql=f'''
CREATE TABLE {pg_tmp_schema}.{pg_tmp_table} AS
{
sql_query.format(
start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
)
};
''',
postgres_conn_id=pg_connect
)
delete_from_target = PostgresOperator(
task_id='delete_from_target',
sql=f'''
DELETE FROM {pg_target_schema}.{pg_target_table}
WHERE
date IN (
SELECT
date
FROM
{pg_tmp_schema}.{pg_tmp_table}
WHERE
kpi_id = {index_kpi}
)
AND kpi_id = {index_kpi}
''',
postgres_conn_id=pg_connect
)
insert_from_tmp_to_target = PostgresOperator(
task_id='insert_from_tmp_to_target',
sql=f'''
INSERT INTO {pg_target_schema}.{pg_target_table}("date", value, kpi_id)
SELECT
"date",
value,
{index_kpi} AS kpi_id
FROM
{pg_tmp_schema}.{pg_tmp_table}
''',
postgres_conn_id=pg_connect
)
drop_tmp_after = PostgresOperator(
task_id='drop_tmp_after',
sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
postgres_conn_id=pg_connect
)
end = EmptyOperator(
task_id='end',
)
start >> sensors_task >> drop_tmp_before >> create_tmp >> delete_from_target >>\
insert_from_tmp_to_target >> drop_tmp_after >> end
return dag
# build a dag from dag config
def generator_of_morning_kpi_dag_to_gp() -> None:
"""
Функция получает список config из БД и генерирует DAG's на основании функции `create_kpi_of_dag`.
Итерируется по config и каждый раз выполняет функцию `create_kpi_of_dag`, которая возвращает DAG.
@return: None
"""
pg_hook = PostgresHook(postgres_conn_id='test_db')
df = pg_hook.get_pandas_df( # noqa: PD901
'''
SELECT
kpi_id,
dag_id,
"owner",
sql_query,
start_date,
pg_environment,
airflow_environment,
short_description_md,
long_description_md,
cron,
sensors,
tags
FROM
dim_kpi_dag_gen_config
WHERE
is_actual IS TRUE
ORDER BY
id;
'''
)
for i in range(len(df)):
create_kpi_of_dag(
owner=df.iloc[i].owner,
dag_id=df.iloc[i].dag_id,
pg_target_schema='public',
pg_target_table='fct_dm_kpi',
index_kpi=df.iloc[i].kpi_id,
pg_environment=df.iloc[i].pg_environment,
airflow_environment=df.iloc[i].airflow_environment,
long_description=df.iloc[i].long_description_md,
short_description=df.iloc[i].short_description_md,
start_date=df.iloc[i].start_date,
cron_expression=df.iloc[i].cron,
tags=df.iloc[i].tags,
sensors=df.iloc[i].sensors,
sql_query=df.iloc[i].sql_query,
)
generator_of_morning_kpi_dag_to_gp()
После добавления данного DAG мы сразу же видим результат – в нашем веб-интерфейсе появился DAG с ранее созданной метрикой:
Давайте запустим наш сгенерированный DAG и посмотрим на результат, но в начале отредактируем SQL-запрос под необходимый нам формат.
Изменение sql_query для DAG test_1
new_values = {
'kpi_id': 1,
'sql_query': '''
SELECT
('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
(random()*100)::int AS value
''',
}
# Вызов функции
scd_dim_kpi(
dict_kpi=new_values
)
Всё, DAG работает и что-то собирает. Давайте сделаем для примера ещё несколько DAG.
Я создал пять дополнительных DAG по следующему шаблону (для демонстрации):
Hidden text
# Пример использования
new_values = {
'kpi_id': 6,
'dag_id': 'test_6',
'metric_name_en': 'test_6',
'owner': 'korsak0v',
'start_date': '2000-01-01',
'cron': '15 14 1 * *',
'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
'sql_query': '''
SELECT
('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
(random()*100)::int AS value
''',
}
# Вызов функции
scd_dim_kpi(
dict_kpi=new_values
)
Всё работает корректно. В БД записи есть:
В Airflow DAG корректно отображаются и работают.
Давайте добавим ещё сенсоры для демонстрации. Я добавил новый DAG и если посмотреть на его граф, то мы можем увидеть EmptyOperator
– empty_0
. Сейчас это просто заглушка на случай если у нас нет сенсоров на другие DAG. При дагране он выполнится менее чем за секунду и наш пайплайн продолжит корректно работать.
Давайте добавим список сенсоров:
Добавление списка сенсоров для изменения графа DAG test_7
new_values = {
'kpi_id': 7,
'sensors': '''['test_1', 'test_2', 'test_3', 'test_4', 'test_5', 'test_6']''',
}
# Вызов функции
scd_dim_kpi(
dict_kpi=new_values
)
И получим такой граф. Теперь вместо нашей заглушки мы имеем список сенсоров, которые будут проверять отработали ли какие-то DAG или нет.
Ну, и в конце давайте посмотрим, что насобирали наши сгенерированные DAG:
В дальнейшем мы сможем сделать JOIN между таблицей фактов и справочником, чтобы получить информацию по каждой из метрик.
Заключение
Я постарался описать максимально подробно свой кейс. Возможно, он является узконаправленным, но тут я рассказал об очередном генераторе DAG.
Также я не отрицаю, что это не идеально, но и идеала не существует ;)
Мог какие-то моменты упустить/упростить/не описать подробно. За всем этим можете обращаться в комментарии.
Важные моменты, на которые стоит обратить внимание:
Данная реализация не генерирует файл, а генерирует только объекты, поэтому во всех DAG будет исходный код нашего DAG, в котором происходит вся генерация. Если вам нужны физически созданные DAG, то можете прочитать мою предыдущую статью.
Если некорректно передать какие-то атрибуты в функцию, то сломаются все DAG. Если к примеру вместо даты попадет какая-то строка, то произойдет исключение, функция не отработает и ни один DAG не будет создан. Но вы можете это улучшить при помощи обработки исключений и прочего.
В данной реализации
kpi_id
нужно подставлять самостоятельно. В моем варианте не реализован автоинкремент. Но как я говорил ранее этот продукт доработан и другая команда реализовала уже это на backend.Я показал самую простую реализацию данного инструмента. Вы можете настраивать его под себя, с обработкой каждого атрибута, использовать фреймворки, использовать самописные обработчики и прочие.
Можно сделать дополнительную нормализацию данных, сделать дополнительные справочники, привязать ORM-модели и реализовать сразу какой-то backend, чтобы не заполнять это руками.
В демонстрационном DAG реализована идемпотентность, которая позволяет грузить метрики снапшотами за каждый день. Вы можете это регулировать также под свои нужды.
-
Мой кейс решает определенную бизнес-задачу. Бизнес рад и я рад :)
Комментарии (7)
ivankudryavtsev
02.09.2023 03:50+3Более того, чем больше думаю о решении, тем больше мне кажется, что Вы не смогли именно смысл решения правильно читателю «продать».
Bessnov
02.09.2023 03:50+1Ожидалось почитать что-то новое про DAG, в итоге статья про работу с airflow :(
Taragolis
02.09.2023 03:50Лично мне как человеку, кто регулярно отвечает на вопросы произволительности Airflow, в данной статье явно не хватает указания следующих ссылок на документацию:
-
Optimizing DAG parsing delays during execution, эксперементальный функционал, но шанс что он останется значительно выше, чем у того, что MsSQL останется в качестве DB Backend для Airflow
Потому как если кто-то попробует повторить этот эксперимент, он может легко столкнуться с проблемой, что Airflow начнет заспамливать базу по интервалу сканирования DAGов. Поэтому еще оставлю ссылку на тюнинг планировщика Airflow
Ну и еще от себя добавляю, официальный Docker Compose файл для Airflow существует чисто для того, чтобы пользователь мог запустить и сказать "Вау! Это действительно работает!"
-
wifage
02.09.2023 03:50Я конечно не дата-инженер, хотя много работал с данными, но на первый взгляд все же проще засунуть данные в json и положить в какой нибудь аналог mongoDB. А дальше уже можно делать с данными все что угодно. Если я не прав, поясните почему.
ivankudryavtsev
Дорогой автор, я ни в коей мере не считаю себя экспертом, но генерировать код из базы, на мой взгляд ничем не лучше чем создавать DAG-и по конфигу, который, как минимум легко обрабатывать существующими инструментами разработки.
Объясните в чем смысл, пожалуйста?
А то Вы пишите, что думал-думал и придумал, а зачем весь изврат не пишите. В том-то и прелесть AirFlow и прочих управляшек заданиями, что они управляшки, а логику обработки данных в каждом задании можно кастомизировать… а если логика укладывается в общую канву, отличаясь запросами, то мне кажется и AirFlow может быть не нужен.
Может тут бы людям хватило просто ClickHouse и дашборда типа Tableau.
Суммирую. Мне кажется, Вам не удалось объяснить ценность и достоинства вашего подхода.
k0rsakov Автор
Добрый день.
Спасибо за ваш комментарий.
Отвечу сразу на два комментария.
На момент реализации данного решения нас это устраивало и сейчас устраивает, так как основной оркестратор в компании – это Airflow
Да, возможно, и изврат, но об этом также написано, что это решение моего бизнес-кейса. Возможно в дальнейшем он покажет свою несостоятельность и мы его изменим, когда увеличится список компетенций и/или технологий.
У меня не было задачи "продавать" данное решение. Оно не идеальное, но оно отвечает нашему запросу. Оно работает и приносит пользу – для нас это главное.
Возможно в будущем я сделаю часть "два", в которой раскритикую своё предложение и покажу что-то другое. На текущий момент – это описание моего опыта и бизнес-кейса.
ivankudryavtsev
Не стоит писать, если нет цели «продать» статью читателю, это просто трата и своего и чужого времени в духе «зафига дофига нафигачили». Ваш императив, приведший Вас к этому решению из статьи не ясен. Любая архитектура делается с учетом pros/cons и требований.
Если я спрошу «почему суп ложкой едят?». Вы мне ответите и кто угодно ответит - это общеизвестное знание, но Ваша архитектура не самоочевидна, а статья не отвечает на вопрос «почему и зачем она такая?».
Обычно, людям сначала интересно «зачем», а потом «как».