Руководство по быстрому началу работы с StarRocks Lakehouse предоставляет обзор технологии Lakehouse, чтобы помочь вам быстро освоить ее ключевые функции, уникальные преимущества и сценарии применения. Данное руководство подскажет, как эффективно использовать StarRocks для построения решений. В конце статьи мы собрали практический опыт лидеров отрасли — Alibaba Cloud, Ele.me, Ximalaya и Tongcheng Travel — из мероприятия StarRocks × Paimon Streaming Lakehouse. Эти реальные кейсы помогут наглядно понять, как максимально эффективно применять StarRocks Lakehouse в реальных проектах. Надеемся, что представленные практики вдохновят вас применить StarRocks Lakehouse в собственных проектах, раскрыть его потенциал и максимизировать ценность данных.
Введение в Apache Paimon
Apache Paimon (далее — Paimon) берет начало в под-проекте Apache Flink (далее — Flink): изначально это был лишь формат для встроенного в Flink Table Store. После нескольких лет развития в 2024 году Paimon успешно вышел из инкубатора Apache Software Foundation (ASF) и стал проектом верхнего уровня Apache (Top-Level Project). Paimon построен вокруг хранилища озера данных с ACID-свойствами, поддерживает DML-операции и полностью покрывает пакетную (batch) и потоковую (stream) обработку. Он инновационно сочетает LSM-дерево с форматом хранилища озера данных, обеспечивая эффективные обновления в реальном времени и высокую эффективность компакции (compaction).
Архитектура и ключевые особенности Paimon
Архитектура

Ключевые особенности
Apache Paimon — высокопроизводительная система хранения для озера данных, ориентированная на пакетную и потоковую обработку. Основные возможности включают:
Единый формат для пакетной (batch) и потоковой (stream) обработки: Paimon предоставляет единый формат хранения, пригодный для обеих парадигм, что обеспечивает бесшовную аналитику между разными режимами обработки.
Schema Evolution: позволяет изменять схему без полной перезаписи данных, что критично при постоянно меняющихся требованиях к данным.
ACID-транзакции: Paimon обеспечивает ACID (атомарность, согласованность, изоляция, долговечность), что важно для сохранения целостности данных в сложных конвейерах обработки.
Time Travel: позволяет обращаться к историческим версиям данных, упрощая аудит, отладку и историческую аналитику.
Интеграция с экосистемой Big Data: Paimon бесшовно интегрируется с популярными фреймворками, такими как Apache Flink, Apache Spark и Apache Hive, упрощая внедрение и совместимость.
Преимущества Apache Paimon
Преимущества Paimon проявляются в четырех ключевых направлениях:
Эффективные обновления в реальном времени: Paimon предоставляет гибкие возможности потоковых обновлений с минимальной задержкой порядка одной минуты. Поддерживаются частичные обновления столбцов, агрегирующие обновления и генерация журнала изменений (change log) для нижестоящих (downstream) систем.
Оптимизированные потоковые операции записи и чтения: как система, происходящая из встроенного формата Flink, Paimon тесно совместим с Flink и поддерживает эффективные стримовые операции. Также Paimon тесно интегрирован со Spark, будучи отличным выбором для пакетных вычислений.
Высокая производительность запросов: Paimon поддерживает эффективные OLAP-запросы, точечный доступ и богатые индексы. Сообщество активно развивает индексные технологии, такие как bitmap-индексы и Bloom-фильтры, для дальнейшего повышения эффективности запросов.
Масштабная пакетная (офлайн) обработка: Paimon способен обрабатывать очень крупные офлайн-наборы данных и предоставляет полноценную поддержку для Append-таблиц, удовлетворяя требованиям сверхмасштабной обработки.
Сценарии использования Paimon
Хранилище озера данных: в качестве фундамента озера данных для управления структурированными, полуструктурированными и неструктурированными данными.
Потоковая обработка данных: прием потоковых данных в озеро, поддержка записи, обновлений и запросов в реальном времени, построение витрин и отчетности в реальном времени (например, мониторинг и отчеты).
Загрузка БД в озеро: повышение оперативности слоя ODS, замена традиционных подходов (например, онлайн-синхронизации Hive, ночных слияний) для загрузки данных.
Построение нижестоящего слоя данных: использование инкрементальности озера для построения слоя DWD, экономия вычислительных ресурсов.
Частичные обновления: поддержка локальных обновлений данных для сценариев с частыми изменениями — построение оперативных статистических витрин и отчетов, формирование широких таблиц и т.д.
Потоковое/инкрементальное чтение: чтение инкремента в потоке, поддержка построения DWH реального времени, значительное сокращение времени до видимости данных и снижение нагрузки на нижележащие хранилища.
Модель таблиц Paimon
Primary Key
Таблица с первичным ключом поддерживает вставки, обновления и удаления данных. Если в таблицу Paimon с первичным ключом записать несколько строк с одинаковым ключом, данные будут объединены согласно механизму слияния. Таблицы с первичным ключом подходят для сценариев, где требуются обновления/удаления и строгая согласованность данных.
Append Table
Если таблица не имеет определенного первичного ключа, по умолчанию это таблица типа Append (Append Table) — по сути аналог детальной таблицы в StarRocks. Запись нескольких одинаковых строк не приводит к перезаписи — все дубликаты сохраняются. Такой тип таблиц подходит для случаев без стримовых обновлений (например, синхронизация логов).
Append Queue
Append Queue следует рассматривать как особый тип Append Table. Каждая запись в одном бакете строго упорядочена, а потоковое чтение передает записи в нижестоящие системы точно в порядке их записи. Это похоже на раздел (partition) в Kafka, где внутри одного раздела обеспечивается строгий порядок. Применяется в пайплайнах данных (data pipelines), сценариях комплексного мониторинга состояния, обработке временных потоков и в финансовой торговле.
Time Travel
Реализован на основе файлов snapshot. Потребители могут с помощью разных snapshot-файлов запрашивать состояние данных в таблице Paimon на момент создания соответствующего snapshot.
Стратегии компакции
В Paimon стратегии компакции (compaction) схожи с universal compaction в RocksDB. По умолчанию доступны две стратегии:
leveled compaction (уровневая компакция) — стратегия по умолчанию в RocksDB
size-tiered compaction (компакция по размерным уровням)
Подход схож с текущей компакцией в StarRocks — это size-tiered compaction. Базовая идея — по возможности выполнять компакцию rowset’ов близкого размера, чтобы избежать write amplification, вызванного компакцией.
Ускоренная lakehouse-аналитика: StarRocks × Paimon

