Привет, друзья!

Продолжаем нашу серию статей о создании BI-системы в компании Sminex. В первой части мы рассказали, что в качестве основного места хранения аналитических данных используется хранилище с якорной моделью. Она характеризуется высокой нормализацией и строгими правилами наименования объектов.

Строгие правила наименования лучше сразу прописать в соглашении об именовании объектов. В нашем случае мы использовали базовое соглашение [Anchor Modeling: Naming Convention] с небольшими доработками и упрощениями.

Однако, чтобы добиться высокой нормализации, каждый объект, независимо от его типа, нужно выделять в отдельную таблицу, прописывать для него DDL и ключи. Учитывая количество объектов, это превращается в огромную рутинную работу. При этом нельзя допускать ситуации «сейчас быстро сделаем, а потом поправим». Это идеальная область для автоматизации в условиях ограничения ресурсов.

Важно!

Мы не претендуем на абсолютную правильность реализации, оптимальность и соответствие нашего решения всем общепринятым правилам. Конкретно в нашей ситуации, с нашими ресурсами и объёмом задач от заказчиков, это решение удалось реализовать достаточно быстро, и оно начало приносить свои плоды. Мы будем рады услышать конструктивную критику по поводу решения. Самое полезное опубликуем в следующей статье!

Шаг 1. Как облегчить работу инженерам по выполнению поставок данных

Необходимо:

  1. Придумать велосипед менее затратный способ укладки данных в якорную модель по сравнению с ручным созданием и настройкой таблиц.

  2. Разработать способ автоматической генерации однотипных SQL-запросов, исключающий ошибки в наименовании объектов и в логике работы с данными.

  3. Сохранять логи о совершенных действиях.

Шаг 2. ООП нам поможет

Для реализации задуманного мы разработали Python-модуль anchor_model.py, в котором создали классы и наделили их необходимыми методами. Давайте познакомимся с ними поближе.

Класс AnchorModel

Этот класс является суперклассом. Все остальные классы наследуются от него. Он содержит три метода:

  • exists — проверяет, существует ли уже такая таблица в DWH.

  • get_description — возвращает описание таблицы.

  • log — записывает в логи создание или наполнение таблицы якорной модели.

class AnchorModel:
class AnchorModel:
    """
    Суперкласс для работы с таблицами якорной модели
    Все остальные классы наследуются от него
 
    Методы:
    [exists] проверяет, существует ли уже такая таблица в DWH
    [get_description] возвращает описание таблицы (комментарий)
    [log] логирует обновление данных
    """

    def __init__(self):
        # Подключение к DWH
        self.engine_dwh = get_engine('dwh')
        # Название таблицы
        self.table_name = None
        # Источник данных
        self.source = None
        # Тип данных (задается через create или get_data_type)
        self.data_type = None
        # Описание (задается через create или get_description)
        self.description = None
        # Тип данных поля инкремента
        self.increment = None

        # Переменные для логирования
        self.log_string = ''
        self.log_rowcount = 0
        self.is_success = False
        self.exception = ''

    def exists(self):
        """
        Проверка на то, что такая таблица уже существует в DWH
        """
        try:
            query = f"""
                SELECT EXISTS (
                    SELECT 1
                    FROM information_schema.tables
                    WHERE table_name = '{self.table_name}'
                );
            """
            exists = self.engine_dwh.execute(query).scalar()

            if exists:
                logging.info(f"{self.log_string} Таблица [{self.table_name}] уже существует")
                return True
            else:
                logging.info(f"{self.log_string} Таблицы [{self.table_name}] не существует")
                return False

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

    def get_description(self):
        """
        Возвращает описание (комментарий к таблице)
        """
        try:
            query = f"""
                SELECT d.description AS table_comment
                FROM pg_class c
                JOIN pg_description d ON c.oid = d.objoid
                                     AND d.objsubid = 0 /* уровень 0 = комментарии к таблицам, не к колонкам) */
                WHERE c.relname = '{self.table_name}'
            """

            description = self.engine_dwh.execute(query).scalar()

            if description is None:
                raise Exception(f'Не удалось получить описание таблицы {self.table_name}, '
                                f'проверь название и что у нее есть комментарий')
            else:
                self.description = description
                return self.description

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

    def log(self):
        """
        Логирует обновление данных
        """
        try:
            query = f"""
                INSERT INTO _log (name, source, rows, is_success, exception)
                VALUES ('{self.table_name}', '{self.source}', {self.log_rowcount}, {self.is_success}, '{self.exception}')
            ;"""
            self.engine_dwh.execute(query)

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

