Руководство «Быстрый старт по StarRocks Lakehouse» помогает быстро разобраться с технологиями Lakehouse (лейкхаус): ключевые особенности, уникальные преимущества, сценарии использования и то, как со StarRocks оперативно собрать решение. В финале — лучшие практики на основе реальных сценариев.

В этой статье представлены лучшие практики StarRocks Hive Catalog на примере прикладной задачи управления заказами.

Введение в Apache Hive

Apache Hive — распределённая, отказоустойчивая система хранилища данных, поддерживающая масштабную аналитику. Hive Metastore (HMS) предоставляет репозиторий метаданных, которые можно анализировать для принятия решений на основе данных; потому HMS — ключевой компонент во многих архитектурах озёр данных.

Hive построен поверх Apache Hadoop и через HDFS поддерживает управление данными в хранилищах S3, Azure Data Lake Storage (ADLS), Google Cloud Storage (GCS) и др. Hive позволяет с помощью SQL читать, записывать и управлять данными объёмов до петабайт.

Архитектура и ключевые особенности

  • HiveServer2 (HS2): поддержка параллельной работы множества клиентов и аутентификации; улучшенная поддержка JDBC и ODBC.

  • Hive Metastore Server (HMS): хранение метаданных таблиц и партиций; поддержка множества OSS‑инструментов (Spark, Presto и др.); важная часть озера данных.

  • Hive ACID: полная поддержка ACID для таблиц ORC; поддержка только вставок для других форматов.

  • Hive Iceberg: нативная поддержка таблиц Apache Iceberg через Hive StorageHandler; подходит для облачных высокопроизводительных сценариев.

  • Безопасность и наблюдаемость: поддержка Kerberos, интеграция с Apache Ranger и Apache Atlas.

  • Hive LLAP: интерактивные SQL‑запросы с низкой задержкой; оптимизация кэширования и ускорение запросов.

  • Оптимизация запросов: оптимизатор на основе стоимости (CBO) на базе Apache Calcite.

Преимущества Apache Hive

  • Функции DWH: базы данных, таблицы, партиции и др. упрощают управление и запросы.

  • Несколько движков выполнения: MapReduce, Tez, Spark — можно выбирать для нужной производительности.

  • Расширяемость: пользовательские функции (UDF), интеграция с инструментами экосистемы Hadoop — гибкость обработки.

  • Пакетная обработка: особенно подходит для крупномасштабной аналитики, отчётности и ETL.

  • Простая интеграция: Flume, Sqoop, Oozie и др. — расширение возможностей обработки больших данных.

Сценарии использования Apache Hive

  • Хранилище данных: преобразует данные Hadoop в SQL‑представление, предоставляя функции DWH.

  • Аналитика данных: HiveQL‑запросы, агрегирование и фильтрация для масштабной аналитики.

  • Интеллектуальный анализ данных (data mining): интеграция с ML‑инструментами для добычи знаний и поиска паттернов.

  • ETL‑операции: анализ логов и обработка исторических данных для оптимизации производительности и понимания поведения пользователей.

  • Офлайн‑обработка: подходит для офлайн‑сценариев обработки больших данных; пакетные движки поддерживают крупные задания.

  • Интеграция инструментов: бесшовная работа с Apache Spark, Mahout и др. для ускорения запросов и моделирования данных.

StarRocks Hive Catalog

Hive — классический движок на базе MapReduce, широко применяемый для пакетной и офлайн‑аналитики. В задачах аналитики в реальном времени ему часто не хватает производительности запросов и эффективности использования ресурсов.

StarRocks — это MPP‑СУБД, которая быстро обрабатывает сложные запросы на больших наборах данных, поддерживает аналитику в реальном времени и обеспечивает быстрый отклик — для сценариев, где важна мгновенная обратная связь.

StarRocks не только эффективно анализирует локально хранимые данные, но и выступает вычислительным движком для прямого анализа данных в озере (data lake). С помощью External Catalog пользователи могут без миграции данных запрашивать данные из Apache Hive, Apache Iceberg, Apache Hudi, Delta Lake и др. Поддерживаемые системы хранения: HDFS, S3, OSS; форматы — Parquet, ORC, CSV.

