Руководство по быстрому началу работы с StarRocks Lakehouse предоставляет обзор технологии Lakehouse, чтобы помочь вам быстро освоить ее ключевые функции, уникальные преимущества и сценарии применения. Данное руководство подскажет, как эффективно использовать StarRocks для построения решений. В конце статьи мы собрали практический опыт лидеров отрасли — Alibaba Cloud, Ele.me, Ximalaya и Tongcheng Travel — из мероприятия StarRocks × Paimon Streaming Lakehouse. Эти реальные кейсы помогут наглядно понять, как максимально эффективно применять StarRocks Lakehouse в реальных проектах. Надеемся, что представленные практики вдохновят вас применить StarRocks Lakehouse в собственных проектах, раскрыть его потенциал и максимизировать ценность данных.

Введение в Apache Paimon

Apache Paimon (далее — Paimon) берет начало в под-проекте Apache Flink (далее — Flink): изначально это был лишь формат для встроенного в Flink Table Store. После нескольких лет развития в 2024 году Paimon успешно вышел из инкубатора Apache Software Foundation (ASF) и стал проектом верхнего уровня Apache (Top-Level Project). Paimon построен вокруг хранилища озера данных с ACID-свойствами, поддерживает DML-операции и полностью покрывает пакетную (batch) и потоковую (stream) обработку. Он инновационно сочетает LSM-дерево с форматом хранилища озера данных, обеспечивая эффективные обновления в реальном времени и высокую эффективность компакции (compaction).

Архитектура и ключевые особенности Paimon

Архитектура

Ключевые особенности

Apache Paimon — высокопроизводительная система хранения для озера данных, ориентированная на пакетную и потоковую обработку. Основные возможности включают:

  • Единый формат для пакетной (batch) и потоковой (stream) обработки: Paimon предоставляет единый формат хранения, пригодный для обеих парадигм, что обеспечивает бесшовную аналитику между разными режимами обработки.

  • Schema Evolution: позволяет изменять схему без полной перезаписи данных, что критично при постоянно меняющихся требованиях к данным.

  • ACID-транзакции: Paimon обеспечивает ACID (атомарность, согласованность, изоляция, долговечность), что важно для сохранения целостности данных в сложных конвейерах обработки.

  • Time Travel: позволяет обращаться к историческим версиям данных, упрощая аудит, отладку и историческую аналитику.

  • Интеграция с экосистемой Big Data: Paimon бесшовно интегрируется с популярными фреймворками, такими как Apache Flink, Apache Spark и Apache Hive, упрощая внедрение и совместимость.

Преимущества Apache Paimon

Преимущества Paimon проявляются в четырех ключевых направлениях:

  • Эффективные обновления в реальном времени: Paimon предоставляет гибкие возможности потоковых обновлений с минимальной задержкой порядка одной минуты. Поддерживаются частичные обновления столбцов, агрегирующие обновления и генерация журнала изменений (change log) для нижестоящих (downstream) систем.

  • Оптимизированные потоковые операции записи и чтения: как система, происходящая из встроенного формата Flink, Paimon тесно совместим с Flink и поддерживает эффективные стримовые операции. Также Paimon тесно интегрирован со Spark, будучи отличным выбором для пакетных вычислений.

  • Высокая производительность запросов: Paimon поддерживает эффективные OLAP-запросы, точечный доступ и богатые индексы. Сообщество активно развивает индексные технологии, такие как bitmap-индексы и Bloom-фильтры, для дальнейшего повышения эффективности запросов.

  • Масштабная пакетная (офлайн) обработка: Paimon способен обрабатывать очень крупные офлайн-наборы данных и предоставляет полноценную поддержку для Append-таблиц, удовлетворяя требованиям сверхмасштабной обработки.

Сценарии использования Paimon

  • Хранилище озера данных: в качестве фундамента озера данных для управления структурированными, полуструктурированными и неструктурированными данными.

  • Потоковая обработка данных: прием потоковых данных в озеро, поддержка записи, обновлений и запросов в реальном времени, построение витрин и отчетности в реальном времени (например, мониторинг и отчеты).

  • Загрузка БД в озеро: повышение оперативности слоя ODS, замена традиционных подходов (например, онлайн-синхронизации Hive, ночных слияний) для загрузки данных.

  • Построение нижестоящего слоя данных: использование инкрементальности озера для построения слоя DWD, экономия вычислительных ресурсов.

  • Частичные обновления: поддержка локальных обновлений данных для сценариев с частыми изменениями — построение оперативных статистических витрин и отчетов, формирование широких таблиц и т.д.

  • Потоковое/инкрементальное чтение: чтение инкремента в потоке, поддержка построения DWH реального времени, значительное сокращение времени до видимости данных и снижение нагрузки на нижележащие хранилища.

Модель таблиц Paimon

Primary Key

