Привет, друзья!
Продолжаем нашу серию статей о создании BI-системы в компании Sminex. В первой части мы рассказали, что в качестве основного места хранения аналитических данных используется хранилище с якорной моделью. Она характеризуется высокой нормализацией и строгими правилами наименования объектов.
Строгие правила наименования лучше сразу прописать в соглашении об именовании объектов. В нашем случае мы использовали базовое соглашение [Anchor Modeling: Naming Convention] с небольшими доработками и упрощениями.
Однако, чтобы добиться высокой нормализации, каждый объект, независимо от его типа, нужно выделять в отдельную таблицу, прописывать для него DDL и ключи. Учитывая количество объектов, это превращается в огромную рутинную работу. При этом нельзя допускать ситуации «сейчас быстро сделаем, а потом поправим». Это идеальная область для автоматизации в условиях ограничения ресурсов.
Важно!
Мы не претендуем на абсолютную правильность реализации, оптимальность и соответствие нашего решения всем общепринятым правилам. Конкретно в нашей ситуации, с нашими ресурсами и объёмом задач от заказчиков, это решение удалось реализовать достаточно быстро, и оно начало приносить свои плоды. Мы будем рады услышать конструктивную критику по поводу решения. Самое полезное опубликуем в следующей статье!
Шаг 1. Как облегчить работу инженерам по выполнению поставок данных
Необходимо:
-
Придумать
велосипедменее затратный способ укладки данных в якорную модель по сравнению с ручным созданием и настройкой таблиц. Разработать способ автоматической генерации однотипных SQL-запросов, исключающий ошибки в наименовании объектов и в логике работы с данными.
Сохранять логи о совершенных действиях.
Шаг 2. ООП нам поможет
Для реализации задуманного мы разработали Python-модуль anchor_model.py
, в котором создали классы и наделили их необходимыми методами. Давайте познакомимся с ними поближе.
Класс AnchorModel
Этот класс является суперклассом. Все остальные классы наследуются от него. Он содержит три метода:
exists
— проверяет, существует ли уже такая таблица в DWH.get_description
— возвращает описание таблицы.log
— записывает в логи создание или наполнение таблицы якорной модели.
class AnchorModel:
class AnchorModel:
"""
Суперкласс для работы с таблицами якорной модели
Все остальные классы наследуются от него
Методы:
[exists] проверяет, существует ли уже такая таблица в DWH
[get_description] возвращает описание таблицы (комментарий)
[log] логирует обновление данных
"""
def __init__(self):
# Подключение к DWH
self.engine_dwh = get_engine('dwh')
# Название таблицы
self.table_name = None
# Источник данных
self.source = None
# Тип данных (задается через create или get_data_type)
self.data_type = None
# Описание (задается через create или get_description)
self.description = None
# Тип данных поля инкремента
self.increment = None
# Переменные для логирования
self.log_string = ''
self.log_rowcount = 0
self.is_success = False
self.exception = ''
def exists(self):
"""
Проверка на то, что такая таблица уже существует в DWH
"""
try:
query = f"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = '{self.table_name}'
);
"""
exists = self.engine_dwh.execute(query).scalar()
if exists:
logging.info(f"{self.log_string} Таблица [{self.table_name}] уже существует")
return True
else:
logging.info(f"{self.log_string} Таблицы [{self.table_name}] не существует")
return False
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
def get_description(self):
"""
Возвращает описание (комментарий к таблице)
"""
try:
query = f"""
SELECT d.description AS table_comment
FROM pg_class c
JOIN pg_description d ON c.oid = d.objoid
AND d.objsubid = 0 /* уровень 0 = комментарии к таблицам, не к колонкам) */
WHERE c.relname = '{self.table_name}'
"""
description = self.engine_dwh.execute(query).scalar()
if description is None:
raise Exception(f'Не удалось получить описание таблицы {self.table_name}, '
f'проверь название и что у нее есть комментарий')
else:
self.description = description
return self.description
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
def log(self):
"""
Логирует обновление данных
"""
try:
query = f"""
INSERT INTO _log (name, source, rows, is_success, exception)
VALUES ('{self.table_name}', '{self.source}', {self.log_rowcount}, {self.is_success}, '{self.exception}')
;"""
self.engine_dwh.execute(query)
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
Класс Anchor
Класс Anchor
наследуется от AnchorModel
. При инициализации необходимо задать обязательный параметр name
— имя якоря. В классе Anchor
предусмотрены следующие методы:
create_anchor
— создает таблицу якоря в DWH.update_anchor
— обновляет данные в якоре.get_anchor_data_type
— возвращает тип данных бизнес-ключа существующего якоря.get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
class Anchor(AnchorModel):
class Anchor(AnchorModel):
"""
Класс для работы с якорем
Атрибуты:
[name] название якоря. При инициализации задается без префикса, напр. tender или document
Методы:
[create_anchor] создает таблицу в DWH с нужным префиксом и триггером для генерации суррогатного ключа
[update_anchor] принимает название источника и SQL-запрос, обновляет данные в якоре
[get_anchor_data_type] возвращает тип данных бизнес-ключа существующего якоря (напр. uuid)
[get_max_anchor_bk] возвращает максимальное значение бизнес-ключа
"""
def __init__(self, name: str):
super().__init__()
# Название типа таблицы якорной модели
self.type = 'anchor'
# Название сущности
self.name = name
# Название таблицы = первые два символа названия + _ + название (напр. для tender - te_tender)
self.table_name = self.name[:2] + '_' + self.name
# Название временной таблицы
self.tmp_table_name = self.table_name + '_tmp'
# Название колонки суррогатного ключа = название + _ + id (напр. для tender - tender_id)
self.id_column_name = self.name + '_id'
# Название колонки бизнес ключа = название + _ + bk (напр. для tender - tender_bk)
self.bk_column_name = self.name + '_bk'
# Строка для отображения в логах консоли
self.log_string = f'[Anchor] [{self.name}] ::'
def create_anchor(self, data_type: str, description: str, increment: str = None):
"""
Функция для создания таблицы якоря в DWH
[data_type] тип данных якоря
[description] описание (комментарий к таблице)
[increment] тип данных инкремента. Если указан:
- заменяет колонку insert_ts и хранит в ней значения на момент обновления сущности
- при загрузке новых данных кроме ключа якоря в запросе также должно фигурировать поле инкремента
"""
try:
if self.exists():
return True
self.data_type = data_type
self.description = description
self.increment = increment
# Создаем таблицу якоря
if self.increment:
self.engine_dwh.execute(f"""
CREATE TABLE {self.table_name} (
{self.id_column_name} uuid PRIMARY KEY,
{self.bk_column_name} {self.data_type} NOT NULL UNIQUE,
increment {self.increment} NOT NULL
);
""")
else:
self.engine_dwh.execute(f"""
CREATE TABLE {self.table_name} (
{self.id_column_name} uuid PRIMARY KEY,
{self.bk_column_name} {self.data_type} NOT NULL UNIQUE,
insert_ts timestamp with time zone DEFAULT now()
);
""")
# Добавляем комментарий
self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")
# Создаем триггер для генерации суррогатного ключа при добавлении нового бизнес-ключа
self.engine_dwh.execute(f"""
CREATE OR REPLACE FUNCTION generate_{self.id_column_name}()
RETURNS TRIGGER AS $$
BEGIN
NEW.{self.id_column_name} := uuid_generate_v5(uuid_nil(), NEW.{self.bk_column_name}::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
# Триггер
self.engine_dwh.execute(f"""
CREATE TRIGGER {self.table_name}_trigger
BEFORE INSERT ON {self.table_name}
FOR EACH ROW
EXECUTE PROCEDURE generate_{self.id_column_name}();
""")
# Запись в реестр всех таблиц якорной модели.
# Из нее потом генерируется документация и схема хранилища.
self.engine_dwh.execute(f"""
DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)
VALUES ('{self.table_name}', 'anchor', '{self.name}', '{self.data_type}', '{self.name}')
;""")
logging.info(
f"{self.log_string} Таблица [{self.table_name}] создана с триггером на суррогатный ключ {self.id_column_name}")
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
def update_anchor(self, source: str, query: str, increment: bool = False):
"""
Загрузка данных якоря из источника
[source] название БД (как в Airflow Connections)
[query] запрос для выгрузки нужных данных
"""
try:
self.data_type = self.get_anchor_data_type()
self.description = self.get_description()
self.source = source
# Загрузка и очистка данных
raw_data = sql(db=source, query=query, description=self.description, as_df=True)
data = purify_data(data=raw_data)
# Если после очистки данных не осталось, то завершаем работу
# Логируем в блоке finally
if data.empty:
self.is_success = True
return True
# Начинаем загрузку данных в якорь
# Создаем временную таблицу
self.engine_dwh.execute(f"""
DROP TABLE IF EXISTS {self.tmp_table_name};""")
if increment:
self.engine_dwh.execute(f"""
CREATE TABLE {self.tmp_table_name} (
{self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY
);""")
else:
self.engine_dwh.execute(f"""
CREATE TABLE {self.tmp_table_name} (
{self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY
);""")
logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')
# Загрузка данных во временную таблицу
data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi',
chunksize=1000)
logging.info(f'Во временную таблицу загружено [{data.shape[0]:_}] строк')
# Наполнение якоря новыми данными
result = self.engine_dwh.execute(f"""
WITH new_values AS (
SELECT tmp.{self.bk_column_name}
FROM {self.tmp_table_name} tmp
LEFT JOIN {self.table_name} anchor ON anchor.{self.bk_column_name} = tmp.{self.bk_column_name}
WHERE anchor.{self.bk_column_name} IS NULL
)
INSERT INTO {self.table_name} ({self.bk_column_name})
SELECT {self.bk_column_name} FROM new_values
;""")
self.log_rowcount = result.rowcount
self.is_success = True
logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в якорь')
except Exception as e:
self.exception = str(e).replace('\'', '\'\'')
raise e
finally:
# Удаление временной таблицы
self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')
# Логирование
self.log()
# Закрытие подключения
self.engine_dwh.dispose()
def get_anchor_data_type(self):
"""
Возвращает тип данных бизнес-ключа якоря
"""
query = f"""
SELECT data_type
FROM information_schema.columns
WHERE table_name = '{self.table_name}'
AND column_name = '{self.bk_column_name}';
"""
data_type = self.engine_dwh.execute(query).scalar()
if data_type is None:
raise Exception('Не удалось получить тип данных бизнес-ключа якоря, проверь название якоря')
else:
self.data_type = data_type
return data_type
def get_max_anchor_bk(self):
"""
Возвращает максимальное значение бизнес-ключа якоря
"""
try:
query = f"SELECT MAX({self.bk_column_name}) FROM {self.table_name}"
max_bk = self.engine_dwh.execute(query).scalar()
return max_bk
except Exception as e:
raise e
finally:
self.engine_dwh.dispose()
Класс AnchorAttribute
Класс AnchorAttribute
наследуется от AnchorModel
. При инициализации необходимо задать значения обязательных параметров:
anchor_name
— имя якоря, которому принадлежит атрибут.name
— имя атрибута якоря без префиксов.is_historized
— признак исторического атрибута.
class AnchorAttribute(AnchorModel):
class AnchorAttribute(AnchorModel):
"""
Класс для работы с атрибутами якоря
Атрибуты:
[anchor_name] название якоря, к которому относится атрибут (напр. tender)
[name] название атрибута якоря без префиксов (напр. name)
[is_historized] признак исторического атрибута
(в таблицу добавляется поле "valid_from", а в датасете должна быть дата изменения атрибута)
Методы:
[create_attribute] создает таблицу в DWH с нужным названием (напр. te_nam_tender_name)
[update_attribute] принимает название источника и SQL-запрос, обновляет данные в атрибуте
[get_attribute_data_type] возвращает тип данных атрибута (напр. text)
[get_attribute_last_date] получение максимальной даты valid_from для итеративной загрузки исторических атрибутов
[get_max_anchor_bk] получение максимального ID бизнес-ключа якоря
"""
def __init__(self, anchor_name: str, name: str, is_historized: bool = False):
super().__init__()
# Название типа таблицы якорной модели
self.type = 'attr'
# Инициализация переменных атрибута
self.name = name
# Инициализация переменных якоря
self.anchor = Anchor(name=anchor_name)
self.anchor_name = self.anchor.name
self.anchor_data_type = self.anchor.get_anchor_data_type()
self.anchor_description = self.anchor.get_description()
# Признак исторического атрибута
self.is_historized = is_historized
# Название таблицы атрибута = первые 2 буквы названия якоря + _ + первые 3 буквы названия атрибута + _ +
# название якоря + _ + название атрибута
self.table_name = (self.anchor_name[:2] + '_' + self.name.replace('_', '')[:3]
+ '_' + self.anchor_name + '_' + self.name)
# Если атрибут исторический, то добавляем _his
if self.is_historized:
self.table_name += '_his'
# Название временной таблицы
self.tmp_table_name = self.table_name + '_tmp'
# Строка для отображения в логах
self.log_string = f'[AnchorAttribute] [{self.name}] ::'
def create_attribute(self, data_type: str, description: str):
self.data_type = data_type
self.description = description
try:
if self.exists():
return True
# Формируем DDL таблицы в зависимости от того, исторический атрибут или нет
if self.is_historized:
query_ddl = f"""
CREATE TABLE {self.table_name} (
{self.anchor.id_column_name} uuid NOT NULL,
{self.name} {self.data_type} NOT NULL,
valid_from timestamp NOT NULL,
valid_to timestamp,
insert_ts timestamp with time zone DEFAULT now(), /* UTC */
CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}, valid_from),
CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}
FOREIGN KEY ({self.anchor.id_column_name})
REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})
);
"""
else:
query_ddl = f"""
CREATE TABLE {self.table_name} (
{self.anchor.id_column_name} uuid NOT NULL,
{self.name} {self.data_type} NOT NULL,
insert_ts timestamp with time zone DEFAULT now(), /* UTC */
CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}),
CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}
FOREIGN KEY ({self.anchor.id_column_name})
REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})
);
"""
self.engine_dwh.execute(query_ddl)
# Добавляем комментарий
self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")
# Запись в таблицу со всеми таблицами якорной модели
self.engine_dwh.execute(f"""
DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name, is_historized)
VALUES ('{self.table_name}', 'attr', '{self.name}', '{self.data_type}', '{self.anchor_name}',
{self.is_historized});
""")
logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
def update_attribute(self, source: str, query: str, full_reload: bool = False):
"""
Загрузка данных атрибута якоря из источника
Пока обычные атрибуты заливаются путем TRUNCATE-INSERT,
а исторические атрибуты заливаются инкрементально
[source] название БД из функции get_engine
[query] SQL-запрос для выгрузки данных
[full_reload] если поставить True, то атрибут обновится через TRUNCATE
"""
try:
self.data_type = self.get_attribute_data_type()
self.description = self.get_description()
self.source = source
# Загрузка и очистка данных
raw_data = sql(db=source, query=query, description=self.description, as_df=True)
data = purify_data(raw_data)
# Если после очистки данных не осталось, то завершаем работу
if data.empty:
self.is_success = True
return True
# Создаем временную таблицу
if self.is_historized:
query_ddl_tmp = f"""
CREATE TABLE {self.tmp_table_name} (
{self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,
{self.name} {self.data_type} NOT NULL,
valid_from timestamp NOT NULL,
valid_to timestamp
);
"""
else:
query_ddl_tmp = f"""
CREATE TABLE {self.tmp_table_name} (
{self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,
{self.name} {self.data_type} NOT NULL
);
"""
self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
self.engine_dwh.execute(query_ddl_tmp)
logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')
# Загрузка данных во временную таблицу
data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append',
index=False, method='multi', chunksize=1_000)
logging.info(f'{self.log_string} Во временную таблицу загружено [{data.shape[0]:_}] строк')
# Добавление новых значений в якорь
result = self.engine_dwh.execute(f"""
WITH new_anchor_values AS (
SELECT tmp.{self.anchor.bk_column_name}
FROM {self.tmp_table_name} tmp
LEFT JOIN {self.anchor.table_name} anchor ON tmp.{self.anchor.bk_column_name} = anchor.{self.anchor.bk_column_name}
WHERE anchor.{self.anchor.bk_column_name} IS NULL
)
INSERT INTO {self.anchor.table_name} ({self.anchor.bk_column_name})
SELECT DISTINCT {self.anchor.bk_column_name} FROM new_anchor_values
;""")
logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')
# Добавление новых значений в атрибут
if full_reload:
self.engine_dwh.execute(f"TRUNCATE {self.table_name};")
# Если атрибут исторический, то добавляем новые значения
if self.is_historized:
result = self.engine_dwh.execute(f"""
MERGE INTO {self.table_name} AS target
USING {self.tmp_table_name} AS source
ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)
AND target.valid_from = source.valid_from
WHEN MATCHED THEN UPDATE
SET {self.name} = source.{self.name},
valid_from = source.valid_from
WHEN NOT MATCHED THEN INSERT
({self.anchor.id_column_name}, {self.name}, valid_from)
VALUES
(uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name}, source.valid_from)
;""")
# Если атрибут не исторический, то обновляем старые значения и записываем новые (если есть)
else:
result = self.engine_dwh.execute(f"""
MERGE INTO {self.table_name} AS target
USING {self.tmp_table_name} AS source
ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)
WHEN MATCHED THEN UPDATE
SET {self.name} = source.{self.name}
WHEN NOT MATCHED THEN INSERT
({self.anchor.id_column_name}, {self.name})
VALUES
(uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name})
;""")
self.is_success = True
self.log_rowcount = result.rowcount
logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружено в атрибут')
except Exception as e:
self.exception = str(e).replace('\'', '\'\'')
raise e
finally:
# Удаление временной таблицы
self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')
# Закрытие подключения
self.engine_dwh.dispose()
# Запись в лог
self.log()
def get_attribute_data_type(self):
"""
Возвращает тип данных атрибута
"""
if not self.exists():
raise Exception('Атрибута не существует, сначала надо выполнить create_attribute')
query = f"""
SELECT data_type
FROM information_schema.columns
WHERE table_name = '{self.table_name}'
AND column_name = '{self.name}';
"""
data_type = self.engine_dwh.execute(query).scalar()
if data_type is None:
raise Exception('Не удалось получить тип данных атрибута, проверь название атрибута')
else:
self.data_type = data_type
return data_type
def get_attribute_last_date(self):
"""
Функция для получения максимальной даты valid_from
для итеративной загрузки исторических атрибутов
"""
if self.is_historized:
query = f"SELECT MAX(valid_from) FROM {self.table_name}"
last_date = sql(db=self.engine_dwh, query=query,
description='Последняя записанная в атрибут дата', as_df=True).iloc[0][0]
if last_date is None:
last_date = datetime.now() - relativedelta(months=12)
return last_date
else:
raise Exception(f'Нельзя получить последнюю дату данных для неисторического атрибута [{self.name}]')
def get_max_anchor_bk(self):
"""
Получение максимального ID бизнес-ключа якоря.
Используется для инкрементальной загрузки данных некоторых сущностей
(например логов и транзакций, где есть инкрементальный id строки)
"""
try:
query = f"""
SELECT MAX(anchor.{self.anchor.bk_column_name})
FROM {self.anchor.table_name} anchor
JOIN {self.table_name} attr ON anchor.{self.anchor.id_column_name} = attr.{self.anchor.id_column_name}
"""
max_bk = self.engine_dwh.execute(query).scalar()
return max_bk
except Exception as e:
raise e
finally:
self.engine_dwh.dispose()
Особое внимание стоит уделить параметру is_historized
. Он определяет дальнейшую судьбу атрибута, а именно: нужно ли сохранять его историю изменения или нет.
Если атрибут не является историческим, мы всегда будем видеть только его актуальное состояние. При изменении его значения в системе-источнике актуальное значение будет заменять имеющееся в DWH.
Если атрибут является историческим, при создании у него появится дополнительное поле
valid_from
, и при изменении значения атрибута в системе-источнике в таблице атрибута в DWH появится новая строка с новым значениемvalid_from
.
Такой подход позволяет не засорять хранилище ненужными данными, а также сокращает количество операций и экономит ресурсы.
Методы класса AnchorAttribute
:
create_attribute
— создает таблицу атрибута в DWH.update_attribute
— обновляет данные в таблице атрибута.get_attribute_data_type
— возвращает тип данных атрибута.get_attribute_last_date
— возвращает максимальное значение поляvalid_from
для итеративной загрузки исторических атрибутов.get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
Класс Tie
Класс Tie
наследуется от AnchorModel
. При инициализации необходимо задать значения двух обязательных параметров — anchor1
и anchor2
— имена якорей, находящихся в связи.
Методы класса Tie
:
create_tie
— создает таблицу связи в DWH.update_tie
— обновляет данные в таблице связи.get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
class Tie(AnchorModel):
class Tie(AnchorModel):
"""
Класс для работы с таблицами связей между якорями
Атрибуты:
[anchor1_name] имя первого якоря
[anchor2_name] имя второго якоря
Методы:
[create_tie] создает таблицу в DWH для связи (напр. te_re_tender_request)
[update_tie] принимает название источника и SQL-запрос, обновляет данные в связи
[get_max_anchor_bk] получение максимального ID бизнес-ключа якоря
"""
def __init__(self, anchor1_name: str, anchor2_name: str):
super().__init__()
# Название типа таблицы якорной модели
self.type = 'tie'
# Создаем объекты якорей, чтобы из них получать нужные переменные
self.anchor1 = Anchor(name=anchor1_name)
self.anchor2 = Anchor(name=anchor2_name)
# Описание
self.description = f'Связь сущности "{self.anchor1.get_description()}" с сущностью "{self.anchor2.get_description()}"'
# Название таблицы связи = первые 2 буквы первого якоря + _ + первые 2 буквы второго якоря + _ +
# название первого якоря + _ + название второго якоря
self.table_name = anchor1_name[:2] + '_' + anchor2_name[:2] + '_' + anchor1_name + '_' + anchor2_name
# Название временной таблицы
self.tmp_table_name = self.table_name + '_tmp'
# Строка для отображения в логах
self.log_string = f'[Tie] :: [{self.table_name}]'
def create_tie(self):
"""
Создание таблицы связи якорей (если такой еще нет)
"""
try:
if self.exists():
return True
# Формируем DDL таблицы
query_ddl = f"""
CREATE TABLE {self.table_name} (
{self.anchor1.id_column_name} uuid NOT NULL,
{self.anchor2.id_column_name} uuid NOT NULL,
insert_ts timestamp with time zone DEFAULT now(),
CONSTRAINT fk_{self.table_name}_{self.anchor1.id_column_name}
FOREIGN KEY ({self.anchor1.id_column_name})
REFERENCES {self.anchor1.table_name} ({self.anchor1.id_column_name}),
CONSTRAINT fk_{self.table_name}_{self.anchor2.id_column_name}
FOREIGN KEY ({self.anchor2.id_column_name})
REFERENCES {self.anchor2.table_name} ({self.anchor2.id_column_name})
);
CREATE INDEX {self.table_name}_{self.anchor1.id_column_name}_index
ON {self.table_name} ({self.anchor1.id_column_name});
CREATE INDEX {self.table_name}_{self.anchor2.id_column_name}_index
ON {self.table_name} ({self.anchor2.id_column_name});
"""
self.engine_dwh.execute(query_ddl)
# Добавляем комментарий
self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")
# Запись в таблицу со всеми таблицами якорной модели
self.engine_dwh.execute(f"""
DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)
VALUES ('{self.table_name}', 'tie', '{self.anchor2.name}', NULL, '{self.anchor1.name}')
""")
logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")
except:
raise
finally:
# Закрытие подключения
self.engine_dwh.dispose()
def update_tie(self, source: str, query:str, full_reload: bool = False):
"""
Загрузка данных в таблицу связей (Tie)
Данные для таблицы связей мы получаем в виде бизнес-ключей двух якорей
"""
try:
self.create_tie()
self.source = source
# Загрузка и очистка данных
raw_data = sql(db=source, query=query, description=self.description, as_df=True)
data = purify_data(raw_data)
# Если после очистки данных не осталось, то завершаем работу
if data.empty:
self.is_success = True
return True
# Создаем временную таблицу
self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
self.engine_dwh.execute(f"""
CREATE TABLE {self.tmp_table_name} (
{self.anchor1.bk_column_name} {self.anchor1.get_anchor_data_type()} NOT NULL,
{self.anchor2.bk_column_name} {self.anchor2.get_anchor_data_type()} NOT NULL
);
""")
logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')
# Загрузка данных во временную таблицу
data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False,
method='multi', chunksize=1000)
logging.info(f'{self.log_string} [{data.shape[0]:_}] строк загружено во временную таблицу')
# Добавление новых значений в якорь
result = self.engine_dwh.execute(f"""
WITH new_anchor_values AS (
SELECT tmp.{self.anchor2.bk_column_name}
FROM {self.tmp_table_name} tmp
LEFT JOIN {self.anchor2.table_name} anchor ON tmp.{self.anchor2.bk_column_name} = anchor.{self.anchor2.bk_column_name}
WHERE anchor.{self.anchor2.bk_column_name} IS NULL
)
INSERT INTO {self.anchor2.table_name} ({self.anchor2.bk_column_name})
SELECT DISTINCT {self.anchor2.bk_column_name}
FROM new_anchor_values
;""")
logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')
# Наполнение новыми данными
# Если хотим полностью обновить связи, то делаем сначала TRUNCATE
if full_reload:
query = f"TRUNCATE TABLE {self.table_name};"
self.engine_dwh.execute(query)
query = f"""
WITH new_values AS (
SELECT anc1.{self.anchor1.id_column_name},
anc2.{self.anchor2.id_column_name}
FROM {self.tmp_table_name} tmp
JOIN {self.anchor1.table_name} anc1 ON tmp.{self.anchor1.bk_column_name} = anc1.{self.anchor1.bk_column_name}
JOIN {self.anchor2.table_name} anc2 ON tmp.{self.anchor2.bk_column_name} = anc2.{self.anchor2.bk_column_name}
LEFT JOIN {self.table_name} tie ON anc1.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}
AND anc2.{self.anchor2.id_column_name} = tie.{self.anchor2.id_column_name}
WHERE tie.{self.anchor1.id_column_name} IS NULL
OR tie.{self.anchor2.id_column_name} IS NULL
)
INSERT INTO {self.table_name} ({self.anchor1.id_column_name}, {self.anchor2.id_column_name})
SELECT {self.anchor1.id_column_name}, {self.anchor2.id_column_name}
FROM new_values;
"""
result = self.engine_dwh.execute(query)
self.is_success = True
self.log_rowcount = result.rowcount
logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в связь')
except Exception as e:
self.is_success = False
self.exception = str(e).replace('\'', '\'\'')
raise e
finally:
# Удаление временной таблицы
self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')
# Закрытие подключения
self.engine_dwh.dispose()
# Запись в лог
self.log()
def get_max_anchor_bk(self):
"""
Получение максимального ID бизнес-ключа якоря.
Используется для инкрементальной загрузки данных некоторых сущностей
(например логов и транзакций, где есть инкрементальный id строки)
"""
try:
query = f"""
SELECT MAX(anchor.{self.anchor1.bk_column_name})
FROM {self.anchor1.table_name} anchor
JOIN {self.table_name} tie ON anchor.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}
"""
max_bk = self.engine_dwh.execute(query).scalar()
return max_bk
except Exception as e:
raise e
finally:
self.engine_dwh.dispose()
Шаг 3. Пробуем применять
Итак, представим задачу. Нам нужно реализовать поставку данных из Jira, а конкретно объекта Sprint, для отчета, демонстрирующего минимальный, медианный и максимальный Lead Time у разных команд разработки, а также его динамику по месяцам.
Пункт 1. Импортируем все наши классы, прописываем аргументы DAG и начинаем создавать задачи. Сначала инициализируем якорь jira_sprint
, прописываем создание якоря, базовый запрос и обновление якоря.
Пример DAG (загрузка якоря)
from airflow.models import DAG
from airflow.decorators import task
from datetime import timedelta
from tools.smnx_data_tools import task_fail_alert
from tools.anchor_model import Anchor, AnchorAttribute, Tie
args = {
'owner': 'owner_name',
'start_date': '2024-09-04',
'depends_on_past': False,
'catchup': False,
'retries': 1,
'retry_delay': timedelta(minutes=30),
'on_failure_callback': task_fail_alert
}
@task
def anc_jira_sprint():
query = """
SELECT "ID" AS jira_sprint_bk
FROM "AO_60DB71_SPRINT"
"""
anchor = Anchor(name='jira_sprint')
anchor.create_anchor(data_type='integer', description='Jira спринт')
anchor.update_anchor(source='jira_db', query=query)
Пункт 2. Определяем атрибуты спринта. В нашем случае это название, дата начала и дата завершения. Прописываем базовый запрос, инициализируем атрибуты, создаем и обновляем их. Повторяем для каждого атрибута якоря.
Пример DAG (загрузка атрибутов якоря)
@task
def attr_name():
query = """
SELECT "ID" AS jira_sprint_bk,
"NAME" AS name
FROM "AO_60DB71_SPRINT"
"""
anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='name')
anchor_attribute.create_attribute(data_type='text', description='Название')
anchor_attribute.update_attribute(source='jira_db', query=query)
@task
def attr_started():
query = """
SELECT "ID" AS jira_sprint_bk,
to_timestamp("START_DATE"/1000) AS started
FROM "AO_60DB71_SPRINT"
"""
anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='started')
anchor_attribute.create_attribute(data_type='timestamp with time zone', description='')
anchor_attribute.update_attribute(source='jira_db', query=query)
@task
def attr_completed():
query = """
SELECT "ID" AS jira_sprint_bk,
to_timestamp("COMPLETE_DATE"/1000) AS completed
FROM "AO_60DB71_SPRINT"
"""
anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='completed')
anchor_attribute.create_attribute(data_type='', description='')
anchor_attribute.update_attribute(source='jira_db', query=query)
Пункт 3. Определяем связи с другими якорями. В нашем случае jira_sprint
имеет связь с jira_issue
, так как нам важно понимать, к какому спринту относится задача. Прописываем базовый запрос, инициализируем связь, создаем и обновляем её.
Пример DAG (загрузка связи)
@task
def tie_jira_issue():
query = """
SELECT "ID" AS jira_sprint_bk,
i.id AS jira_issue_bk
FROM "AO_60DB71_SPRINT" s
JOIN customfieldvalue v ON v.customfield = '10101' /* sprint */
AND v.stringvalue = s."ID"::text
JOIN jiraissue i ON v.issue = i.id
"""
tie = Tie(anchor1_name='jira_sprint', anchor2_name='jira_issue')
tie.create_tie()
tie.update_tie(source='jira_db', query=query)
Пункт 4. Оборачиваем полученные задачи в DAG и ставим на регулярный запуск.
Пример DAG (определение свойств DAG, установление последовательности задач)
with DAG(dag_id='am_jira_sprint',
default_args=args,
schedule_interval=timedelta(hours=1),
tags=['AnchorModel'],
max_active_runs=1) as dag:
anc_jira_sprint() >> attr_name() >> attr_started() >> attr_completed() >> tie_jira_issue()
Шаг 4. Profit — подводим итоги
С помощью anchor_model.py
мы организовали поставку данных, необходимых для отчета, за пару часов и избежали ошибок в именах объектов. По сути, вся работа инженера свелась к определению имени якоря и его атрибутов, написанию нескольких строк кода и базовых запросов. Время разработки сокращено более чем на 50%, а количество ошибок уменьшилось более чем на 95% по сравнению с полностью ручной разработкой поставки данных.
Благодаря предсказуемости имен объектов в DWH, BI-разработчик может приступить к разработке витрин данных, необходимых для создания отчетов, сразу же после первой отработки DAG. Это, в свою очередь, ускоряет разработку отчета, и бизнес быстрее получает необходимые ему наглядные визуализации данных.
Подобная автоматизация позволяет нескольким инженерам выполнить несколько поставок данных в течение одного спринта и обеспечить работой BI-разработчиков на несколько спринтов вперед.
Для демонстрации работы мы выбрали небольшой якорь с тремя атрибутами и одной связью. Конечно, в реальной работе будут встречаться якоря с гораздо большим количеством атрибутов и связей, но принципиально это ничего не меняет.
Мы будем и дальше улучшать работу модуля anchor_model.py
, о чём обязательно расскажем в продолжении цикла статей о разработке BI-системы. Оставайтесь с нами, пишите своё мнение, советы и критику в комментариях. До скорых встреч!