Благодаря StarRocks Hive Catalog достигается бесшовная интеграция StarRocks и Hive, объединяющая их сильные стороны. В аналитике по озеру данных StarRocks отвечает за вычисления и анализ, а озеро — за хранение, организацию и поддержку данных. Преимущества озера — открытые форматы хранения и гибкая схема (schema), что обеспечивает для BI/AI/ad‑hoc/отчётности единый источник истины (single source of truth). В роли вычислительного движка для озера StarRocks задействует векторизованный движок и CBO, существенно ускоряя аналитику.

Модель данных

изображение модели данных
изображение модели данных

Эволюция технической архитектуры

StarRocks Hive Catalog может эволюционировать так:

  1. Прямые запросы к данным таблиц Hive →

  2. Ускорение запросов к таблицам Hive с помощью Data Cache →

  3. Ускорение с помощью Data Cache и асинхронных материализованных представлений.

  • Прямые запросы к результатам через StarRocks Hive Catalog: весь ETL выполняется в Hive; StarRocks через Hive Catalog запрашивает витрины DWD, DWS и ADS.

  • Запросы «на лету» с использованием StarRocks Hive Catalog + Data Cache: в Hive формируются только ODS и DWD; уровни DWS и ADS считаются на лету (on the fly) через Hive Catalog StarRocks.

  • Ускорение асинхронными материализованными представлениями: в Hive остаётся лишь ODS; уровни DWD и DWS строятся асинхронными MVs StarRocks; ADS запрашивается напрямую.

изображение архитектуры
изображение архитектуры

Быстрый старт

1. Базовая среда

Component

Version

Zookeeper

3.5.7

HDFS

3.3.4

Hive

3.1.2

StarRocks

3.3.0

Node

Deployed Services

Machine Specifications

cs01.starrocks.com

StarRocks-BE

8C32G 2T High-performance Cloud Disk

Zookeeper

DataNode

JournalNode

NameNode

cs02.starrocks.com

StarRocks-FE

8C64G 2T High-performance Cloud Disk

StarRocks-BE

Zookeeper

DataNode

JournalNode

NameNode

HiveServer

HiveMetaStore

cs03.starrocks.com

StarRocks-BE

8C64G 1T High-performance Cloud Disk

Zookeeper

DataNode

JournalNode

NameNode

2. Создание таблиц в Hive

CREATE DATABASE orders;

-- ODS: вспомогательная внешняя таблица для загрузки локально сгенерированных тестовых данных
CREATE EXTERNAL TABLE IF NOT EXISTS ods_orders_text (
    order_id STRING,
    user_id STRING,
    order_time STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10, 2),
    order_status STRING
)
COMMENT 'Таблица хранения операционных данных заказов'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

CREATE TABLE IF NOT EXISTS ods_orders (
    order_id INT,
    user_id INT,
    order_time STRING,
    product_id INT,
    quantity INT,
    price DOUBLE,
    order_status STRING
)
COMMENT 'Таблица хранения операционных данных заказов'
PARTITIONED BY (order_date STRING)
STORED AS PARQUET;

CREATE TABLE IF NOT EXISTS dim_products (
    product_id INT,
    product_name STRING,
    category_id INT,
    price DECIMAL(10, 2),
    product_description STRING
)
COMMENT 'Таблица измерений продуктов'
STORED AS PARQUET;

CREATE TABLE IF NOT EXISTS dim_categories (
    category_id INT,
    category_name STRING,
    category_description STRING
)
COMMENT 'Таблица измерений категорий'
STORED AS PARQUET;

-- DWD
CREATE TABLE IF NOT EXISTS dwd_order_facts (
    order_id STRING,
    user_id STRING,
    order_time STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10, 2),
    order_status STRING,
    product_name STRING,
    category_id STRING,
    category_name STRING
)
COMMENT 'Фактовая таблица заказов'
PARTITIONED BY (order_date DATE)
STORED AS PARQUET;

3. Генерация данных

3.1 Таблицы измерений