Таблица с первичным ключом поддерживает вставки, обновления и удаления данных. Если в таблицу Paimon с первичным ключом записать несколько строк с одинаковым ключом, данные будут объединены согласно механизму слияния. Таблицы с первичным ключом подходят для сценариев, где требуются обновления/удаления и строгая согласованность данных.

Append Table

Если таблица не имеет определенного первичного ключа, по умолчанию это таблица типа Append (Append Table) — по сути аналог детальной таблицы в StarRocks. Запись нескольких одинаковых строк не приводит к перезаписи — все дубликаты сохраняются. Такой тип таблиц подходит для случаев без стримовых обновлений (например, синхронизация логов).

Append Queue

Append Queue следует рассматривать как особый тип Append Table. Каждая запись в одном бакете строго упорядочена, а потоковое чтение передает записи в нижестоящие системы точно в порядке их записи. Это похоже на раздел (partition) в Kafka, где внутри одного раздела обеспечивается строгий порядок. Применяется в пайплайнах данных (data pipelines), сценариях комплексного мониторинга состояния, обработке временных потоков и в финансовой торговле.

Time Travel

Реализован на основе файлов snapshot. Потребители могут с помощью разных snapshot-файлов запрашивать состояние данных в таблице Paimon на момент создания соответствующего snapshot.

Стратегии компакции

В Paimon стратегии компакции (compaction) схожи с universal compaction в RocksDB. По умолчанию доступны две стратегии:

  • leveled compaction (уровневая компакция) — стратегия по умолчанию в RocksDB

  • size-tiered compaction (компакция по размерным уровням)

Подход схож с текущей компакцией в StarRocks — это size-tiered compaction. Базовая идея — по возможности выполнять компакцию rowset’ов близкого размера, чтобы избежать write amplification, вызванного компакцией.

Ускоренная lakehouse-аналитика: StarRocks × Paimon

Текущие возможности StarRocks × Paimon включают:

  • Поддержка различных систем хранения, включая HDFS и объектные хранилища S3/OSS/OSS-HDFS.

  • Поддержка HMS (Hive Metastore) и системы управления метаданными Alibaba Cloud DLF.

  • Поддержка запросов к таблицам Paimon типов Primary Key и Append-Only.

  • Поддержка запросов к системным таблицам Paimon, например Read-Optimized, snapshots и т.п.

  • Поддержка join-запросов между таблицами Paimon и другими lake-форматами.

  • Поддержка join-запросов между таблицами Paimon и внутренними таблицами StarRocks.

  • Поддержка Data Cache для ускорения запросов.

  • Поддержка построения материализованных представлений на базе таблиц Paimon для прозрачного ускорения и переписывания запросов.

  • Поддержка включения Delete Vector для ускорения запросов к таблицам Paimon.

Для таблиц типа Primary Key сообщество Paimon выполнило глубокую оптимизацию производительности системной таблицы Read-Optimized, что позволяет в полной мере использовать возможности Native Reader и достигать наилучшей производительности при прямом чтении данных Paimon. При прямых запросах к таблицам Primary Key, если в них есть данные, еще не прошедшие компакцию, StarRocks читает эту часть через JNI-вызовы Java, что влечет некоторую потерю производительности. Даже в этом случае, согласно обратной связи от пользователей, среднее ускорение по сравнению с Trino превышает 3 раза.

Быстрый старт

Развертывание Paimon

Используемые версии компонентов:

  • StarRocks 3.3.0

  • Flink 1.19.1

  • Paimon 0.8.2

  • Kafka 3.7.0

Скачать Flink

Следующая ссылка — на зеркала с ускорением для машин Alibaba Cloud; для не-ALIYUN окружений можно заменить на https://mirrors.aliyun.com

wget "http://mirrors.cloud.aliyuncs.com/apache/flink/flink-1.19.1/flink-1.19.1-bin-scala\_2.12.tgz"

Распаковать:

tar -xf flink-1.19.1-bin-scala_2.12.tgz

Скачать Paimon и сопутствующие зависимости

wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/0.8.2/paimon-flink-1.19-0.8.2.jar"

# Если используется объектное хранилище, необходимо скачать следующий пакет
wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.8.2/paimon-oss-0.8.2.jar"

Скачать зависимость flink-hadoop

wget "https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar"

Если вышеуказанных JAR нет, возникнет ошибка:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

Скачать зависимость flink-sql-connector-kafka

wget "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar"

Скачать зависимость flink-connector-starrocks

wget "https://github.com/StarRocks/starrocks-connector-for-apache-flink/releases/download/v1.2.9/flink-connector-starrocks-1.2.9\_flink-1.18.jar"

Скопировать загруженные пакеты в flink/lib

cp paimon-flink-1.19-0.8.2.jar paimon-oss-0.8.2.jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar flink-connector-starrocks-1.2.9_flink-1.18.jar flink-sql-connector-kafka-3.2.0-1.18.jar flink-1.19.1/lib/

