Приветствую! Меня зовут Жумабаев Султан, и в ПГК я работаю инженером данных на проекте «Цифровой вагон». Могу уверенно сказать, Oracle сегодня — одно из самых популярных и надежных хранилищ, хотя рынок и предлагает множество новых современных разработок. В этой статье я расскажу про использование Materialized Views для организации ETL-процессов в рамках проекта.

Существует два основных способа использования материализованных представлений:

  1. Репликация данных в отдельные базы данных для снижения нагрузки запросов.

  2. Повышение производительности запросов за счет периодического вычисления и хранения результатов сложных агрегаций данных, что позволяет пользователям запрашивать результаты сложных запросов на определенный момент времени.

Цифровой вагон

Система «Цифровой вагон» помогает собирать и анализировать большое количество данных о состоянии вагона, принимать своевременные решения о его ремонте и, таким образом, оптимизировать затраты ПГК. Проект стартовал с модуля, который позволяет отслеживать технические показатели колесных пар с датчиков, массово расположенных на сети РЖД — ИС КТИ (контрольно-технические измерения). Благодаря этой информации мы можем осуществлять их предиктивный, то есть предупредительный ремонт. Подробнее здесь.

Организация хранения данных применяемых в продукте (Oracle + Postgre)

Архитектура всех современных корпоративных систем подчиняется определенным стандартам: она должна быть построена на переиспользуемых микросервисах, то есть быть business oriented. Наш продукт не является исключением, он также построен на микросервисах. При этом в каждом из них используется своя база PostgreSQL.

Я хочу остановиться на основном источнике (хранилище), который используется в Департаменте цифрового развития — Oracle. Часть данных для продукта проходит предварительную обработку внутри хранилища данных, поскольку другие команды также используют их в своих проектах.

Постановка задачи

Данные для «Цифрового вагона» мы получаем из нескольких источников. Прежде всего, это данные с датчиков, которые установлены на сети РЖД. Они измеряют технические показатели колесных пар вагонов и позволяют видеть информацию практически в режиме реального времени (на деле же обычная пакетная обработка). Что мы можем увидеть? Номер вагона и детали, а также показатели ее технического состояния: толщину гребня и толщину обода. Полученные данные необходимо обогатить справочниками, преобразовать и снова обогатить.

Поскольку хранилище построено на Oracle, то первый вариант реализации возложили на пакеты (Package Oracle PL/SQL). Написали несколько модулей, протестировали, все заработало. Процедуры запускались с помощью планировщика задач Oracle Scheduler.

Спустя некоторое время, столкнулись с рядом проблем, приведу несколько из них:

  • пакеты содержали более тысячи строк чистого SQL-кода. Подобное затрудняет знакомство новых коллег с исходниками, а также дальнейшую поддержку;

  • формируемая таблица и генерирующая её процедура, связаны через планировщик, который может находиться в другой схеме. Следовательно, периодически возникают трудности в поиске участка кода, который заполняет сущность и задачу, запускающую его;

  • отсутствие графического интерфейса (писать запросы - это не GUI) для мониторинга за запуском задач по расписанию.

Материализованные представления в Oracle

Пару слов о материализованных представлениях. Materialized view (MV) позволяет выполнять SQL-запрос в определенный момент времени и сохранять результат в таблице (локально или в удаленной базе данных). Во время выполнения команды на создание MV, Oracle создает объект материализованного представления и обычную таблицу с таким же названием, как и MV (базовая таблица). После инициализации MV, можно перезапустить запрос MV и обновить данные в базовой таблице. Подробнее здесь.

Материализованное представление может формироваться основываясь на таблицах, представлениях и других материализованных представлениях. Таблицы, которые указываются в конструкции FROM MV, часто называют мастер таблицами.

При определении материализованного представления можно указать три параметра для обновления (ссылка):

  • когда обновлять;

  • как обновлять;

  • можно ли использовать "доверительные" ограничения при обновлении (в этой статье про это говорить не будем).

Существуют два варианта запуска обновления MV — ON COMMIT и ON DEMAND:

  • ON COMMIT - обновление происходит во время фиксации транзакции, которая изменила одну из мастер-таблиц. Реализует событийную модель, однако имеет ряд ограничений - необходимо иметь права на ON COMMIT и если обновить MV не выйдет, то вся транзакция откатится.

  • ON DEMAND - обновление происходит при ручном запуске одной из доступной процедуры из пакета: DBMS_MVIEW (REFRESHREFREASH_ALL_MVIEWSREFRESH_DEPENDENT).

Также можно указать каким образом MV будет обновляться из мастер-таблиц:

  • COMPLETE - обновляется, пересчитывая весь запрос, перезаписывая базовую таблицу;

  • FAST - применить инкрементные изменения для MV, используя информацию записанную в журналы материализованных представлений;

  • FORCE - если возможно, применяется FAST, в противном случае применяется COMPLETE;

  • NEVER - MV не будет обновляться механизмами обновления.