Текущие возможности StarRocks × Paimon включают:
Поддержка различных систем хранения, включая HDFS и объектные хранилища S3/OSS/OSS-HDFS.
Поддержка HMS (Hive Metastore) и системы управления метаданными Alibaba Cloud DLF.
Поддержка запросов к таблицам Paimon типов Primary Key и Append-Only.
Поддержка запросов к системным таблицам Paimon, например Read-Optimized, snapshots и т.п.
Поддержка join-запросов между таблицами Paimon и другими lake-форматами.
Поддержка join-запросов между таблицами Paimon и внутренними таблицами StarRocks.
Поддержка Data Cache для ускорения запросов.
Поддержка построения материализованных представлений на базе таблиц Paimon для прозрачного ускорения и переписывания запросов.
Поддержка включения Delete Vector для ускорения запросов к таблицам Paimon.
Для таблиц типа Primary Key сообщество Paimon выполнило глубокую оптимизацию производительности системной таблицы Read-Optimized, что позволяет в полной мере использовать возможности Native Reader и достигать наилучшей производительности при прямом чтении данных Paimon. При прямых запросах к таблицам Primary Key, если в них есть данные, еще не прошедшие компакцию, StarRocks читает эту часть через JNI-вызовы Java, что влечет некоторую потерю производительности. Даже в этом случае, согласно обратной связи от пользователей, среднее ускорение по сравнению с Trino превышает 3 раза.
Быстрый старт
Развертывание Paimon
Используемые версии компонентов:
StarRocks 3.3.0
Flink 1.19.1
Paimon 0.8.2
Kafka 3.7.0
Скачать Flink
Следующая ссылка — на зеркала с ускорением для машин Alibaba Cloud; для не-ALIYUN окружений можно заменить на https://mirrors.aliyun.com
wget "http://mirrors.cloud.aliyuncs.com/apache/flink/flink-1.19.1/flink-1.19.1-bin-scala\_2.12.tgz"
Распаковать:
tar -xf flink-1.19.1-bin-scala_2.12.tgz
Скачать Paimon и сопутствующие зависимости
wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/0.8.2/paimon-flink-1.19-0.8.2.jar"
# Если используется объектное хранилище, необходимо скачать следующий пакет
wget "https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.8.2/paimon-oss-0.8.2.jar"
Скачать зависимость flink-hadoop
wget "https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar"
Если вышеуказанных JAR нет, возникнет ошибка:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
Скачать зависимость flink-sql-connector-kafka
wget "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar"
Скачать зависимость flink-connector-starrocks
wget "https://github.com/StarRocks/starrocks-connector-for-apache-flink/releases/download/v1.2.9/flink-connector-starrocks-1.2.9\_flink-1.18.jar"
Скопировать загруженные пакеты в flink/lib
cp paimon-flink-1.19-0.8.2.jar paimon-oss-0.8.2.jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar flink-connector-starrocks-1.2.9_flink-1.18.jar flink-sql-connector-kafka-3.2.0-1.18.jar flink-1.19.1/lib/
Запуск кластера Flink
cd flink-1.19.1
Измените параметр в конфигурации для задания числа слотов (пример):
# файл: conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 10
Запустите кластер:
./bin/start-cluster.sh
Развертывание Kafka
Скачать установочный пакет. Ссылка ниже — на зеркала с ускорением для машин Alibaba Cloud; для не-ALIYUN окружений можно заменить на https://mirrors.aliyun.com
wget "http://mirrors.cloud.aliyuncs.com/apache/kafka/3.7.0/kafka\_2.12-3.7.0.tgz"
Распаковать:
tar -xf kafka_2.12-3.7.0.tgz
Запустить Kafka:
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties
Тестовый сценарий
В данном примере источником данных заказов является Kafka, данные пользователей — из MySQL, а итогом является хранение в Paimon агрегатов по 5‑минутным окнам. Для упрощения демо ниже мы заменили MySQL на StarRocks.