Класс Anchor

Класс Anchor наследуется от AnchorModel. При инициализации необходимо задать обязательный параметр name — имя якоря. В классе Anchor предусмотрены следующие методы:

  • create_anchor — создает таблицу якоря в DWH.

  • update_anchor — обновляет данные в якоре.

  • get_anchor_data_type — возвращает тип данных бизнес-ключа существующего якоря.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

class Anchor(AnchorModel):
class Anchor(AnchorModel):
    """
    Класс для работы с якорем
 
    Атрибуты:
    [name] название якоря. При инициализации задается без префикса, напр. tender или document
 
    Методы:
    [create_anchor] создает таблицу в DWH с нужным префиксом и триггером для генерации суррогатного ключа
    [update_anchor] принимает название источника и SQL-запрос, обновляет данные в якоре
    [get_anchor_data_type] возвращает тип данных бизнес-ключа существующего якоря (напр. uuid)
    [get_max_anchor_bk] возвращает максимальное значение бизнес-ключа
    """

    def __init__(self, name: str):
        super().__init__()
        # Название типа таблицы якорной модели
        self.type = 'anchor'
        # Название сущности
        self.name = name
        # Название таблицы = первые два символа названия + _ + название (напр. для tender - te_tender)
        self.table_name = self.name[:2] + '_' + self.name
        # Название временной таблицы
        self.tmp_table_name = self.table_name + '_tmp'
        # Название колонки суррогатного ключа = название + _ + id (напр. для tender - tender_id)
        self.id_column_name = self.name + '_id'
        # Название колонки бизнес ключа = название + _ + bk (напр. для tender - tender_bk)
        self.bk_column_name = self.name + '_bk'
        # Строка для отображения в логах консоли
        self.log_string = f'[Anchor] [{self.name}] ::'

    def create_anchor(self, data_type: str, description: str, increment: str = None):
        """
        Функция для создания таблицы якоря в DWH
        [data_type] тип данных якоря
        [description] описание (комментарий к таблице)
        [increment] тип данных инкремента. Если указан:
                  - заменяет колонку insert_ts и хранит в ней значения на момент обновления сущности
                  - при загрузке новых данных кроме ключа якоря в запросе также должно фигурировать поле инкремента
        """

        try:

            if self.exists():
                return True

            self.data_type = data_type
            self.description = description
            self.increment = increment

            # Создаем таблицу якоря
            if self.increment:
                self.engine_dwh.execute(f"""
                    CREATE TABLE {self.table_name} (
                        {self.id_column_name} uuid PRIMARY KEY,
                        {self.bk_column_name} {self.data_type} NOT NULL UNIQUE,
                        increment {self.increment} NOT NULL
                    );
                """)
            else:
                self.engine_dwh.execute(f"""
                    CREATE TABLE {self.table_name} (
                        {self.id_column_name} uuid PRIMARY KEY,
                        {self.bk_column_name} {self.data_type} NOT NULL UNIQUE,
                        insert_ts timestamp with time zone DEFAULT now()
                    );
                """)

            # Добавляем комментарий
            self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")

            # Создаем триггер для генерации суррогатного ключа при добавлении нового бизнес-ключа
            self.engine_dwh.execute(f"""
                CREATE OR REPLACE FUNCTION generate_{self.id_column_name}()
                RETURNS TRIGGER AS $$
                BEGIN
                    NEW.{self.id_column_name} := uuid_generate_v5(uuid_nil(), NEW.{self.bk_column_name}::text);
                    RETURN NEW;
                END;
                $$ LANGUAGE plpgsql;
            """)

            # Триггер
            self.engine_dwh.execute(f"""
                CREATE TRIGGER {self.table_name}_trigger
                BEFORE INSERT ON {self.table_name}
                FOR EACH ROW
                EXECUTE PROCEDURE generate_{self.id_column_name}();
            """)

            # Запись в реестр всех таблиц якорной модели.
            # Из нее потом генерируется документация и схема хранилища.
            self.engine_dwh.execute(f"""
                DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
                INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)
                VALUES ('{self.table_name}', 'anchor', '{self.name}', '{self.data_type}', '{self.name}')
            ;""")

            logging.info(
                f"{self.log_string} Таблица [{self.table_name}] создана с триггером на суррогатный ключ {self.id_column_name}")

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

    def update_anchor(self, source: str, query: str, increment: bool = False):
        """
        Загрузка данных якоря из источника
        [source] название БД (как в Airflow Connections)
        [query] запрос для выгрузки нужных данных
        """
        try:
            self.data_type = self.get_anchor_data_type()
            self.description = self.get_description()
            self.source = source

            # Загрузка и очистка данных
            raw_data = sql(db=source, query=query, description=self.description, as_df=True)
            data = purify_data(data=raw_data)

            # Если после очистки данных не осталось, то завершаем работу
            # Логируем в блоке finally
            if data.empty:
                self.is_success = True
                return True

            # Начинаем загрузку данных в якорь

            # Создаем временную таблицу
            self.engine_dwh.execute(f"""
                DROP TABLE IF EXISTS {self.tmp_table_name};""")

            if increment:
                self.engine_dwh.execute(f"""
                    CREATE TABLE {self.tmp_table_name} (
                        {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY
                    );""")
            else:
                self.engine_dwh.execute(f"""
                    CREATE TABLE {self.tmp_table_name} (
                        {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY
                    );""")
            logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')

            # Загрузка данных во временную таблицу
            data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi',
                        chunksize=1000)
            logging.info(f'Во временную таблицу загружено [{data.shape[0]:_}] строк')

            # Наполнение якоря новыми данными
            result = self.engine_dwh.execute(f"""
                WITH new_values AS (
                    SELECT tmp.{self.bk_column_name}
                    FROM {self.tmp_table_name} tmp
                    LEFT JOIN {self.table_name} anchor ON anchor.{self.bk_column_name} = tmp.{self.bk_column_name}
                    WHERE anchor.{self.bk_column_name} IS NULL
                )
                INSERT INTO {self.table_name} ({self.bk_column_name})
                SELECT {self.bk_column_name} FROM new_values
            ;""")
            self.log_rowcount = result.rowcount
            self.is_success = True
            logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в якорь')

        except Exception as e:
            self.exception = str(e).replace('\'', '\'\'')
            raise e

        finally:
            # Удаление временной таблицы
            self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
            logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')

            # Логирование
            self.log()

            # Закрытие подключения
            self.engine_dwh.dispose()

    def get_anchor_data_type(self):
        """
        Возвращает тип данных бизнес-ключа якоря
        """

        query = f"""
            SELECT data_type
            FROM information_schema.columns
            WHERE table_name = '{self.table_name}'
              AND column_name = '{self.bk_column_name}';
        """

        data_type = self.engine_dwh.execute(query).scalar()

        if data_type is None:
            raise Exception('Не удалось получить тип данных бизнес-ключа якоря, проверь название якоря')
        else:
            self.data_type = data_type
            return data_type

    def get_max_anchor_bk(self):
        """
        Возвращает максимальное значение бизнес-ключа якоря
        """
        try:
            query = f"SELECT MAX({self.bk_column_name}) FROM {self.table_name}"
            max_bk = self.engine_dwh.execute(query).scalar()
            return max_bk
        except Exception as e:
            raise e
        finally:
            self.engine_dwh.dispose()