-- Вспомогательная таблица для генерации последовательности
CREATE TABLE aux_order_data (seq_num INT);
#!/usr/bin/env python3
with open('aux_order_data.txt', 'w') as f:
    for i in range(1, 10000001):
        f.write("{}\n".format(i))
LOAD DATA LOCAL INPATH '/home/disk1/sr/aux_order_data.txt' INTO TABLE aux_order_data;

INSERT INTO dim_products
SELECT
    floor(RAND() * 10000) + 1 AS product_id,
    CONCAT('Название продукта', floor(RAND() * 10000) + 1) AS product_name,
    floor(RAND() * 1000) + 1 AS category_id,
    ROUND(100 + RAND() * 5000, 2) AS price,
    CONCAT('Описание продукта', floor(RAND() * 100)) AS product_description
FROM aux_order_data a
CROSS JOIN aux_order_data b
LIMIT 10000;

INSERT INTO dim_categories
SELECT
    floor(RAND() * 1000) + 1 AS category_id,
    CONCAT('Название категории', floor(RAND() * 1000) + 1) AS category_name,
    CONCAT('Описание категории', floor(RAND() * 100)) AS category_description
FROM aux_order_data a
CROSS JOIN aux_order_data b
LIMIT 1000;

3.2 Данные ODS

Отдельно формируем данные за 3–5 августа 2024 года.

#!/usr/bin/env python3
import random
import time

def generate_order_data(num_records):
    with open('ods_orders.txt', 'w') as f:
        for i in range(1, num_records + 1):
            order_id = i
            user_id = random.randint(1, 1000)
            # временные метки охватывают 3–5 августа
            order_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(random.randint(1722614400, 1722700800)))
            product_id = random.randint(1, 10000)
            quantity = random.randint(1, 10)
            price = round(random.uniform(10, 1000), 2)
            order_status = '已完成' if random.random() < 0.9 else '已取消'
            f.write(f"{order_id},{user_id},{order_time},{product_id},{quantity},{price},{order_status}\n")

generate_order_data(10000000)
LOAD DATA LOCAL INPATH '/home/disk1/sr/ods_orders.txt' INTO TABLE ods_orders_text;

INSERT OVERWRITE TABLE ods_orders PARTITION (order_date)
SELECT
  CAST(order_id AS INT),
  CAST(user_id AS INT),
  order_time,
  CAST(product_id AS INT),
  CAST(quantity AS INT),
  CAST(price AS DOUBLE),
  order_status,
  substr(order_time, 1, 10) AS order_date
FROM ods_orders_text;

3.3 Данные DWD

INSERT OVERWRITE TABLE dwd_order_facts PARTITION (order_date)
SELECT
    o.order_id,
    o.user_id,
    o.order_time,
    o.product_id,
    o.quantity,
    o.price,
    COALESCE(o.order_status, 'UNKNOWN'),
    p.product_name,
    p.category_id,
    c.category_name,
    o.order_date
FROM ods_orders o
JOIN dim_products p ON o.product_id = p.product_id
JOIN dim_categories c ON p.category_id = c.category_id
WHERE o.price > 0;

3.4 Данные DWS

CREATE TABLE IF NOT EXISTS dws_order_aggregates (
    user_id STRING,
    category_name STRING,
    order_date DATE,
    total_quantity INT,
    total_revenue DECIMAL(10, 2),
    total_orders INT
)
COMMENT 'Агрегированная витрина заказов'
STORED AS PARQUET;

INSERT OVERWRITE TABLE dws_order_aggregates
SELECT
    user_id,
    category_name,
    order_date,
    SUM(quantity) AS total_quantity,
    SUM(price * quantity) AS total_revenue,
    COUNT(DISTINCT order_id) AS total_orders
FROM dwd_order_facts
WHERE order_status = '已完成'
GROUP BY user_id, category_name, order_date;

3.5 Данные ADS

CREATE TABLE IF NOT EXISTS ads_product_order_report (
    category_name STRING,
    report_date STRING,
    total_orders INT,
    total_quantity INT,
    total_revenue DECIMAL(10, 2)
)
COMMENT 'Таблица отчёта по топ‑товарам'
STORED AS PARQUET;