Создать таблицу измерений и загрузить тестовые данные
CREATE TABLE `users` (
`user_id` bigint(20) NOT NULL COMMENT "",
`region` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`);
insert into users values
(1,'BeiJing'),(2,'TianJin'),(3,'XiAn'),(4,'ShenZhen'),
(5,'BeiJing'),(6,'BeiJing'),(7,'ShenZhen'),(8,'ShenZhen');
Создать в Kafka топик фактов и записать тестовые данные
./bin/kafka-topics.sh --create --topic order-details --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Генерация тестовых данных
Требуется установить пакет:
pip install kafka-python
Код генерации:
from kafka import KafkaProducer
import time
import json
import random
from datetime import datetime, timedelta
start_time = datetime(2024, 7, 24, 15, 0, 0)
end_time = datetime(2024, 7, 24, 18, 0, 0)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
while True:
order_id = random.randint(1, 10000)
user_id = random.randint(1, 8)
order_amount = round(random.uniform(10.0, 1000.0), 2)
random_time = start_time + timedelta(seconds=random.randint(0, 3600))
data = {
"order_id": order_id,
"user_id": user_id,
"order_amount": order_amount,
"order_time": random_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
}
producer.send('order-details', value=json.dumps(data).encode('utf-8'))
time.sleep(3)
producer.close()
Создать таблицы Paimon и записать тестовые данные
./bin/sql-client.sh
CREATE CATALOG my_catalog_oss WITH (
'type' = 'paimon',
'warehouse' = 'oss://starrocks-public/dba/jingdan/paimon',
'fs.oss.endpoint' = 'oss-cn-zhangjiakou-internal.aliyuncs.com',
'fs.oss.accessKeyId' = 'ak',
'fs.oss.accessKeySecret' = 'sk'
);
USE CATALOG my_catalog_oss;
CREATE TABLE hourly_regional_sales (
event_time TIMESTAMP(3),
region STRING,
total_sales DECIMAL(10, 2)
);
USE CATALOG default_catalog;
CREATE TABLE orders_kafka (
order_id BIGINT,
user_id BIGINT,
order_amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'order-details',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'order-consumer',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE users_starrocks (
user_id BIGINT,
region STRING
) WITH (
'connector'='starrocks',
'scan-url'='172.26.92.154:8030',
'jdbc-url'='jdbc:mysql://172.26.92.154:9030',
'username'='root',
'password'='xxx',
'database-name'='jd',
'table-name'='users'
);
SET 'execution.checkpointing.interval' = '10 s';
INSERT INTO my_catalog_oss.`default`.hourly_regional_sales
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS event_time,
u.region,
CAST(SUM(o.order_amount) AS DECIMAL(10, 2)) AS total_sales
FROM default_catalog.`default_database`.orders_kafka AS o
JOIN default_catalog.`default_database`.users_starrocks AS u
ON o.user_id = u.user_id
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE), u.region;
Запрос данных
SELECT * FROM my_catalog_oss.`default`.hourly_regional_sales;

Запрос инкрементальных данных (Batch Time Travel)
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('scan.snapshot-id' = '2') */;

SET 'execution.runtime-mode' = 'batch';
SELECT * FROM hourly_regional_sales /*+ OPTIONS('incremental-between' = '5,10') */;


Создать Paimon Catalog в StarRocks
CREATE EXTERNAL CATALOG paimon_catalog_oss
PROPERTIES
(
"type" = "paimon",
"paimon.catalog.type" = "filesystem",
"paimon.catalog.warehouse" = "oss://starrocks-public/dba/jingdan/paimon",
"aliyun.oss.access_key" = "ak",
"aliyun.oss.secret_key" = "sk",
"aliyun.oss.endpoint" = "oss-cn-zhangjiakou-internal.aliyuncs.com"
);
SET CATALOG paimon_catalog_oss;
USE `default`;
SELECT * FROM hourly_regional_sales;
На стороне StarRocks можно в реальном времени наблюдать изменения в сводной таблице.
