Привет, Хабр! Меня зовут Михаил Килинский, я — Senior Developer в проекте Data Warehouse «Лаборатории Касперского». Наша команда строит хранилища данных и разрабатывает ETL- и ELT-процессы, мы пишем на Python, SQL и C# (.NET), а весь код находится в монорепозитории.
Гибкие методологии построения хранилищ данных — очень популярный тренд. Причина — возрастающая сложность корпоративных моделей данных и необходимость интеграции большого числа разнородных источников, которые включают в себя не только традиционные СУБД и плоские файлы, но и различные real-time-брокеры сообщений, внешние API и базы данных NoSQL.
В этой статье хочу рассказать, как мы изначально хранили данные, почему решили внедрить методологии Data Lake и Data Vault и к чему это привело. Кроме того, покажу, как мы изобрели свой велосипед разработали свой фреймворк по автоматизации работы с помощью модели Data Vault.
Как было
Изначально у нас было классическое BI-DWH, построенное по Кимбаллу. Хранилище данных было развернуто в Microsoft SQL Server, а так как основные источники данных представляли собой плоские файлы или другие серверы MS SQL, то для интеграции хватало такого инструмента, как SQL Server Integration Services (SSIS).
Архитектура выглядела примерно так:
Такой подход имел некоторые очевидные проблемы, связанные прежде всего со значительными сложностями повторного использования кода в случае разработки ETL-процессов на SSIS, а также сложностями в проведении ревью и разработке автотестов.
Впрочем, это остается уместным, если ваши источники данных достаточно однородны и их немного, однако при добавлении новых типов источников трудозатраты на разработку могут резко вырасти.
Выбираем новую концепцию
Для более удобного внедрения механизмов обмена сообщениями между различными информационными системами в рамках «Лаборатории Касперского» активно используется подход Enterprise Service Bus (ESB).
Поскольку инфраструктура ESB построена на базе группы распределенных кластеров RabbitMQ и брокеров Apache Kafka, для нашей команды стало понятно, что существующих инструментов построения витрин данных уже недостаточно.
Теперь для построения одной витрины было необходимо подключаться к очередям Kafka и RabbitMQ, забирать большие объемы данных в near real-time режиме и объединять их с существующими в DWH справочниками. При этом надо было быстро реагировать на изменения схемы сообщений в очередях, что достаточно непросто в случае, если приемником данных выступает СУБД, в которой принят подход Schema-on-Write. Также одним из требований была работа с оффсетами — для потенциальной возможности перечитывать сообщения за определенные периоды времени.
Так мы решили создать слой Data Lake хранилища на основе Hadoop. Этот слой становился единой точкой входа для таких потоков данных, как брокеры сообщений, внешние API или облачные хранилища, в частности GCP.
Преимущества подхода Data Lake:
Возможность быстро подключать неструктурированные или слабоструктурированные источники данных благодаря подходу Schema-on-Read.
Хранение исторических данных в одном месте, что было удобно как для отладки и поиска ошибок в ETL-процессах, так и для проведения сложной аналитики или построения ML-моделей.
Простота масштабирования ресурсов.
Возможность использования «сырых» данных для исследовательских задач, в частности для построения ML-моделей.
Однако такой подход потребовал введения еще одного промежуточного слоя хранения данных, в который попадали бы детальные данные из Data Lake, необходимые для построения витрин. И тут мы столкнулись с первой проблемой: выбором места для хранения данных в этом детальном слое. Дело в том, что для построения витрин были необходимы данные, как находящиеся в Data Lake, так и уже хранящиеся в DWH.
Дилемма заключалась в выборе между двумя решениями: либо перенос необходимых данных из DWH в Data Lake, построение витрин на инфраструктуре Hadoop и затем их загрузка в DWH, либо перенос данных из Data Lake ближе к витринам в DWH и сохранение их в более структурированном формате.
У каждого подхода были свои плюсы и минусы, но мы выбрали второй вариант, когда данные из Data Lake попадают в DWH на базе MS SQL Server.
Для хранения данных в промежуточном слое была выбрана методология Data Vault. Подробнее о ней можно почитать, например, в этом хабропосте. Там же есть и про ее недостатки, но, кстати, почему-то нет главного: нельзя сказать заранее, подойдет ли она именно в вашем случае.
Что касается нас, то плюсы были следующими:
Гибкость. Так как в Data Lake данные из брокеров сохранялись AS IS, мы часто сталкивались с изменением схемы сообщений. Методология Data Vault позволяет решить эту проблему с относительно небольшими издержками: например, при появлении нового поля в сообщении можно завести для него новый satellite.
Удобство интеграции различных источников. Источниками данных для одной и той же сущности Data Vault могут быть несколько топиков, сообщения из которых поступают и загружаются независимо в совокупность таблиц hub, satellites и links.
Предсказуемость разработки. Теперь мы имеем понятный процесс добавления необходимых данных в Data Vault и опыт в создании витрин в слое DWH на его основе. Это позволяет лучше прогнозировать сроки реализации новых требований от бизнеса.
Хранение исторических данных. Модель данных Data Vault предоставляет удобную возможность определить состояние сущности на любой выбранный момент времени и отслеживать историю ее изменений, а это особенно актуально для построения витрин в слое DWH.
Увы, в реальности редко все проходит гладко :) Вот и мы столкнулись с рядом проблем:
1. Проблемы с производительностью запросов к сущностям Data Vault.
В связи с высокой нормализацией модели мы сталкиваемся с обилием операций join, а из-за хранения изменений в данных нам необходимо часто использовать оконные функции. Хотя это частая проблема, о которой много говорят, я не могу сказать, что это сильно мешало разработке.
Естественно, все зависит от объема ваших данных и технологий, на которых реализован Data Vault. Если вы используете реляционную БД, то эта проблема частично решается благодаря грамотному проектированию индексов для таблиц. В распределенных системах может помочь грамотное задание партицирования и использование дистрибуции по ключам. Однако хочу заметить, что большинство ETL-процессов носят инкрементальный характер, поэтому запускать сложные запросы с большим количеством оконных функций и джоинов нужно будет на небольших порциях данных.
2. Сложность моделирования сущностей Data Vault.
Может показаться, что процесс моделирования данных для Data Vault хорошо регламентирован, но это не так. Например, один и тот же атрибут в исходном сообщении можно моделировать, как колонку в satellite, а можно — как полноценную сущность с помощью Hub и Link. Вопреки ожиданиям, такие решения не всегда очевидны.
3. Необходимость иметь некий фреймворк для автоматизации загрузки в Data Vault.
Ручная загрузка данных в таблицы Data Vault — это очень неудобно. Если писать вручную огромное количество похожих процедур по заполнению таблиц, то в разы повышается риск ошибок: например, легко пропустить нужное поле при расчете хеша в satellite. Да и теряется одно из главных преимуществ модели Data Vault — ускорение разработки и быстрая поставка ценности пользователям. Из-за высокой нормализации приходится иметь дело с огромным количеством таблиц с повторяющейся структурой, поэтому их ручное заполнение очень сильно замедляет разработку.
После внедрения новых подходов архитектура нашего хранилища стала выглядеть так:
В качестве слоя представления данных используется текущее DWH, которое централизованно хранит необходимые для пользователей данные в неизбыточной форме, удобной для аналитических отчетов. Она является источником данных для формирования таких отчетов в том числе и в OLAP-системах.
В свою очередь, Data Vault в структурированном виде хранит все данные за все время. При этом такая модель данных легко расширяется и преобразуется при изменении исходных данных и добавлении новых источников, а также в случае, если такие изменения необходимо внести для повышения удобства разработки или корректности ETL-процессов. В то же время структура DWH согласована с пользователями и является более устойчивой к изменениям в исходных данных.
Таким образом, оба слоя данных дополняют друг друга и позволяют гибко реагировать на изменения, не ломая существующую систему аналитической отчетности.
Из чего состоит наш новый фреймворк
Для автоматизации процессов загрузки в Data Vault нам был необходим некий фреймворк, который мог бы взять на себя следующие задачи:
Определение инкремента в источнике данных. Для нас такой источник — external-таблицы в Hive, в которые сохраняются данные из брокеров сообщений. Почему не забирать данные напрямую из брокеров? Такой подход также имеет право на жизнь — тем более что определение инкремента в этом случае будет происходить естественно, ведь при чтении из Kafka используется механизм оффсетов. Однако такое решение противоречит концепции ELT, в которой именно озеро данных является единой точкой входа для всех процессов обработки.
Чтение порции данных из источника и применение необходимых трансформаций. Сообщения, приходящие из брокеров, имеют сложную структуру с большой степенью вложенности и часто содержат массивы. Часто для того, чтобы получить необходимую нам плоскую структуру данных, необходимо применять оконные функции и операции explode.
Обогащение данных техническими полями. Необходимо посчитать хеши от ключей, а для satellite-ов также хеши от всех атрибутов, добавить даты загрузки, начало действия записи для реализации SCD2 и ссылку на источник данных.
Маппинг атрибутов на поля сущностей Data Vault. Полученная на предыдущих шагах структура данных может быть источником для нескольких таблиц hub, link и satellite.
Загрузка данных в реляционную БД, в нашем случае SQL Server. При этом загрузка сущностей из одного источника должна происходить параллельно. Это возможно благодаря тому, что ключи в таблицах вычисляются с помощью хеш-функции. Таким образом, при записи данных мы можем не проверять ссылочную целостность.
Заполнение хранилища метаданных и логирование.
Все шаги по загрузке данных можно разделить на рутинные, которые лучше полностью автоматизировать (чтение данных, генерация SQL-скриптов для загрузки в таблицы Data Vault и реализации механизма SCD2), и более творческие (трансформация данных и маппинг полей), для реализации которых не обойтись без разработчика.
Чтобы получить инструмент, который бы полностью удовлетворял всем нашим требованиям, мы решили разработать свой фреймворк для загрузки данных в Data Vault.
Основным движком для процессинга данных мы выбрали Spark из-за его популярности, удобства работы с вложенными структурами данных и возможности переносить данные между экосистемой Hadoop и СУБД.
Извлекаем данные
Для этого используется класс, реализующий простой интерфейс Source:
class Source:
"""
Базовый класс для извлечения данных
"""
__metaclass__ = ABCMeta
def __init__(self, spark: SparkSession, source: str):
self.spark = spark
self.source = source
@abstractmethod
def iter_source(self) -> t.Iterable[DataFrame]:
pass
Важно заметить, что необходимо иметь возможность читать данные из источника не все разом, а пачками, размер которых можно контролировать. Это значительно ускоряет загрузку и позволяет избежать ошибок, связанных с нехваткой памяти у spark-приложения.
В случае чтения данных из таблиц Hive мы можем итерироваться по партициям. Хотя в данный момент у нас есть реализация, работающая только с таблицами Hive, наличие единого интерфейса доступа к данным позволяет относительно легко расширить функционал фреймворка и добавить поддержку других источников данных, если это вдруг потребуется.
Трансформация и отображение данных
Эта часть — наиболее творческая. По сути, для загрузки данных в структуры Data Vault необходимо написать трансформации и отображение полей источника на поля целевой таблицы. Вся остальная работа по извлечению и загрузке данных, а также их обогащение техническими полями могут быть автоматизированы.
В качестве решения для трансформации данных часто используют стандартный SQL. Однако мы решили использовать DataFrame API из-за его большей гибкости и пригодности к тестированию, а также возможности повторного использования кода.
Для загрузки данных в Data Vault необходимо указать, какой атрибут является ключом, при этом учитывая, что ключ может быть составным. Для link-ов необходимо указать внешний ключ. Также необходимо учитывать, что прочитанные нами данные могут отображаться в несколько таблиц satellite, и для каждого необходимо указать набор атрибутов, по которым будет считаться хеш. Для описания маппинга можно использовать файлы конфигурации, однако мы решили использовать другой подход, при котором информация о ключах и принадлежности атрибутов тому или иному satellite хранится в метаданных полей датафрейма. Этот подход показался нам более естественным и читаемым.
Таким образом, для загрузки новой сущности в Data Vault разработчику необходимо только разработать класс, реализующий интерфейс Mapper:
class Mapper:
__metaclass__ = ABCMeta
def __init__(self, df: DataFrame, spark: SparkSession):
self.df = df
self.spark = spark
@abstractmethod
def map(self) -> DataFrame:
pass
Чтобы было нагляднее, приведу небольшой пример. Предположим, нам необходимо отобразить в сущности Data Vault сообщения следующего простого вида:
{
"OrderId": 125,
"OrderDate": "2024-09-27T00:00:00",
"ProductCode": "XXX-XX",
"FinancialInfo": {
"Price": 1997.08,
"Quantity": 2
}
}
Тут у нас есть ID некоторого заказа, его дата, некоторая финансовая информация и код продукта, который был продан в рамках заказа. При этом атрибут OrderId является ключом сущности «заказ» и должен попасть в таблицу hub_Orders, атрибут OrderDate является описанием заказа, и его место в таблице satellite sat_Orders, а цена и количество проданных единиц продукта — это финансовая информация, и для этого предназначен другой satellite sat_Orders_ FinancialInfo. Также заказ содержит ссылку на другую сущность — «продукт», и эта связь должна попасть в link lnk_Orders_Products.
ER-диаграмма выглядит следующим образом:
Новое отображение для сущности Data Vault будет выглядеть так:
def map(self) -> DataFrame:
return df.select(
col("value.OrderId").alias("order_id", metadata={"key_column": True, "key_column_name": "order_key"}),
col("value.ProductCode").alias("product_code", metadata={"foreign_key_column": True, "foreign_key_column_name": "product_key"}),
col("value.OrderDate").alias("order_date", metadata={"column_group": ""}),
col("value.FinancialInfo.Price").alias("price", metadata={"column_group": " FinancialInfo"}),
col("value.FinancialInfo.Quantity").alias("quantity", metadata={"column_group": " FinancialInfo"}),
)
Так мы распарсили сообщение и указали фреймворку необходимую метаинформацию.
{"key_column": True, "key_column_name": "order_key"} говорит о том, что данный атрибут — это бизнес-ключ сущности и от него нужно рассчитать хеш и сохранить в hub с названием order_key.
{"foreign_key_column": True, "foreign_key_column_name": "product_key"} — это указание на другую сущность, ссылка на которую должна быть сохранена в link.
{"column_group": ""}, {"column_group": " FinancialInfo"} — тут мы с помощью метаданных указали, в какие таблицы satellite нужно сохранить атрибуты сообщения.
Получив на вход данный DataFrame, фреймворк осуществляет дедупликацию сообщения, расчет необходимых хешей и сохраняет данные в таблицы Data Vault, руководствуясь полученными метаданными.
О записи данных стоит поговорить отдельно.
Генератор SQL-запросов
Одно из преимуществ методологии Data Vault — возможность сохранения данных используя только операцию INSERT, без необходимости обновления существующих данных. Это делает методологию удобной для работы с Big Data. Однако операция сохранения требует сравнения с уже существующими данными, чтобы добавлять только новыйновые значения в hub, или для реализации SCD2 для satellite.
Чтобы не писать много повторяющихся SQL-скриптов, мы используем шаблонизацию. Данные из Spark DataFrame записываются во временные таблицы, а затем запускаются скрипты, сгенерированные на основе шаблонов для записи новых или измененных данных в целевые таблицы.
Например, для заполнения таблиц hub используется достаточно простой шаблон:
INSERT INTO {self.target_table}
SELECT tmp.*
FROM {self.tmp_table} tmp
LEFT JOIN {self.target_table} src
ON tmp.{key_column} = src.{key_column}
WHERE src.{key_column} IS NULL
Управление метаданными
Это важная часть нашего подхода к разработке Data Vault, необходимая как для отладки, так и для управления загрузкой данных.
Каждой записи в таблицах Data Vault присваивается некий TraceId — это ссылка на таблицу метаданных, в которой хранится информация об источнике данных, времени запуска и окончания работы процесса загрузки, а также информация о последнем сохраненном инкременте. При следующем запуске эта информация будет прочитана фреймворком, чтобы определить новую пачку данных для загрузки. Таким образом, метаданные предоставляют нам информацию об источнике записи, которая может использоваться для построения Data Lineage.
Кроме того, редактируя метаданные, мы можем гибко указать фреймворку, какие данные он должен обработать. Например, если необходимо перезагрузить данные за какой-то период, этого можно добиться, просто отредактировав запись о последнем инкременте в таблице метаданных.
По итогу…
Несколько выводов:
Использование архитектуры Data Lake в сочетании с методологией Data Vault позволило в нашем случае значительно сократить Time to Market для витрин данных в слое DWH.
Наличие фреймворка облегчает построение Data Vault и снижает вероятность ошибок.
При этом важно учитывать особенности вашего стека и архитектуры. Высокая нормализация, которая является особенностью Data Vault, может быть неоптимальна в некоторых случаях. Это же касается выбора между разработкой собственного фреймворка и использованием существующих решений для построения Data Vault.
На этом все! Если у вас остались вопросы — приходите в комментарии, буду рад ответить. А если вы уже не раз погружались в бескрайний океан данных и знаете много эффективных способов их хранения — присоединяйтесь к нашей команде: прямо сейчас у нас открыты вакансии разработчика и тестировщика. Давайте исследовать пучины Data Lake вместе :)
GeorgeII
Архитектура
Кусок архитектуры на скрине неясен. Верхний Spark — это скорее всего Spark-Streaming, который вычитывает из Kafka и Rabbit, здесь понятно. Но как и зачем к спарк-стримингу подключается Airflow?
Kilinsky_MA Автор
Про верхний Spark вы правы, это стриминг для чтения из очередей. Airflow у нас также используется как шедулер для запуска batch процессов по переносу данных из hadoop в слой data vault.
GeorgeII
Я к тому, что у вас по схеме Airflow кладет какие-то данные в верхний спарк-стриминг.
Kilinsky_MA Автор
Airflow выступает в качестве шедулера, возможно по схеме не совсем понятно.