Всем привет, меня зовут Максим Крупенин, я работаю Data & Analytics Solution Architect в EPAM Systems. За 4 года работы в EPAM мне пришлось поработать в разных проектах, связанных с BI, Big Data, Data warehouse и другими технологиями. В этой статье поделюсь одним из клиентских проектов, где мы реализовали кастомное решение для near real-time аналитики на базе Snowflake. Надеюсь, статья будет полезной, оставляйте фидбек в комментариях.
Если вы интересуетесь аналитической платформой Snowflake, то вы, вероятно, знаете, что этот продукт сейчас в тренде. Компания Snowflake Inc. вышла на IPO в сентябре 2020 года, что стало крупнейшим IPO в истории среди компаний-разработчиков программного обеспечения. Даже инвестиционный гуру Уоррен Баффет инвестировал в Snowflake (это его первое участие в IPO со времен Ford в 1956 году). Тем не менее, эта статья не об инвестициях, а о технологиях. Snowflake — действительно отличная аналитическая платформа, которая позволяет реализовывать современные интегрированные приложения для работы с данными.
За последние несколько лет компания EPAM внедрила много интересных решений, в которых мы использовали Snowflake для различных бизнес-задач наших клиентов. Об архитектуре одной из таких систем мы и расскажем в этой статье.
Бизнес-требования
Наш клиент принял решение разработать аналитический продукт нового поколения для своих клиентов в отрасли образования. Данный продукт собирает и обрабатывает данные из различных источников, позволяя клиентам принимать обоснованные решения. Функции аналитической платформы включают настраиваемые интерактивные дашборды, подключение к различным типам источников данных, self-service data discovery и аналитику в режиме реального времени.
Архитектура
Не все исходные системы, с которыми нам необходимо интегрироваться, обеспечивают механизмы для извлечения данных в режиме реального времени. Поэтому нам пришлось разработать решение, позволяющее комбинировать пакетную и потоковую обработку данных.
Snowflake и Tableau, которые являются основными технологиями нашей аналитической платформы, должны были быть дополнены другими инструментами, чтобы реализовать все функциональные и нефункциональные требования. Также важно отметить, что продукт необходимо было разработать на облачной платформе Amazon Web Services (AWS). В результате, наша команда спроектировала и внедрила решение, схожее по дизайну с подходом Lambda Architecture:
Если обобщить, то архитектура состоит из следующих трёх логических уровней:
BATCH LAYER извлекает и обрабатывает данные один раз в день, используя API систем источников. Данный слой нацелен на подготовку точных и качественных данных за предыдущий день. Это означает, что на этом уровне исправляются ошибки, данные согласовываются с другими системами-источниками, и проверяется качество данных. Выходные данные хранятся в Snowflake в слоях Core DWH и Data Mart и заменяют данные предыдущего дня, которые были сформированы в Speed Layer.
SPEED LAYER обрабатывает потоки данных из исходных систем в режиме реального времени (где это возможно), без требований к полноте и высокому качеству данных. Этот уровень направлен на минимизацию задержек, предоставляя доступ к данным в реальном времени. По сути, Speed Layer отвечает за заполнение «пробелов» — данных за текущий день отсутствующих в Batch Layer. Представления (table views) этого уровня могут быть не такими точными и полными, как те, которые, в конечном счёте, создаются на Batch Layer, но данные для аналитики доступны почти сразу по мере обработки событий в системах источниках.
SERVING LAYER объединяет данные из Batch Layer и Speed Layer через Snowflake DB views и отвечает на ad-hoc запросы конечных пользователей, поступающие из Tableau.
Детали реализации нашего решения
Далее, я подробнее остановлюсь на реализации каждого из уровней.
Извлечение и обработка пакетных данных (Batch Layer)
Ниже представлен пошаговый процесс работы с данными:
Извлечение данных (data ingestion). В качестве первого шага, данные поступают из исходной системы в Data Lake (AWS S3), используя API исходной системы. Скрипты Python запускаются с помощью Apache Airflow, который является инструментом оркестрации для всего пайплайна обработки данных. Amazon Elastic Kubernetes Service (EKS) используется для управления ресурсами для сервисов Airflow.
Загрузка данных в Snowflake. «Сырые» (raw) данные из Data Lake копируются в Snowflake в слой Landing Area. Для этого Apache Airflow запускает команды Snowflake copy, которые выполняются на Snowflake Virtual Warehouse.
Преобразование в слой Core DWH. Исходные данные из Snowflake Landing Area преобразуются в Core DWH слой, где у нас строится каноническая доменная модель данных (здесь используется подход Data Vault). Apache Airflow запускает SQL-скрипты преобразования данных, которые выполняются на Snowflake Virtual Warehouse.
Преобразование в слой Data Mart. Данные из слоя Core DWH преобразуются в data marts (тут моделируем данные по схеме “звезда”) в соответствии с конкретными бизнес-требованиями. Здесь есть два варианта в зависимости от сложности преобразований:
Виртуальные витрины данных с использованием представлений (Snowflake DB views): в этом варианте данные не сохраняются физически.
Snowflake таблицы: Apache Airflow запускает SQL-скрипты, которые выполняются на Snowflake Virtual Warehouse. Скрипты вычисляют сложные бизнес-правила и записывают информацию в таблицы витрин данных.
Обработка данных в реальном времени (Speed Layer)
На Speed Layer данные проходят следующие этапы:
Сервис асинхронных событий системы источника выступает как фильтр для захвата трафика из исходной системы в виде сообщений в формате JSON. Затем он направляет сообщения в топики (topics) Amazon MSK (сервис Kafka в AWS).
Инструмент StreamSets Data Collector (SDC) используется для извлечения событий из топиков Kafka и их обработки:
фильтрует необходимые сообщения JSON;
обогащает данные на лету с помощью API исходной системы (например, получает имя объекта, передав его ID);
применяет другие необходимые преобразования к данным (например, маскирование конфиденциальных данных и фильтрацию);
конвертирует данные в CSV-формат и помещает файл в Data Lake S3 bucket – Online Data (current date) на диаграмме;
параллельно отфильтрованные исходные сообщения JSON помещаются в архив (Data Lake S3 bucket – Online Raw Data Archive).
Используется Snowflake external table (Live View на диаграмме), которая позволяет нам запрашивать информацию непосредственно из файлов в Data Lake S3 buckets. Таким образом, данные не хранятся в базе данных Snowflake, а проходят через Snowflake Virtual Warehouse только при запросе данных с сервера Tableau.
Serving Layer
Serving Layer реализован в виде набора представлений (Snowflake DB views), которые объединяют (union SQL operator) информацию из витрин данных (подготовленных на этапе пакетной обработки) и из Snowflake external table (Live View на диаграмме).
В результате, фактические данные готовы для запросов пользователей. Tableau сервер в режиме Live Connection формирует запросы непосредственно к Snowflake.
Альтернативные варианты реализации аналитики в реальном времени с помощью Snowflake
Если у вас есть опыт работы со Snowflake, вам может быть интересно, почему мы не использовали Snowpipe для реализации непрерывной загрузки данных в базу данных. Действительно, Snowpipe позволяет загружать данные небольшими пакетами (micro-batches) из файлов, как только они становятся доступными в AWS S3. Но при таком варианте данные попадают в базу с задержкой около минуты.
В нашем же случае было требование сделать данные доступными для пользователей с меньшей задержкой (до 5 секунд). Еще одним аргументом в пользу Snowflake external tables была стоимость непрерывной загрузки данных в Snowflake. С помощью модели бессерверных (serverless) вычислений Snowpipe, Snowflake хоть и управляет эффективно выделением необходимых ресурсов за вас, но итоговая стоимость может оказаться достаточно высокой, если события формируются непрерывно.
В случае с external tables, мы не загружаем данные в Snowflake (они хранятся в S3 bucket). Соответственно, нам не нужно платить за загрузку данных. Однако также важно помнить, что по мере добавления файлов с новыми событиями в S3, внешние таблицы должны обновляться (alter external table … refresh).
Выводы
Сегодня всё больше организаций стремятся стать data-driven. В результате, требования бизнеса усложняются. Например, компании больше не хотят ждать дни, чтобы анализировать события в операционных системах, а хотят действовать проактивно, основываясь на информации, полученной почти в реальном времени.
Это приводит к необходимости внедрять новые возможности анализа данных. Snowflake — аналитическая платформа нового поколения, которая позволяет эффективно реализовывать различные сложные сценарии, об одном из которых вы узнали из этой статьи.
leventov
Основная претензия к Snowflake — data lock-in.
Open-source версия, или хотя бы публичная документация формата файлов на S3 нужна индустрии.
mkrupenin Автор
Да, есть такое мнение, но немногие это учитывают как важный фактор при выборе платформы для аналитики.
Кому-то, возможно, будет интересно почитать короткую дискуссию на эту тему — она в комментариях к статье:
Selling the Data Lakehouse
alexxz
Несколько странное требование к базе данных. Большинство OLAP решений по факту пропиетарные и не раскрывают свои форматы. А зачем? Если вам надо загрузить или выгрузить данные - куча открытых форматов подерживается. В чем там lock in вы усматриваете?
leventov
Ну если рассматривать Snowflake как базу то ОК, но если рассматривать как ground-truth data lake, то пока что есть аргументы что лучше по старинке хранить Parquet файлы на S3. Athena по функционалу и мощности — лишь подобие Snowflake. А хотелось бы большего.