Класс AnchorAttribute

Класс AnchorAttribute наследуется от AnchorModel. При инициализации необходимо задать значения обязательных параметров:

  • anchor_name — имя якоря, которому принадлежит атрибут.

  • name — имя атрибута якоря без префиксов.

  • is_historized — признак исторического атрибута.

class AnchorAttribute(AnchorModel):
class AnchorAttribute(AnchorModel):
    """
    Класс для работы с атрибутами якоря
 
    Атрибуты:
    [anchor_name] название якоря, к которому относится атрибут (напр. tender)
    [name] название атрибута якоря без префиксов (напр. name)
    [is_historized] признак исторического атрибута
       (в таблицу добавляется поле "valid_from", а в датасете должна быть дата изменения атрибута)
 
    Методы:
    [create_attribute] создает таблицу в DWH с нужным названием (напр. te_nam_tender_name)
    [update_attribute] принимает название источника и SQL-запрос, обновляет данные в атрибуте
    [get_attribute_data_type] возвращает тип данных атрибута (напр. text)
    [get_attribute_last_date] получение максимальной даты valid_from для итеративной загрузки исторических атрибутов
    [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря
    """

    def __init__(self, anchor_name: str, name: str, is_historized: bool = False):
        super().__init__()

        # Название типа таблицы якорной модели
        self.type = 'attr'

        # Инициализация переменных атрибута
        self.name = name

        # Инициализация переменных якоря
        self.anchor = Anchor(name=anchor_name)
        self.anchor_name = self.anchor.name
        self.anchor_data_type = self.anchor.get_anchor_data_type()
        self.anchor_description = self.anchor.get_description()

        # Признак исторического атрибута
        self.is_historized = is_historized

        # Название таблицы атрибута = первые 2 буквы названия якоря + _ + первые 3 буквы названия атрибута + _ +
        # название якоря + _ + название атрибута
        self.table_name = (self.anchor_name[:2] + '_' + self.name.replace('_', '')[:3]
                           + '_' + self.anchor_name + '_' + self.name)
        # Если атрибут исторический, то добавляем _his
        if self.is_historized:
            self.table_name += '_his'
        # Название временной таблицы
        self.tmp_table_name = self.table_name + '_tmp'

        # Строка для отображения в логах
        self.log_string = f'[AnchorAttribute] [{self.name}] ::'

    def create_attribute(self, data_type: str, description: str):

        self.data_type = data_type
        self.description = description

        try:

            if self.exists():
                return True

            # Формируем DDL таблицы в зависимости от того, исторический атрибут или нет
            if self.is_historized:
                query_ddl = f"""
                    CREATE TABLE {self.table_name} (
                        {self.anchor.id_column_name} uuid NOT NULL,
                        {self.name} {self.data_type} NOT NULL,
                        valid_from timestamp NOT NULL,
                        valid_to timestamp,
                        insert_ts timestamp with time zone DEFAULT now(), /* UTC */
 
                        CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}, valid_from),
                        CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}
                        FOREIGN KEY ({self.anchor.id_column_name})
                        REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})
                    );
                """

            else:
                query_ddl = f"""
                    CREATE TABLE {self.table_name} (
                        {self.anchor.id_column_name} uuid NOT NULL,
                        {self.name} {self.data_type} NOT NULL,
                        insert_ts timestamp with time zone DEFAULT now(), /* UTC */
 
                        CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}),
                        CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}
                        FOREIGN KEY ({self.anchor.id_column_name})
                        REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})
                    );
                """

            self.engine_dwh.execute(query_ddl)

            # Добавляем комментарий
            self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")

            # Запись в таблицу со всеми таблицами якорной модели
            self.engine_dwh.execute(f"""
                DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
                INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name, is_historized)
                VALUES ('{self.table_name}', 'attr', '{self.name}', '{self.data_type}', '{self.anchor_name}',
                        {self.is_historized});
            """)

            logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

    def update_attribute(self, source: str, query: str, full_reload: bool = False):
        """
        Загрузка данных атрибута якоря из источника
        Пока обычные атрибуты заливаются путем TRUNCATE-INSERT,
        а исторические атрибуты заливаются инкрементально
 
        [source] название БД из функции get_engine
        [query] SQL-запрос для выгрузки данных
        [full_reload] если поставить True, то атрибут обновится через TRUNCATE
        """

        try:
            self.data_type = self.get_attribute_data_type()
            self.description = self.get_description()
            self.source = source

            # Загрузка и очистка данных
            raw_data = sql(db=source, query=query, description=self.description, as_df=True)
            data = purify_data(raw_data)

            # Если после очистки данных не осталось, то завершаем работу
            if data.empty:
                self.is_success = True
                return True

            # Создаем временную таблицу
            if self.is_historized:
                query_ddl_tmp = f"""
                    CREATE TABLE {self.tmp_table_name} (
                        {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,
                        {self.name} {self.data_type} NOT NULL,
                        valid_from timestamp NOT NULL,
                        valid_to timestamp
                    );
                """
            else:
                query_ddl_tmp = f"""
                    CREATE TABLE {self.tmp_table_name} (
                        {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,
                        {self.name} {self.data_type} NOT NULL
                    );
                """

            self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
            self.engine_dwh.execute(query_ddl_tmp)
            logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')

            # Загрузка данных во временную таблицу
            data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append',
                        index=False, method='multi', chunksize=1_000)
            logging.info(f'{self.log_string} Во временную таблицу загружено [{data.shape[0]:_}] строк')

            # Добавление новых значений в якорь
            result = self.engine_dwh.execute(f"""
                WITH new_anchor_values AS (
                    SELECT tmp.{self.anchor.bk_column_name}
                    FROM {self.tmp_table_name} tmp
                    LEFT JOIN {self.anchor.table_name} anchor ON tmp.{self.anchor.bk_column_name} = anchor.{self.anchor.bk_column_name}
                    WHERE anchor.{self.anchor.bk_column_name} IS NULL
                )
                INSERT INTO {self.anchor.table_name} ({self.anchor.bk_column_name})
                SELECT DISTINCT {self.anchor.bk_column_name} FROM new_anchor_values
            ;""")
            logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')

            # Добавление новых значений в атрибут
            if full_reload:
                self.engine_dwh.execute(f"TRUNCATE {self.table_name};")

            # Если атрибут исторический, то добавляем новые значения
            if self.is_historized:
                result = self.engine_dwh.execute(f"""
                    MERGE INTO {self.table_name} AS target
                    USING {self.tmp_table_name}  AS source
                    ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)
                    AND target.valid_from = source.valid_from
 
                    WHEN MATCHED THEN UPDATE
                    SET {self.name} = source.{self.name},
                        valid_from = source.valid_from
 
                    WHEN NOT MATCHED THEN INSERT
                    ({self.anchor.id_column_name}, {self.name}, valid_from)
                    VALUES
                    (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name}, source.valid_from)
                ;""")
            # Если атрибут не исторический, то обновляем старые значения и записываем новые (если есть)
            else:
                result = self.engine_dwh.execute(f"""
                    MERGE INTO {self.table_name} AS target
                    USING {self.tmp_table_name}  AS source
                    ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)
 
                    WHEN MATCHED THEN UPDATE
                    SET {self.name} = source.{self.name}
 
                    WHEN NOT MATCHED THEN INSERT
                    ({self.anchor.id_column_name}, {self.name})
                    VALUES
                    (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name})
                ;""")

            self.is_success = True
            self.log_rowcount = result.rowcount
            logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружено в атрибут')

        except Exception as e:
            self.exception = str(e).replace('\'', '\'\'')
            raise e

        finally:
            # Удаление временной таблицы
            self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
            logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')

            # Закрытие подключения
            self.engine_dwh.dispose()

            # Запись в лог
            self.log()

    def get_attribute_data_type(self):
        """
        Возвращает тип данных атрибута
        """

        if not self.exists():
            raise Exception('Атрибута не существует, сначала надо выполнить create_attribute')

        query = f"""
            SELECT data_type
            FROM information_schema.columns
            WHERE table_name = '{self.table_name}'
              AND column_name = '{self.name}';
        """

        data_type = self.engine_dwh.execute(query).scalar()

        if data_type is None:
            raise Exception('Не удалось получить тип данных атрибута, проверь название атрибута')
        else:
            self.data_type = data_type
            return data_type

    def get_attribute_last_date(self):
        """
        Функция для получения максимальной даты valid_from
        для итеративной загрузки исторических атрибутов
        """
        if self.is_historized:
            query = f"SELECT MAX(valid_from) FROM {self.table_name}"
            last_date = sql(db=self.engine_dwh, query=query,
                            description='Последняя записанная в атрибут дата', as_df=True).iloc[0][0]
            if last_date is None:
                last_date = datetime.now() - relativedelta(months=12)
            return last_date
        else:
            raise Exception(f'Нельзя получить последнюю дату данных для неисторического атрибута [{self.name}]')

    def get_max_anchor_bk(self):
        """
        Получение максимального ID бизнес-ключа якоря.
        Используется для инкрементальной загрузки данных некоторых сущностей
        (например логов и транзакций, где есть инкрементальный id строки)
        """
        try:
            query = f"""
                SELECT MAX(anchor.{self.anchor.bk_column_name})
                FROM {self.anchor.table_name} anchor
                JOIN {self.table_name} attr ON anchor.{self.anchor.id_column_name} = attr.{self.anchor.id_column_name}
            """
            max_bk = self.engine_dwh.execute(query).scalar()
            return max_bk
        except Exception as e:
            raise e
        finally:
            self.engine_dwh.dispose()