Сделаем небольшую сводку. Наиболее подходящий вариант обновления MV для нашей задачи - это опции ON COMMIT и FAST, однако в этом случае накладываются ограничения из-за которых данный вариант не подойдет. Собственно, о них ниже, на демонстрационном примере.

Строим ETL процесс из материализованных представлений

У нас имеются таблица-источник и справочник: table_a и dim_b соответственно. Под собой они скрывают реальные сущности, однако в разборе обойдемся абстрактными названиями. Приведенные таблицы нам необходимо объединить, отфильтровать, агрегировать, при этом оставив только актуальную информацию. В итоге мы должны получить что-то похожее (Рис. 1):

Рисунок 1. Целевая схема ETL-процесса
Рисунок 1. Целевая схема ETL-процесса

Наполнение таблицы-источника table_a крайне критично, потому что даже в "сыром" виде она несет полезную информацию. Из-за этого вариант с ON COMMIT отпадает, поскольку если наш процесс сломается, то новые данные не поступят в таблицы-источники, следовательно выбираем ON DEMAND. Да и в общем-то у нашей учетной есть права только на чтение этой таблицы.

А что на счет опции FAST? Тоже не подойдет, поскольку исходя из постановки задачи, нам нужны только свежие данные, поэтому используем COMPLETE.

Перейдем к созданию материализованных представлений:

  • Материализованное представление demo_join_mv содержащее join:

CREATE MATERIALIZED VIEW demo_join_mv
REFRESH complete ON demand AS
SELECT
    ROW_NUMBER() OVER(
        PARTITION BY a.CAR_NUMBER ORDER BY a.BUSINESS_DT DESC, a.NODE_TYPE, a.DETAIL_TYPE
    ) rn,
    b.CAR_ID, a.CAR_NUMBER,
    a.NODE_TYPE, a.DETAIL_TYPE,
    a.RIDGE_LEFT, a.RIDGE_RIGHT,
    a.BUSINESS_DT
FROM
    table_a a, dim_b b
WHERE
    b.CAR_NUMBER = a.CAR_NUMBER
    AND b.STATE <> 5  -- ограничиваем выборку с помощью справочника

Содержимое demo_join_mv, где RN - это номер колесной пары:

RN

CAR_ID

CAR_NUMBER

NODE_TYPE

DETAIL_TYPE

RIDGE_LEFT

RIDGE_RIGHT

BUSINESS_DT

1

543210123456

21011013

11

1

30

30

2021-05-22

2

543210123456

21011013

11

2

30

30

2021-05-22

3

543210123456

21011013

12

1

30

30

2021-05-22

4

543210123456

21011013

12

2

30

30

2021-05-22

1

654321012345

22351042

11

1

30

30

2021-05-08

2

654321012345

22351042

11

2

30

30

2021-05-08

3

654321012345

22351042

12

1

30

30

2021-05-08

4

654321012345

22351042

12

2

30

30

2021-05-08

  • Материализованное представление demo_agg_mv содержащее aggregates:

CREATE MATERIALIZED VIEW demo_agg_mv
REFRESH complete ON demand AS
SELECT
    dj.CAR_ID, dj.CAR_NUMBER,
    MAX(CASE WHEN dj.RN = 1 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_1,
    MAX(CASE WHEN dj.RN = 1 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_1,
    MAX(CASE WHEN dj.RN = 2 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_2,
    MAX(CASE WHEN dj.RN = 2 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_2,
    MAX(CASE WHEN dj.RN = 3 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_3,
    MAX(CASE WHEN dj.RN = 3 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_3,
    MAX(CASE WHEN dj.RN = 4 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_4,
    MAX(CASE WHEN dj.RN = 4 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_4,
    dj.BUSINESS_DT
FROM
    DEMO_JOIN_MV dj
WHERE
    dj.RN <= 4
GROUP BY
    dj.CAR_ID, dj.CAR_NUMBER, dj.BUSINESS_DT

Содержимое demo_agg_mv:

CAR_ID

CAR_NUMBER

RIDGE_LEFT_1

RIDGE_RIGHT_1

RIDGE_LEFT_2

RIDGE_RIGHT_2

RIDGE_LEFT_3

RIDGE_RIGHT_3

RIDGE_LEFT_4

RIDGE_RIGHT_4

BUSINESS_DT

543210123456

21011013

30

30

30

30

30

30

30

30

2021-05-22

654321012345

22351042

30

30

30

30

30

30

30

30

2021-05-08

И получаем следующий результат (Рис. 2). Полученная схема удовлетворяет целевому варианту (Рис. 1):

Рисунок 2. Разработанная схема ETL-процесса
Рисунок 2. Разработанная схема ETL-процесса

Сущности, продолжающие ETL-процесс, опущены для упрощения

Осталась проблема запуска обновлений, описанная в начале статьи. Чтобы избежать использования планировщика Oracle, для оркестрации запуска обновлений MV мы решили использовать инструмент AirFlow (ссылка). Данный инструмент имеет приятный веб-интерфейс и удобный доступ к логам. Для его работы необходимо написать python скрипт (DAG), в рамках которого задачи выполняются по прописанной логике и заданному расписанию. В качестве завершающего этапа DAG'а можно настроить отправку оповещения в Telegram.

Таким образом можно существенно снизить кодовую базу реализованную в пакетах и передать планирование запуска задач специализированному инструменту.

Разные полезности

Для более комфортной работы с материализованными представлениями в Oracle предлагаю подборку сниппетов:

  • Запуск обновления MV

BEGIN
    -- METHOD=>'C' - параметр обновления: 'C' - COMPLETE, 'F' - FAST, '?' - FORCE
    -- atomic_refresh=>FALSE - по умолчанию TRUE, 
    --  вместо DML выражения DELETE применяется DDL выражение TRUNCATE для более быстрой очистки таблицы
    DBMS_MVIEW.REFRESH('DW_DEMO_JOIN_MV', METHOD=>'C', atomic_refresh=>FALSE);
    DBMS_MVIEW.REFRESH('DW_DEMO_AGG_MV', METHOD=>'C', atomic_refresh=>FALSE);
END;
/
  • Запрос, для получения связей между MV и мастер-таблицами

SELECT
    OWNER AS view_schema,
    NAME AS view_name,
    REFERENCED_OWNER AS referenced_schema,
    REFERENCED_NAME AS referenced_table
FROM
    sys.ALL_DEPENDENCIES
WHERE
    "TYPE" = 'MATERIALIZED VIEW'
    AND REFERENCED_TYPE = 'TABLE'
    AND NAME <> REFERENCED_NAME
    AND NAME LIKE 'DW_DEMO_%'  -- ваше название
ORDER BY
    view_schema, view_name
    
-- Результат
-- |VIEW_SCHEMA|VIEW_NAME      |REFERENCED_SCHEMA|REFERENCED_TABLE    |
-- |-----------|---------------|-----------------|--------------------|
-- |SCHM_A     |DW_DEMO_AGG_MV |SCHM_A           |DW_DEMO_JOIN_MV     |
-- |SCHM_A     |DW_DEMO_JOIN_MV|SCHM_B           |WPC                 |
-- |SCHM_A     |DW_DEMO_JOIN_MV|SCHM_C           |RK                  |
  • Последнее обновление MV

SELECT
    MVIEW_NAME,
    LAST_REFRESH_TYPE,
    LAST_REFRESH_DATE
FROM 
    ALL_MVIEWS
WHERE 
    MVIEW_NAME LIKE 'DW_DEMO_%'  -- ваше название
ORDER BY 
    1, 3
	
-- Результат
-- |MVIEW_NAME     |LAST_REFRESH_TYPE|LAST_REFRESH_DATE      |
-- |---------------|-----------------|-----------------------|
-- |DW_DEMO_AGG_MV |COMPLETE         |2021-11-10 12:19:11.000|
-- |DW_DEMO_JOIN_MV|COMPLETE         |2021-11-10 12:18:54.000|
  • Отрисовка графа зависимостей MV и мастер-таблиц

import graphviz

import pandas as pd
import sqlalchemy

# Создаем подключение к БД
db_engine = sqlalchemy.create_engine("oracle+cx_oracle://%(user)s:%(password)s@%(dsn)s")

# Получаем DataFrame с зависимостями мат представлений и таблиц
mv_df = pd.read_sql("""
SELECT
    OWNER AS view_schema,
    NAME AS view_name,
    REFERENCED_OWNER AS referenced_schema,
    REFERENCED_NAME AS referenced_table
FROM
    sys.ALL_DEPENDENCIES
WHERE
    "TYPE" = 'MATERIALIZED VIEW'
    AND REFERENCED_TYPE = 'TABLE'
    AND NAME <> REFERENCED_NAME
    AND NAME LIKE 'DW_DEMO_%'  -- Названия ваших мат представлений
ORDER BY
    view_schema, view_name
""", con=db_engine)

# Формируем списки имен мат представлений и таблиц
mvs_list = mv_df["view_name"].unique().tolist()
tables_list = [t for t in mv_df["referenced_table"].unique().tolist() if t not in mvs_list]

# Создаем ориентированный граф на языке DOT
mv_diag = graphviz.Digraph("MV diagram")
mv_diag.attr(rankdir="LR", size="8,5")

# Добавляем в граф таблицы
mv_diag.attr("node", shape="ellipse")
for t in tables_list:
    mv_diag.node(t)

# Добавляем в граф мат представления
mv_diag.attr("node", shape="box")
for v in mvs_list:
    mv_diag.node(v)

# Добавляем связи между узлами
for _, row in mv_df.iterrows():
    mv_diag.edge(row["referenced_table"], row["view_name"])

# Сохраняем и отображаем результат
mv_diag.view("MV diagram")
# или
mv_diag.render("mv_diagram", format="png", view=True)

Заключение

В ходе внедрения материализованных представлений мы выявили некоторые минусы:

  • ограничения по использованию всех возможностей MV ввиду выстроенной архитектуры (нет возможности создать триггеры на другие сущности из-за разделения зон ответственности);

  • обработку данных для приложений, которую избыточно реализовывать на стороне сервиса, но при этом сложно реализовать чистым SQL кодом, необходимо реализовывать с помощью PL/SQL;

  • появляется много дополнительных таблиц вместо одного пакета (хотя мы этого и добивались - притянутый минус).

Тем не менее улучшение построения витрин – бесконечный процесс, который не ограничивается использованием какой-то конкретной технологии. Данный процесс требует вовлечения инженеров и архитекторов данных, которые постоянно повышают свой уровень навыков. Мы заинтересованы в новых членах команды, все вакансии компании в сфере IT можно посмотреть здесь. Ну а я готов ответить на ваши вопросы и обсудить опыт ПГК в комментариях.

P.S.

В данной статье описан один из простых подходов в работе с материализованными представлениями. Мы обошлись без использования MV log, который необходим для "быстрых" обновлений, не затронули создание MV на основе уже существующей таблицы и другие интересные темы. Если вам интересно более детально изучить механизм Materialized View в Oracle, то советую следующие источники:

  1. https://learning.oreilly.com/library/view/pro-oracle-database/9781430229704

  2. https://docs.oracle.com/cd/B19306_01/server.102/b14223/basicmv.htm

  3. https://docs.oracle.com/database/121/DWHSG/advmv.htm

Комментарии (7)


  1. Ustas4
    26.11.2021 01:37

    А рассматривали ли вы вариант использования Talend?


    1. SultanK Автор
      26.11.2021 01:51
      +1

      Если я не ошибаюсь, то в основном Talend применяется для интеграции данных, то есть извлечение данных из источника, применение минимальных преобразований и складывание в другом месте. Для подобных задач, в ПГК, используется ETL-инструмент Apache NiFi. Например для получения данных КТИ, которые мы закупаем у РЖД, реализована группа процессов, которые извлекают дневной срез данных по вагонам и складывает в БД Oracle.

      Также, для работы с Talend используется Java или Perl, однако основным языком программирования в ПГК является Python.


      1. Ustas4
        26.11.2021 12:51

        Пусть ПГК использует то, что есть. Я для ETL использовал богатую палитру инструментов Talend, не написав при этом ни строчки кода.

        Исторически тема интересна для меня, как для бывшего вагонника 1983 г. БелИИЖТ.


      1. EvgenyVilkov
        01.12.2021 18:20

        NiFi - это интеграциооный инструмент. Talend - полноценный ETL (ELT при желании).

        Делать ETL написанием кода в процедуре или в MV весьма порочная практика и к сожалению наиболее часто встречающаяся


  1. mebelousov
    29.11.2021 21:02

    Если ваша команда использует airflow, то можно хранить код в DAG-ах, а не в базе. Дополнительно получаем

    • возможности декомпозиции по таскам

    • python вместо PL/SQL

    В каких случаях предпочтительнее использовать материализованные представление вместо кода рядом с airflow?


    1. SultanK Автор
      29.11.2021 22:10

      Корпоративный AirFlow в ПГК был поднят совсем недавно, до него был развернут инстанс в докере на машине, которая предназначена для обучения моделей (ребята потеснились немного и разрешили его там развернуть, только архитекторам не говорите ????).

      Соответственно использовать данный инстанс на регламенте для трансформации данных не представлялось возможным.

      В настоящее время, задача по переносу процесса обработки данных КТИ на корпоративный AirFlow находится в беклоге и в ближайшее время будет реализована.

      Материализованные представления отлично подходят для аггрегации данных, поэтому подобные операции переписываться на Sql Alchemy не будут.


  1. EvgenyVilkov
    01.12.2021 18:24

    В свое время решил данную задачу просто написав планировщих запуска кода процедур из пакетов. API позволяло выстроить граф зависимостей и потом его визиализировать на веб морде в виде диаграммы ганта.

    Повзрослев, перешел на ELT подход в разработке - генерация исполняемого SQL кода на основе метаданных, формируемых диаграммой процесса. Все остальное, простите, - костыли