WITH ranked_category_sales AS (
  SELECT
    category_name,
    order_date,
    total_quantity,
    total_revenue,
    total_orders,
    ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank
  FROM dws_order_aggregates
)
INSERT OVERWRITE TABLE ads_product_order_report
SELECT
  category_name,
  order_date,
  total_quantity,
  total_revenue,
  total_orders
FROM ranked_category_sales
WHERE revenue_rank <= 10;

4. Подключение Hive Catalog

Скопируйте конфигурацию Hive/Hadoop и перезапустите службы StarRocks:

scp hive-site.xml hdfs-site.xml core-site.xml sr@node:/home/disk1/sr/fe/conf
scp hdfs-site.xml core-site.xml sr@node:/home/disk1/sr/be/conf

./bin/stop_be.sh
./bin/start_be.sh --daemon

./bin/stop_fe.sh
./bin/start_fe.sh --daemon

5. Аналитика по озеру данных

5.1 Запрос результатов Hive через Hive Catalog

CREATE EXTERNAL CATALOG `hive_catalog_krb5_sr`
PROPERTIES (
  "hive.metastore.type"  = "hive",
  "hive.metastore.uris"  = "thrift://cs02.starrocks.com:9083",
  "type"                 = "hive"
);

SET CATALOG hive_catalog_krb5_sr;
USE orders;

-- DWS
SELECT * FROM dws_order_aggregates;

-- ADS
SELECT * FROM ads_product_order_report;

5.2 Запросы «на лету» с использованием StarRocks Hive Catalog + Data Cache

Включите Data Cache и при необходимости выполните предзагрузку кэша.

-- включить Data Cache на уровне сессии
SET enable_scan_datacache = true;

В конфигурации BE (be.conf):

datacache_disk_path = /data2/datacache
datacache_enable = true
datacache_disk_size = 200G

Предзагрузка кэша:

CACHE SELECT * FROM hive_catalog_krb5_sr.orders.dwd_order_facts;

Онлайновые витрины:

SET CATALOG hive_catalog_krb5_sr;
USE orders;

-- DWS
WITH dwd AS (
  SELECT
      user_id,
      category_name,
      order_date,
      quantity,
      price,
      order_id,
      order_status
  FROM dwd_order_facts
)
SELECT
    user_id,
    category_name,
    order_date,
    SUM(quantity) AS total_quantity,
    SUM(price * quantity) AS total_revenue,
    COUNT(DISTINCT order_id) AS total_orders
FROM dwd
WHERE order_status = '已完成'
GROUP BY user_id, category_name, order_date;

-- ADS
WITH dws_order_aggregates AS (
  SELECT
      user_id,
      category_name,
      order_date,
      SUM(quantity) AS total_quantity,
      SUM(price * quantity) AS total_revenue,
      COUNT(DISTINCT order_id) AS total_orders
  FROM dwd_order_facts
  WHERE order_status = '已完成'
  GROUP BY user_id, category_name, order_date
),
ranked_category_sales AS (
  SELECT
    category_name,
    order_date,
    total_quantity,
    total_revenue,
    total_orders,
    ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank
  FROM dws_order_aggregates
)
SELECT
  category_name,
  order_date,
  total_quantity,
  total_revenue,
  total_orders
FROM ranked_category_sales
WHERE revenue_rank <= 10;

5.3 Ускорение за счёт асинхронных материализованных представлений

SET CATALOG default_catalog;
USE orders;

CREATE MATERIALIZED VIEW dwd_order_facts_mv
PARTITION BY str2date(order_date, '%Y-%m-%d')
DISTRIBUTED BY HASH(`order_id`) BUCKETS 12
PROPERTIES ("replication_num" = "3")
REFRESH ASYNC START('2024-08-01 01:00:00') EVERY (interval 1 day) AS
SELECT
    o.order_date,
    o.order_id,
    o.user_id,
    o.order_time,
    o.product_id,
    o.quantity,
    o.price,
    COALESCE(o.order_status, 'UNKNOWN') AS order_status,
    p.product_name,
    p.category_id,
    c.category_name
FROM hive_catalog_krb5_sr.orders.ods_orders o
JOIN hive_catalog_krb5_sr.orders.dim_products p ON o.product_id = p.product_id
JOIN hive_catalog_krb5_sr.orders.dim_categories c ON p.category_id = c.category_id
WHERE o.price > 0;