Особое внимание стоит уделить параметру is_historized. Он определяет дальнейшую судьбу атрибута, а именно: нужно ли сохранять его историю изменения или нет.

  • Если атрибут не является историческим, мы всегда будем видеть только его актуальное состояние. При изменении его значения в системе-источнике актуальное значение будет заменять имеющееся в DWH.

  • Если атрибут является историческим, при создании у него появится дополнительное поле valid_from, и при изменении значения атрибута в системе-источнике в таблице атрибута в DWH появится новая строка с новым значением valid_from.

Такой подход позволяет не засорять хранилище ненужными данными, а также сокращает количество операций и экономит ресурсы.

Методы класса AnchorAttribute:

  • create_attribute — создает таблицу атрибута в DWH.

  • update_attribute — обновляет данные в таблице атрибута.

  • get_attribute_data_type — возвращает тип данных атрибута.

  • get_attribute_last_date — возвращает максимальное значение поля valid_from для итеративной загрузки исторических атрибутов.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

Класс Tie

Класс Tie наследуется от AnchorModel. При инициализации необходимо задать значения двух обязательных параметров — anchor1 и anchor2 — имена якорей, находящихся в связи.

Методы класса Tie:

  • create_tie — создает таблицу связи в DWH.

  • update_tie — обновляет данные в таблице связи.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

