Приветствую! Меня зовут Жумабаев Султан, и в ПГК я работаю инженером данных на проекте «Цифровой вагон». Могу уверенно сказать, Oracle сегодня — одно из самых популярных и надежных хранилищ, хотя рынок и предлагает множество новых современных разработок. В этой статье я расскажу про использование Materialized Views для организации ETL-процессов в рамках проекта.
Существует два основных способа использования материализованных представлений:
Репликация данных в отдельные базы данных для снижения нагрузки запросов.
Повышение производительности запросов за счет периодического вычисления и хранения результатов сложных агрегаций данных, что позволяет пользователям запрашивать результаты сложных запросов на определенный момент времени.
Цифровой вагон
Система «Цифровой вагон» помогает собирать и анализировать большое количество данных о состоянии вагона, принимать своевременные решения о его ремонте и, таким образом, оптимизировать затраты ПГК. Проект стартовал с модуля, который позволяет отслеживать технические показатели колесных пар с датчиков, массово расположенных на сети РЖД — ИС КТИ (контрольно-технические измерения). Благодаря этой информации мы можем осуществлять их предиктивный, то есть предупредительный ремонт. Подробнее здесь.
Организация хранения данных применяемых в продукте (Oracle + Postgre)
Архитектура всех современных корпоративных систем подчиняется определенным стандартам: она должна быть построена на переиспользуемых микросервисах, то есть быть business oriented. Наш продукт не является исключением, он также построен на микросервисах. При этом в каждом из них используется своя база PostgreSQL.
Я хочу остановиться на основном источнике (хранилище), который используется в Департаменте цифрового развития — Oracle. Часть данных для продукта проходит предварительную обработку внутри хранилища данных, поскольку другие команды также используют их в своих проектах.
Постановка задачи
Данные для «Цифрового вагона» мы получаем из нескольких источников. Прежде всего, это данные с датчиков, которые установлены на сети РЖД. Они измеряют технические показатели колесных пар вагонов и позволяют видеть информацию практически в режиме реального времени (на деле же обычная пакетная обработка). Что мы можем увидеть? Номер вагона и детали, а также показатели ее технического состояния: толщину гребня и толщину обода. Полученные данные необходимо обогатить справочниками, преобразовать и снова обогатить.
Поскольку хранилище построено на Oracle, то первый вариант реализации возложили на пакеты (Package Oracle PL/SQL). Написали несколько модулей, протестировали, все заработало. Процедуры запускались с помощью планировщика задач Oracle Scheduler.
Спустя некоторое время, столкнулись с рядом проблем, приведу несколько из них:
пакеты содержали более тысячи строк чистого SQL-кода. Подобное затрудняет знакомство новых коллег с исходниками, а также дальнейшую поддержку;
формируемая таблица и генерирующая её процедура, связаны через планировщик, который может находиться в другой схеме. Следовательно, периодически возникают трудности в поиске участка кода, который заполняет сущность и задачу, запускающую его;
отсутствие графического интерфейса (
писать запросы - это не GUI) для мониторинга за запуском задач по расписанию.
Материализованные представления в Oracle
Пару слов о материализованных представлениях. Materialized view (MV) позволяет выполнять SQL-запрос в определенный момент времени и сохранять результат в таблице (локально или в удаленной базе данных). Во время выполнения команды на создание MV, Oracle создает объект материализованного представления и обычную таблицу с таким же названием, как и MV (базовая таблица). После инициализации MV, можно перезапустить запрос MV и обновить данные в базовой таблице. Подробнее здесь.
Материализованное представление может формироваться основываясь на таблицах, представлениях и других материализованных представлениях. Таблицы, которые указываются в конструкции FROM
MV, часто называют мастер таблицами.
При определении материализованного представления можно указать три параметра для обновления (ссылка):
когда обновлять;
как обновлять;
можно ли использовать "доверительные" ограничения при обновлении (в этой статье про это говорить не будем).
Существуют два варианта запуска обновления MV — ON COMMIT
и ON DEMAND
:
ON COMMIT
- обновление происходит во время фиксации транзакции, которая изменила одну из мастер-таблиц. Реализует событийную модель, однако имеет ряд ограничений - необходимо иметь права наON COMMIT
и если обновить MV не выйдет, то вся транзакция откатится.ON DEMAND
- обновление происходит при ручном запуске одной из доступной процедуры из пакета:DBMS_MVIEW
(REFRESH
,REFREASH_ALL_MVIEWS
,REFRESH_DEPENDENT
).
Также можно указать каким образом MV будет обновляться из мастер-таблиц:
COMPLETE
- обновляется, пересчитывая весь запрос, перезаписывая базовую таблицу;FAST
- применить инкрементные изменения для MV, используя информацию записанную в журналы материализованных представлений;FORCE
- если возможно, применяетсяFAST
, в противном случае применяетсяCOMPLETE
;NEVER
- MV не будет обновляться механизмами обновления.
Сделаем небольшую сводку. Наиболее подходящий вариант обновления MV для нашей задачи - это опции ON COMMIT
и FAST
, однако в этом случае накладываются ограничения из-за которых данный вариант не подойдет. Собственно, о них ниже, на демонстрационном примере.
Строим ETL процесс из материализованных представлений
У нас имеются таблица-источник и справочник: table_a
и dim_b
соответственно. Под собой они скрывают реальные сущности, однако в разборе обойдемся абстрактными названиями. Приведенные таблицы нам необходимо объединить, отфильтровать, агрегировать, при этом оставив только актуальную информацию. В итоге мы должны получить что-то похожее (Рис. 1):
Наполнение таблицы-источника table_a
крайне критично, потому что даже в "сыром" виде она несет полезную информацию. Из-за этого вариант с ON COMMIT
отпадает, поскольку если наш процесс сломается, то новые данные не поступят в таблицы-источники, следовательно выбираем ON DEMAND
. Да и в общем-то у нашей учетной есть права только на чтение этой таблицы.
А что на счет опции FAST
? Тоже не подойдет, поскольку исходя из постановки задачи, нам нужны только свежие данные, поэтому используем COMPLETE
.
Перейдем к созданию материализованных представлений:
Материализованное представление
demo_join_mv
содержащее join:
CREATE MATERIALIZED VIEW demo_join_mv
REFRESH complete ON demand AS
SELECT
ROW_NUMBER() OVER(
PARTITION BY a.CAR_NUMBER ORDER BY a.BUSINESS_DT DESC, a.NODE_TYPE, a.DETAIL_TYPE
) rn,
b.CAR_ID, a.CAR_NUMBER,
a.NODE_TYPE, a.DETAIL_TYPE,
a.RIDGE_LEFT, a.RIDGE_RIGHT,
a.BUSINESS_DT
FROM
table_a a, dim_b b
WHERE
b.CAR_NUMBER = a.CAR_NUMBER
AND b.STATE <> 5 -- ограничиваем выборку с помощью справочника
Содержимое demo_join_mv
, где RN
- это номер колесной пары:
RN |
CAR_ID |
CAR_NUMBER |
NODE_TYPE |
DETAIL_TYPE |
RIDGE_LEFT |
RIDGE_RIGHT |
BUSINESS_DT |
---|---|---|---|---|---|---|---|
1 |
543210123456 |
21011013 |
11 |
1 |
30 |
30 |
2021-05-22 |
2 |
543210123456 |
21011013 |
11 |
2 |
30 |
30 |
2021-05-22 |
3 |
543210123456 |
21011013 |
12 |
1 |
30 |
30 |
2021-05-22 |
4 |
543210123456 |
21011013 |
12 |
2 |
30 |
30 |
2021-05-22 |
1 |
654321012345 |
22351042 |
11 |
1 |
30 |
30 |
2021-05-08 |
2 |
654321012345 |
22351042 |
11 |
2 |
30 |
30 |
2021-05-08 |
3 |
654321012345 |
22351042 |
12 |
1 |
30 |
30 |
2021-05-08 |
4 |
654321012345 |
22351042 |
12 |
2 |
30 |
30 |
2021-05-08 |
Материализованное представление
demo_agg_mv
содержащее aggregates:
CREATE MATERIALIZED VIEW demo_agg_mv
REFRESH complete ON demand AS
SELECT
dj.CAR_ID, dj.CAR_NUMBER,
MAX(CASE WHEN dj.RN = 1 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_1,
MAX(CASE WHEN dj.RN = 1 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_1,
MAX(CASE WHEN dj.RN = 2 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_2,
MAX(CASE WHEN dj.RN = 2 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_2,
MAX(CASE WHEN dj.RN = 3 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_3,
MAX(CASE WHEN dj.RN = 3 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_3,
MAX(CASE WHEN dj.RN = 4 THEN dj.RIDGE_LEFT END) RIDGE_LEFT_4,
MAX(CASE WHEN dj.RN = 4 THEN dj.RIDGE_RIGHT END) RIDGE_RIGHT_4,
dj.BUSINESS_DT
FROM
DEMO_JOIN_MV dj
WHERE
dj.RN <= 4
GROUP BY
dj.CAR_ID, dj.CAR_NUMBER, dj.BUSINESS_DT
Содержимое demo_agg_mv
:
CAR_ID |
CAR_NUMBER |
RIDGE_LEFT_1 |
RIDGE_RIGHT_1 |
RIDGE_LEFT_2 |
RIDGE_RIGHT_2 |
RIDGE_LEFT_3 |
RIDGE_RIGHT_3 |
RIDGE_LEFT_4 |
RIDGE_RIGHT_4 |
BUSINESS_DT |
---|---|---|---|---|---|---|---|---|---|---|
543210123456 |
21011013 |
30 |
30 |
30 |
30 |
30 |
30 |
30 |
30 |
2021-05-22 |
654321012345 |
22351042 |
30 |
30 |
30 |
30 |
30 |
30 |
30 |
30 |
2021-05-08 |
И получаем следующий результат (Рис. 2). Полученная схема удовлетворяет целевому варианту (Рис. 1):
Сущности, продолжающие ETL-процесс, опущены для упрощения
Осталась проблема запуска обновлений, описанная в начале статьи. Чтобы избежать использования планировщика Oracle, для оркестрации запуска обновлений MV мы решили использовать инструмент AirFlow (ссылка). Данный инструмент имеет приятный веб-интерфейс и удобный доступ к логам. Для его работы необходимо написать python скрипт (DAG), в рамках которого задачи выполняются по прописанной логике и заданному расписанию. В качестве завершающего этапа DAG'а можно настроить отправку оповещения в Telegram.
Таким образом можно существенно снизить кодовую базу реализованную в пакетах и передать планирование запуска задач специализированному инструменту.
Разные полезности
Для более комфортной работы с материализованными представлениями в Oracle предлагаю подборку сниппетов:
Запуск обновления MV
BEGIN
-- METHOD=>'C' - параметр обновления: 'C' - COMPLETE, 'F' - FAST, '?' - FORCE
-- atomic_refresh=>FALSE - по умолчанию TRUE,
-- вместо DML выражения DELETE применяется DDL выражение TRUNCATE для более быстрой очистки таблицы
DBMS_MVIEW.REFRESH('DW_DEMO_JOIN_MV', METHOD=>'C', atomic_refresh=>FALSE);
DBMS_MVIEW.REFRESH('DW_DEMO_AGG_MV', METHOD=>'C', atomic_refresh=>FALSE);
END;
/
Запрос, для получения связей между MV и мастер-таблицами
SELECT
OWNER AS view_schema,
NAME AS view_name,
REFERENCED_OWNER AS referenced_schema,
REFERENCED_NAME AS referenced_table
FROM
sys.ALL_DEPENDENCIES
WHERE
"TYPE" = 'MATERIALIZED VIEW'
AND REFERENCED_TYPE = 'TABLE'
AND NAME <> REFERENCED_NAME
AND NAME LIKE 'DW_DEMO_%' -- ваше название
ORDER BY
view_schema, view_name
-- Результат
-- |VIEW_SCHEMA|VIEW_NAME |REFERENCED_SCHEMA|REFERENCED_TABLE |
-- |-----------|---------------|-----------------|--------------------|
-- |SCHM_A |DW_DEMO_AGG_MV |SCHM_A |DW_DEMO_JOIN_MV |
-- |SCHM_A |DW_DEMO_JOIN_MV|SCHM_B |WPC |
-- |SCHM_A |DW_DEMO_JOIN_MV|SCHM_C |RK |
Последнее обновление MV
SELECT
MVIEW_NAME,
LAST_REFRESH_TYPE,
LAST_REFRESH_DATE
FROM
ALL_MVIEWS
WHERE
MVIEW_NAME LIKE 'DW_DEMO_%' -- ваше название
ORDER BY
1, 3
-- Результат
-- |MVIEW_NAME |LAST_REFRESH_TYPE|LAST_REFRESH_DATE |
-- |---------------|-----------------|-----------------------|
-- |DW_DEMO_AGG_MV |COMPLETE |2021-11-10 12:19:11.000|
-- |DW_DEMO_JOIN_MV|COMPLETE |2021-11-10 12:18:54.000|
Отрисовка графа зависимостей MV и мастер-таблиц
import graphviz
import pandas as pd
import sqlalchemy
# Создаем подключение к БД
db_engine = sqlalchemy.create_engine("oracle+cx_oracle://%(user)s:%(password)s@%(dsn)s")
# Получаем DataFrame с зависимостями мат представлений и таблиц
mv_df = pd.read_sql("""
SELECT
OWNER AS view_schema,
NAME AS view_name,
REFERENCED_OWNER AS referenced_schema,
REFERENCED_NAME AS referenced_table
FROM
sys.ALL_DEPENDENCIES
WHERE
"TYPE" = 'MATERIALIZED VIEW'
AND REFERENCED_TYPE = 'TABLE'
AND NAME <> REFERENCED_NAME
AND NAME LIKE 'DW_DEMO_%' -- Названия ваших мат представлений
ORDER BY
view_schema, view_name
""", con=db_engine)
# Формируем списки имен мат представлений и таблиц
mvs_list = mv_df["view_name"].unique().tolist()
tables_list = [t for t in mv_df["referenced_table"].unique().tolist() if t not in mvs_list]
# Создаем ориентированный граф на языке DOT
mv_diag = graphviz.Digraph("MV diagram")
mv_diag.attr(rankdir="LR", size="8,5")
# Добавляем в граф таблицы
mv_diag.attr("node", shape="ellipse")
for t in tables_list:
mv_diag.node(t)
# Добавляем в граф мат представления
mv_diag.attr("node", shape="box")
for v in mvs_list:
mv_diag.node(v)
# Добавляем связи между узлами
for _, row in mv_df.iterrows():
mv_diag.edge(row["referenced_table"], row["view_name"])
# Сохраняем и отображаем результат
mv_diag.view("MV diagram")
# или
mv_diag.render("mv_diagram", format="png", view=True)
Заключение
В ходе внедрения материализованных представлений мы выявили некоторые минусы:
ограничения по использованию всех возможностей MV ввиду выстроенной архитектуры (нет возможности создать триггеры на другие сущности из-за разделения зон ответственности);
обработку данных для приложений, которую избыточно реализовывать на стороне сервиса, но при этом сложно реализовать чистым SQL кодом, необходимо реализовывать с помощью PL/SQL;
появляется много дополнительных таблиц вместо одного пакета (хотя мы этого и добивались - притянутый минус).
Тем не менее улучшение построения витрин – бесконечный процесс, который не ограничивается использованием какой-то конкретной технологии. Данный процесс требует вовлечения инженеров и архитекторов данных, которые постоянно повышают свой уровень навыков. Мы заинтересованы в новых членах команды, все вакансии компании в сфере IT можно посмотреть здесь. Ну а я готов ответить на ваши вопросы и обсудить опыт ПГК в комментариях.
P.S.
В данной статье описан один из простых подходов в работе с материализованными представлениями. Мы обошлись без использования MV log
, который необходим для "быстрых" обновлений, не затронули создание MV на основе уже существующей таблицы и другие интересные темы. Если вам интересно более детально изучить механизм Materialized View в Oracle, то советую следующие источники:
Комментарии (7)
mebelousov
29.11.2021 21:02Если ваша команда использует airflow, то можно хранить код в DAG-ах, а не в базе. Дополнительно получаем
возможности декомпозиции по таскам
python вместо PL/SQL
В каких случаях предпочтительнее использовать материализованные представление вместо кода рядом с airflow?
SultanK Автор
29.11.2021 22:10Корпоративный AirFlow в ПГК был поднят совсем недавно, до него был развернут инстанс в докере на машине, которая предназначена для обучения моделей (ребята потеснились немного и разрешили его там развернуть, только архитекторам не говорите ????).
Соответственно использовать данный инстанс на регламенте для трансформации данных не представлялось возможным.
В настоящее время, задача по переносу процесса обработки данных КТИ на корпоративный AirFlow находится в беклоге и в ближайшее время будет реализована.
Материализованные представления отлично подходят для аггрегации данных, поэтому подобные операции переписываться на Sql Alchemy не будут.
EvgenyVilkov
01.12.2021 18:24В свое время решил данную задачу просто написав планировщих запуска кода процедур из пакетов. API позволяло выстроить граф зависимостей и потом его визиализировать на веб морде в виде диаграммы ганта.
Повзрослев, перешел на ELT подход в разработке - генерация исполняемого SQL кода на основе метаданных, формируемых диаграммой процесса. Все остальное, простите, - костыли
Ustas4
А рассматривали ли вы вариант использования Talend?
SultanK Автор
Если я не ошибаюсь, то в основном Talend применяется для интеграции данных, то есть извлечение данных из источника, применение минимальных преобразований и складывание в другом месте. Для подобных задач, в ПГК, используется ETL-инструмент Apache NiFi. Например для получения данных КТИ, которые мы закупаем у РЖД, реализована группа процессов, которые извлекают дневной срез данных по вагонам и складывает в БД Oracle.
Также, для работы с Talend используется Java или Perl, однако основным языком программирования в ПГК является Python.
Ustas4
Пусть ПГК использует то, что есть. Я для ETL использовал богатую палитру инструментов Talend, не написав при этом ни строчки кода.
Исторически тема интересна для меня, как для бывшего вагонника 1983 г. БелИИЖТ.
EvgenyVilkov
NiFi - это интеграциооный инструмент. Talend - полноценный ETL (ELT при желании).
Делать ETL написанием кода в процедуре или в MV весьма порочная практика и к сожалению наиболее часто встречающаяся