Автоматическое обнаружение новых партиций (пример с 6 августа):

#!/usr/bin/env python3
import random
import time

def generate_order_data(num_records):
    with open('ods_orders_0806.txt', 'w') as f:
        for i in range(1, num_records + 1):
            order_id = i
            user_id = random.randint(1, 1000)
            order_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(random.randint(1722873600, 1722959999)))
            product_id = random.randint(1, 10000)
            quantity = random.randint(1, 10)
            price = round(random.uniform(10, 1000), 2)
            order_status = '已完成' if random.random() < 0.9 else '已取消'
            f.write(f"{order_id},{user_id},{order_time},{product_id},{quantity},{price},{order_status}\n")

generate_order_data(10000000)

На стороне Hive:

LOAD DATA LOCAL INPATH '/home/disk1/sr/ods_orders_0806.txt' INTO TABLE ods_orders_text;

-- Добавляем новые данные с автоматическим формированием значения партиции
INSERT OVERWRITE TABLE ods_orders PARTITION (order_date)
SELECT
  CAST(order_id AS INT),
  CAST(user_id AS INT),
  order_time,
  CAST(product_id AS INT),
  CAST(quantity AS INT),
  CAST(price AS DOUBLE),
  order_status,
  substr(order_time, 1, 10) AS order_date
FROM ods_orders_text
WHERE substr(order_time, 1, 10) = '2024-08-06';

Обновление и проверка MV:

-- ручной запуск обновления
REFRESH MATERIALIZED VIEW dwd_order_facts_mv;

-- проверяем статус обновления (должен быть SUCCESS)
SELECT * FROM information_schema.task_runs ORDER BY CREATE_TIME DESC LIMIT 1;

-- проверяем, что MV видит новые данные
SELECT order_date, COUNT(1) FROM dwd_order_facts_mv GROUP BY order_date;

DWS на основе MV:

SET CATALOG default_catalog;
USE orders;

SELECT
    user_id,
    category_name,
    order_date,
    SUM(quantity) AS total_quantity,
    SUM(price * quantity) AS total_revenue,
    COUNT(DISTINCT order_id) AS total_orders
FROM dwd_order_facts_mv
WHERE order_status = '已完成'
GROUP BY user_id, category_name, order_date;

ADS на основе MV:

SET CATALOG default_catalog;
USE orders;

CREATE MATERIALIZED VIEW dws_order_aggregates_mv
PARTITION BY str2date(order_date, '%Y-%m-%d')
DISTRIBUTED BY HASH(`user_id`) BUCKETS 12
PROPERTIES ("replication_num" = "3")
REFRESH ASYNC START('2024-08-01 04:00:00') EVERY (interval 1 day) AS
SELECT
    user_id,
    category_name,
    order_date,
    SUM(quantity) AS total_quantity,
    SUM(price * quantity) AS total_revenue,
    COUNT(DISTINCT order_id) AS total_orders
FROM dwd_order_facts_mv
WHERE order_status = '已完成'
GROUP BY user_id, category_name, order_date;

-- ручной запуск обновления
REFRESH MATERIALIZED VIEW dws_order_aggregates_mv;

-- проверяем статус обновления (должен быть SUCCESS)
SELECT * FROM information_schema.task_runs ORDER BY CREATE_TIME DESC LIMIT 1;

WITH ranked_category_sales AS (
  SELECT
    category_name,
    order_date,
    total_quantity,
    total_revenue,
    total_orders,
    ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank
  FROM dws_order_aggregates_mv
)
-- топ‑10 категорий
SELECT
  category_name,
  order_date,
  total_quantity,
  total_revenue,
  total_orders
FROM ranked_category_sales
WHERE revenue_rank <= 10;

Итоги

  • В большинстве случаев связка StarRocks Hive Catalog + Data Cache отлично покрывает потребности аналитики по озеру данных.

  • Асинхронные материализованные представления StarRocks упрощают ETL‑процессы, снижают сложность бизнес‑логики и обеспечивают высокую производительность запросов.

Комментарии (0)