class Tie(AnchorModel):
class Tie(AnchorModel):
    """
    Класс для работы с таблицами связей между якорями
 
    Атрибуты:
    [anchor1_name] имя первого якоря
    [anchor2_name] имя второго якоря
 
    Методы:
    [create_tie] создает таблицу в DWH для связи (напр. te_re_tender_request)
    [update_tie] принимает название источника и SQL-запрос, обновляет данные в связи
    [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря
    """

    def __init__(self, anchor1_name: str, anchor2_name: str):
        super().__init__()

        # Название типа таблицы якорной модели
        self.type = 'tie'

        # Создаем объекты якорей, чтобы из них получать нужные переменные
        self.anchor1 = Anchor(name=anchor1_name)
        self.anchor2 = Anchor(name=anchor2_name)

        # Описание
        self.description = f'Связь сущности "{self.anchor1.get_description()}" с сущностью "{self.anchor2.get_description()}"'

        # Название таблицы связи = первые 2 буквы первого якоря + _ + первые 2 буквы второго якоря + _ +
        # название первого якоря + _ + название второго якоря
        self.table_name = anchor1_name[:2] + '_' + anchor2_name[:2] + '_' + anchor1_name + '_' + anchor2_name
        # Название временной таблицы
        self.tmp_table_name = self.table_name + '_tmp'

        # Строка для отображения в логах
        self.log_string = f'[Tie] :: [{self.table_name}]'

    def create_tie(self):
        """
        Создание таблицы связи якорей (если такой еще нет)
        """

        try:

            if self.exists():
                return True

            # Формируем DDL таблицы
            query_ddl = f"""
                CREATE TABLE {self.table_name} (
                    {self.anchor1.id_column_name} uuid NOT NULL,
                    {self.anchor2.id_column_name} uuid NOT NULL,
                    insert_ts timestamp with time zone DEFAULT now(),
 
                    CONSTRAINT fk_{self.table_name}_{self.anchor1.id_column_name}
                    FOREIGN KEY ({self.anchor1.id_column_name})
                    REFERENCES {self.anchor1.table_name} ({self.anchor1.id_column_name}),
 
                    CONSTRAINT fk_{self.table_name}_{self.anchor2.id_column_name}
                    FOREIGN KEY ({self.anchor2.id_column_name})
                    REFERENCES {self.anchor2.table_name} ({self.anchor2.id_column_name})
                );
                CREATE INDEX {self.table_name}_{self.anchor1.id_column_name}_index
                  ON {self.table_name} ({self.anchor1.id_column_name});
                CREATE INDEX {self.table_name}_{self.anchor2.id_column_name}_index
                  ON {self.table_name} ({self.anchor2.id_column_name});
            """
            self.engine_dwh.execute(query_ddl)

            # Добавляем комментарий
            self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")

            # Запись в таблицу со всеми таблицами якорной модели
            self.engine_dwh.execute(f"""
                DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';
                INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)
                VALUES ('{self.table_name}', 'tie', '{self.anchor2.name}', NULL, '{self.anchor1.name}')
            """)

            logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")

        except:
            raise

        finally:
            # Закрытие подключения
            self.engine_dwh.dispose()

    def update_tie(self, source: str, query:str, full_reload: bool = False):
        """
        Загрузка данных в таблицу связей (Tie)
        Данные для таблицы связей мы получаем в виде бизнес-ключей двух якорей
        """
        try:
            self.create_tie()
            self.source = source

            # Загрузка и очистка данных
            raw_data = sql(db=source, query=query, description=self.description, as_df=True)
            data = purify_data(raw_data)

            # Если после очистки данных не осталось, то завершаем работу
            if data.empty:
                self.is_success = True
                return True

            # Создаем временную таблицу
            self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
            self.engine_dwh.execute(f"""
                CREATE TABLE {self.tmp_table_name} (
                    {self.anchor1.bk_column_name} {self.anchor1.get_anchor_data_type()} NOT NULL,
                    {self.anchor2.bk_column_name} {self.anchor2.get_anchor_data_type()} NOT NULL
                );
            """)
            logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')

            # Загрузка данных во временную таблицу
            data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False,
                        method='multi', chunksize=1000)
            logging.info(f'{self.log_string} [{data.shape[0]:_}] строк загружено во временную таблицу')

            # Добавление новых значений в якорь
            result = self.engine_dwh.execute(f"""
                WITH new_anchor_values AS (
                    SELECT tmp.{self.anchor2.bk_column_name}
                    FROM {self.tmp_table_name} tmp
                    LEFT JOIN {self.anchor2.table_name} anchor ON tmp.{self.anchor2.bk_column_name} = anchor.{self.anchor2.bk_column_name}
                    WHERE anchor.{self.anchor2.bk_column_name} IS NULL
                )
                INSERT INTO {self.anchor2.table_name} ({self.anchor2.bk_column_name})
                SELECT DISTINCT {self.anchor2.bk_column_name}
                FROM new_anchor_values
            ;""")
            logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')

            # Наполнение новыми данными

            # Если хотим полностью обновить связи, то делаем сначала TRUNCATE
            if full_reload:
                query = f"TRUNCATE TABLE {self.table_name};"
                self.engine_dwh.execute(query)

            query = f"""
                WITH new_values AS (
                    SELECT anc1.{self.anchor1.id_column_name},
                           anc2.{self.anchor2.id_column_name}
                    FROM {self.tmp_table_name} tmp
                    JOIN {self.anchor1.table_name} anc1 ON tmp.{self.anchor1.bk_column_name} = anc1.{self.anchor1.bk_column_name}
                    JOIN {self.anchor2.table_name} anc2 ON tmp.{self.anchor2.bk_column_name} = anc2.{self.anchor2.bk_column_name}
                    LEFT JOIN {self.table_name}     tie ON anc1.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}
                                                       AND anc2.{self.anchor2.id_column_name} = tie.{self.anchor2.id_column_name}
                    WHERE tie.{self.anchor1.id_column_name} IS NULL
                       OR tie.{self.anchor2.id_column_name} IS NULL
                )
                INSERT INTO {self.table_name} ({self.anchor1.id_column_name}, {self.anchor2.id_column_name})
                SELECT {self.anchor1.id_column_name}, {self.anchor2.id_column_name}
                FROM new_values;
            """
            result = self.engine_dwh.execute(query)
            self.is_success = True
            self.log_rowcount = result.rowcount
            logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в связь')

        except Exception as e:
            self.is_success = False
            self.exception = str(e).replace('\'', '\'\'')
            raise e

        finally:
            # Удаление временной таблицы
            self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")
            logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')

            # Закрытие подключения
            self.engine_dwh.dispose()

            # Запись в лог
            self.log()

    def get_max_anchor_bk(self):
        """
        Получение максимального ID бизнес-ключа якоря.
        Используется для инкрементальной загрузки данных некоторых сущностей
        (например логов и транзакций, где есть инкрементальный id строки)
        """
        try:
            query = f"""
                SELECT MAX(anchor.{self.anchor1.bk_column_name})
                FROM {self.anchor1.table_name} anchor
                JOIN {self.table_name} tie ON anchor.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}
            """
            max_bk = self.engine_dwh.execute(query).scalar()
            return max_bk
        except Exception as e:
            raise e
        finally:
            self.engine_dwh.dispose()