Запуск кластера Flink

cd flink-1.19.1

Измените параметр в конфигурации для задания числа слотов (пример):

# файл: conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 10

Запустите кластер:

./bin/start-cluster.sh

Развертывание Kafka

Скачать установочный пакет. Ссылка ниже — на зеркала с ускорением для машин Alibaba Cloud; для не-ALIYUN окружений можно заменить на https://mirrors.aliyun.com

wget "http://mirrors.cloud.aliyuncs.com/apache/kafka/3.7.0/kafka\_2.12-3.7.0.tgz"

Распаковать:

tar -xf kafka_2.12-3.7.0.tgz

Запустить Kafka:

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties

Тестовый сценарий

В данном примере источником данных заказов является Kafka, данные пользователей — из MySQL, а итогом является хранение в Paimon агрегатов по 5‑минутным окнам. Для упрощения демо ниже мы заменили MySQL на StarRocks.

Создать таблицу измерений и загрузить тестовые данные

CREATE TABLE `users` (
  `user_id` bigint(20) NOT NULL COMMENT "",
  `region` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`);

insert into users values
 (1,'BeiJing'),(2,'TianJin'),(3,'XiAn'),(4,'ShenZhen'),
 (5,'BeiJing'),(6,'BeiJing'),(7,'ShenZhen'),(8,'ShenZhen');

Создать в Kafka топик фактов и записать тестовые данные

./bin/kafka-topics.sh --create --topic order-details --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Генерация тестовых данных

Требуется установить пакет:

pip install kafka-python

Код генерации:

from kafka import KafkaProducer
import time
import json
import random
from datetime import datetime, timedelta

start_time = datetime(2024, 7, 24, 15, 0, 0)
end_time = datetime(2024, 7, 24, 18, 0, 0)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

while True:
    order_id = random.randint(1, 10000)
    user_id = random.randint(1, 8)
    order_amount = round(random.uniform(10.0, 1000.0), 2)
    random_time = start_time + timedelta(seconds=random.randint(0, 3600))

    data = {
        "order_id": order_id,
        "user_id": user_id,
        "order_amount": order_amount,
        "order_time": random_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    }

    producer.send('order-details', value=json.dumps(data).encode('utf-8'))
    time.sleep(3)

producer.close()

Создать таблицы Paimon и записать тестовые данные

./bin/sql-client.sh
CREATE CATALOG my_catalog_oss WITH (
    'type' = 'paimon',
    'warehouse' = 'oss://starrocks-public/dba/jingdan/paimon',
    'fs.oss.endpoint' = 'oss-cn-zhangjiakou-internal.aliyuncs.com',
    'fs.oss.accessKeyId' = 'ak',
    'fs.oss.accessKeySecret' = 'sk'
);

USE CATALOG my_catalog_oss;

CREATE TABLE hourly_regional_sales (
  event_time TIMESTAMP(3),
  region STRING,
  total_sales DECIMAL(10, 2)
);

USE CATALOG default_catalog;

CREATE TABLE orders_kafka (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-details',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'order-consumer',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE users_starrocks (
  user_id BIGINT,
  region STRING
) WITH (
  'connector'='starrocks',
  'scan-url'='172.26.92.154:8030',
  'jdbc-url'='jdbc:mysql://172.26.92.154:9030',
  'username'='root',
  'password'='xxx',
  'database-name'='jd',
  'table-name'='users'
);

SET 'execution.checkpointing.interval' = '10 s';

INSERT INTO my_catalog_oss.`default`.hourly_regional_sales
SELECT
  TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS event_time,
  u.region,
  CAST(SUM(o.order_amount) AS DECIMAL(10, 2)) AS total_sales
FROM default_catalog.`default_database`.orders_kafka AS o
JOIN default_catalog.`default_database`.users_starrocks AS u
  ON o.user_id = u.user_id
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE), u.region;

Запрос данных

SELECT * FROM my_catalog_oss.`default`.hourly_regional_sales;

Запрос инкрементальных данных (Batch Time Travel)

SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('scan.snapshot-id' = '2') */;
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('incremental-between' = '5,10') */;

Создать Paimon Catalog в StarRocks

CREATE EXTERNAL CATALOG paimon_catalog_oss
PROPERTIES
(
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "oss://starrocks-public/dba/jingdan/paimon",
    "aliyun.oss.access_key" = "ak",
    "aliyun.oss.secret_key" = "sk",
    "aliyun.oss.endpoint" = "oss-cn-zhangjiakou-internal.aliyuncs.com"
);
SET CATALOG paimon_catalog_oss;
USE `default`;
SELECT * FROM hourly_regional_sales;

На стороне StarRocks можно в реальном времени наблюдать изменения в сводной таблице.

Комментарии (0)