Шаг 3. Пробуем применять

Итак, представим задачу. Нам нужно реализовать поставку данных из Jira, а конкретно объекта Sprint, для отчета, демонстрирующего минимальный, медианный и максимальный Lead Time у разных команд разработки, а также его динамику по месяцам.

Пункт 1. Импортируем все наши классы, прописываем аргументы DAG и начинаем создавать задачи. Сначала инициализируем якорь jira_sprint, прописываем создание якоря, базовый запрос и обновление якоря.

Пример DAG (загрузка якоря)
from airflow.models import DAG
from airflow.decorators import task
from datetime import timedelta
 
from tools.smnx_data_tools import task_fail_alert
from tools.anchor_model import Anchor, AnchorAttribute, Tie
 
args = {
    'owner': 'owner_name',
    'start_date': '2024-09-04',
    'depends_on_past': False,
    'catchup': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'on_failure_callback': task_fail_alert
}
 
 
@task
def anc_jira_sprint():
    query = """
        SELECT "ID" AS jira_sprint_bk
        FROM "AO_60DB71_SPRINT"
    """

    anchor = Anchor(name='jira_sprint')
    anchor.create_anchor(data_type='integer', description='Jira спринт')
    anchor.update_anchor(source='jira_db', query=query)

Пункт 2. Определяем атрибуты спринта. В нашем случае это название, дата начала и дата завершения. Прописываем базовый запрос, инициализируем атрибуты, создаем и обновляем их. Повторяем для каждого атрибута якоря.

Пример DAG (загрузка атрибутов якоря)
@task
def attr_name():
    query = """
        SELECT "ID"   AS jira_sprint_bk,
               "NAME" AS name
        FROM "AO_60DB71_SPRINT"
    """

    anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='name')
    anchor_attribute.create_attribute(data_type='text', description='Название')
    anchor_attribute.update_attribute(source='jira_db', query=query)
 
 
@task
def attr_started():
    query = """
        SELECT "ID"                            AS jira_sprint_bk,
               to_timestamp("START_DATE"/1000) AS started
        FROM "AO_60DB71_SPRINT"
    """

    anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='started')
    anchor_attribute.create_attribute(data_type='timestamp with time zone', description='')
    anchor_attribute.update_attribute(source='jira_db', query=query)
 
 
@task
def attr_completed():
    query = """
        SELECT "ID"                               AS jira_sprint_bk,
               to_timestamp("COMPLETE_DATE"/1000) AS completed
        FROM "AO_60DB71_SPRINT"
    """

    anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='completed')
    anchor_attribute.create_attribute(data_type='', description='')
    anchor_attribute.update_attribute(source='jira_db', query=query)

Пункт 3. Определяем связи с другими якорями. В нашем случае jira_sprint имеет связь с jira_issue, так как нам важно понимать, к какому спринту относится задача. Прописываем базовый запрос, инициализируем связь, создаем и обновляем её.

Пример DAG (загрузка связи)
@task
def tie_jira_issue():
    query = """
        SELECT "ID" AS jira_sprint_bk,
               i.id AS jira_issue_bk
        FROM "AO_60DB71_SPRINT" s
        JOIN customfieldvalue v ON v.customfield = '10101' /* sprint */
                               AND v.stringvalue = s."ID"::text
        JOIN jiraissue        i ON v.issue = i.id
    """

    tie = Tie(anchor1_name='jira_sprint', anchor2_name='jira_issue')
    tie.create_tie()
    tie.update_tie(source='jira_db', query=query)

Пункт 4. Оборачиваем полученные задачи в DAG и ставим на регулярный запуск.

Пример DAG (определение свойств DAG, установление последовательности задач)
with DAG(dag_id='am_jira_sprint',
         default_args=args,
         schedule_interval=timedelta(hours=1),
         tags=['AnchorModel'],
         max_active_runs=1) as dag:
     
    anc_jira_sprint() >> attr_name() >> attr_started() >> attr_completed() >> tie_jira_issue()

Шаг 4. Profit — подводим итоги

С помощью anchor_model.py мы организовали поставку данных, необходимых для отчета, за пару часов и избежали ошибок в именах объектов. По сути, вся работа инженера свелась к определению имени якоря и его атрибутов, написанию нескольких строк кода и базовых запросов. Время разработки сокращено более чем на 50%, а количество ошибок уменьшилось более чем на 95% по сравнению с полностью ручной разработкой поставки данных.

Благодаря предсказуемости имен объектов в DWH, BI-разработчик может приступить к разработке витрин данных, необходимых для создания отчетов, сразу же после первой отработки DAG. Это, в свою очередь, ускоряет разработку отчета, и бизнес быстрее получает необходимые ему наглядные визуализации данных.

Подобная автоматизация позволяет нескольким инженерам выполнить несколько поставок данных в течение одного спринта и обеспечить работой BI-разработчиков на несколько спринтов вперед.

Для демонстрации работы мы выбрали небольшой якорь с тремя атрибутами и одной связью. Конечно, в реальной работе будут встречаться якоря с гораздо большим количеством атрибутов и связей, но принципиально это ничего не меняет.

Мы будем и дальше улучшать работу модуля anchor_model.py, о чём обязательно расскажем в продолжении цикла статей о разработке BI-системы. Оставайтесь с нами, пишите своё мнение, советы и критику в комментариях. До скорых встреч!

Комментарии (0)