Не зря технология называется Iceberg, потому что с первого взгляда кажется – простой инструмент, а оказывается что столько всего скрыто, что можно изучать и изучать.
В данной статье я хотел бы пробежаться по основным моментам, которые будут интересны дата-инженерам.
В статье мы рассмотрим зачем это нужно, когда это нужно, как это можно применить на практике и конечно же рассмотрим несколько примеров использования Apache Iceberg.
Введение
Apache Iceberg является более лучшей версией Hive Meta Store (HMS).
HMS имеет множество проблем, которые уже давно известны в дата-инженерии:
Привязка к HDFS
Имеет трудности при изменении модели данных
etc
Так как Apache Iceberg является улучшением концепции HMS, то он имеет ряд преимуществ:
Изменение модели данных "на лету", без лишних перегрузок данных.
Версионирование данных после каждого изменения.
Ветки при работе с данными.
Путешествие "во времени" в данных.
Можно использовать почти любой вычислитель (compute).
Я думаю, что на этом моменте стоить закончить с теорией и перейти к практике. Но если у вас есть какие-то вопросы по инструменту, то рекомендую обращаться как всегда к документации.
Стоит держать в уме, что там не описаны все краевые случаи, но точно сможете найти ответы на общие вопросы.
Также стоит отметить, что Iceberg является продуктом Apache и поэтому все инструменты, которые используются в нём тоже поддерживаются Apache (ну почти): .avro
, .parquet
, Spark
, etc.
По каждому из инструментов вы сможете также легко найти документацию. А я постараюсь ниже обратить внимание на какие-то моменты, которые нашёл при исследовании Apache Iceberg.
Поднятие сервисов
Для поднятия всех сервисов я использовал docker-compose из - Quickstart. Я его немного модифицировал под себя, но об этом ниже.
Весь код также доступен в моём репозитории.
docker-compose.yaml
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: iceberg
ports:
- "5432:5432"
networks:
- iceberg_net
rest:
image: tabulario/iceberg-rest:1.6.0
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- "8181:8181"
environment:
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
- CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres
- CATALOG_JDBC_DRIVER=org.postgresql.Driver
- CATALOG_JDBC_USER=postgres
- CATALOG_JDBC_PASSWORD=postgres
depends_on:
- postgres
minio:
image: minio/minio:RELEASE.2024-07-04T14-25-45Z
restart: always
command: server /data --console-address ":9001"
volumes:
- ./data:/data
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
- MINIO_DOMAIN=minio
ports:
- "9000:9000" # MinIO S3 API
- "9001:9001" # MinIO Console
networks:
iceberg_net:
aliases:
- warehouse.minio
mc:
depends_on:
- minio
image: minio/minio:RELEASE.2024-07-04T14-25-45Z
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 minioadmin minioadmin) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
spark-iceberg:
image: tabulario/spark-iceberg:3.5.1_1.5.0
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
environment:
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION=us-east-1
ports:
- "8888:8888"
- "8080:8080"
- "10000:10000"
- "10001:10001"
networks:
iceberg_net:
На что здесь стоит обратить внимание:
Я указал версию для образов (на октябрь 2024).
AWS_ACCESS_KEY_ID
иAWS_SECRET_ACCESS_KEY
заменил наminioadmin
для удобства.Убрал из скрипта команду:
/usr/bin/mc rm -r --force minio/warehouse;
Если данную строку вернуть в docker-compose, то при каждому перезапуске проекта будет очищаться хранилище.Добавил сохранение мета-данных в PostgreSQL.
Meta-store
Как я указал в последнем пункте: "Добавил сохранение мета-данных в PostgreSQL". В оригинальном docker-compose
создания мета-стора выглядит так:
...
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
...
Если создать с указанным environment
, то ваш meta-store будет создаваться в формате memory
.
Данная команда будет выполнена в оригинальном образе:
CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
И ваш meta-store будет доступен внутри контейнера в папке tmp/
iceberg@e2d60003b684:/tmp$ ls -lah
total 1.1M
drwxrwxrwt 1 root root 4.0K Oct 7 10:58 .
drwxr-xr-x 1 root root 4.0K Oct 7 06:43 ..
drwxr-xr-x 2 iceberg iceberg 4.0K Oct 7 06:43 hsperfdata_iceberg
-rw-r--r-- 1 iceberg iceberg 20K Oct 7 10:58 'iceberg_rest_mode=memory'
-rwxr--r-- 1 iceberg iceberg 1.1M Oct 7 06:43 sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so
-rw-r--r-- 1 iceberg iceberg 0 Oct 7 06:43 sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so.lck
Я хотел контролировать meta-store и беспрепятственно его просматривать, поэтому изменил сборку образа на такую:
...
postgres:
image: postgres:13
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: iceberg
ports:
- "5432:5432"
networks:
- iceberg_net
rest:
image: tabulario/iceberg-rest:1.6.0
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- "8181:8181"
environment:
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
- CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres
- CATALOG_JDBC_DRIVER=org.postgresql.Driver
- CATALOG_JDBC_USER=postgres
- CATALOG_JDBC_PASSWORD=postgres
depends_on:
- postgres
...
Важно: Логин и пароль в сборке открыты, но для pet-задач этого будет достаточно.
Взаимодействие с Apache Iceberg
Основным способом для взаимодействия с Apache Iceberg является Apache Spark, но я хотел бы уделить немного времени ещё парочки инструментов.
Информация про Spark будет ниже.
API
Начнём с того что с Iceberg можно взаимодействовать при помощи официального API.
Вся спецификация API доступна по ссылке – Iceberg REST Open API specification.
Получить список namespaces
:
curl http://localhost:8181/v1/namespaces
Получить список таблиц:
curl http://localhost:8181/v1/namespaces/default/tables
DuckDB
У DuckDB существует расширение Iceberg Extension.
Стоит сразу отметить несколько моментов при работе DuckDB с Iceberg:
Расширение находится в "сыром" виде и имеет множество багов при работе.
Расширение не позволяет в "полную" силу работать с Data Lake, разрешено проводить только операцию
SELECT
и то с ограничениями. Надеюсь, что это доработают для дальнейшей интеграции DuckDB в Data Lake, а пока (октябрь 2024) не доступны операции DDL, DML. Но если вы всё таки решитесь на использование DuckDB + Iceberg, то рекомендую обращаться к официальному репозиторию duckdb_iceberg.
Ниже пару примеров того с чем я столкнулся и как это можно исправить.
Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB
В начале предлагаю сконфигурировать нашу сессию:
INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;
SET s3_url_style = 'path';
SET s3_endpoint = 'localhost:9000';
SET s3_access_key_id = 'minioadmin';
SET s3_secret_access_key = 'minioadmin';
SET s3_use_ssl = FALSE;
Теперь если выполнить простой запрос как указано в документации:
SELECT
*
FROM
iceberg_metadata(
's3://warehouse/default/animals/',
allow_moved_paths = TRUE
);
То мы получим ошибку:
SQL Error: java.sql.SQLException: HTTP Error: HTTP GET error on 'http://localhost:9000/warehouse/default/animals//metadata/version-hint.text' (HTTP 400)
Решение данной проблемы можно найти в данном issues. Но есть НО.
Можно читать определённый кусок мета-данных или проще говоря снэпшот:
SELECT
*
FROM
iceberg_scan('s3://warehouse/default/animals/metadata/00001-f0d7c171-b179-4857-8eec-078f0108c1a9.metadata.json');
Но если сделать операцию INSERT
/UPDATE
/DELETE
, то прошлые мета-даннные будут ссылаться на прошлый снэпшот и чтобы получить "актуальные" данные необходимо найти новые мета-данные и выполнить запрос снова:
SELECT
*
FROM
iceberg_scan('s3://warehouse/default/animals/metadata/00002-ff3b9eaf-3952-4494-a2c0-b29a78cc6bb7.metadata.json');
Важно: существует "маска" для создания новых снепшотов: 00001-*
, 00002-*
, etc
И теперь НО, про которое я сказал ранее. Если читать первый файл метаднных, то получим ошибку:
SQL Error: java.sql.SQLException: IO Error: No snapshots found
Чтобы корректно работать с Iceberg через DuckDB необходимо ждать исправления багов или вставлять костыли.
ClickHouse
ClickHouse тоже поддерживает Iceberg, но также только на уровне чтения. Об этом можно почитать в официальной документации.
Если вы ничего не знаете про ClickHouse, то рекомендую ознакомиться с моей статьей – Инфраструктура для Data-Engineer ClickHouse.
PyIceberg
Теперь давайте перейдём к одному из основных способов взаимодействия с Iceberg – это PyIceberg.
Доступ к каталогу
В начале рекомендую настроить доступ к нашему каталогу.
Из документации: This information must be placed inside a file called .pyiceberg.yaml
located either in the $HOME
or %USERPROFILE%
directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME
directory (if the corresponding environment variable is set).
Я создал файл в .pyiceberg.yaml
в корне своей учётки:
catalog:
s3_warehouse:
uri: http://127.0.0.1:8181
py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO
s3.endpoint: http://127.0.0.1:9000
s3.access-key-id: minioadmin
s3.secret-access-key: minioadmin
И теперь мы можем код вызывать таким образом:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
catalog.create_namespace("default")
Если не создать .pyiceberg.yaml
, то необходимо будет каждый раз инициализировать конфиг для подключения к каталогу таким образом:
from pyiceberg.catalog import load_catalog
catalog = load_catalog(
name="warehouse",
**{
"uri": "http://127.0.0.1:8181",
"s3.endpoint": "http://127.0.0.1:9000",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
},
)
catalog.create_namespace("default")
Я думаю, что лучше создать .pyiceberg.yaml
, чем дублировать код.
Создание namespace
Для дальнейшей работы нам необходимо создать namespace
– это как схема в базе данных. Для этого выполним код:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
catalog.create_namespace("default")
Создание таблицы
Таблицы можно создавать несколькими способами:
С использованием
pyiceberg.types
.С использованием
pa.field
. Второй вариант как по мне удобнее и проще. Но давайте рассмотрим каждый.
Для того чтобы создать таблицу с использованием pyiceberg.types
необходимо:
Импортировать нужные типы из пакета
pyiceberg.types
.Создать схему.
Создать таблицу с созданной схемой. Пример:
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import BinaryType, LongType, NestedField, StringType, TimestamptzType
schema = Schema(
fields=[
NestedField(field_id=1, name="id", field_type=LongType(), required=False),
NestedField(field_id=2, name="uuid", field_type=BinaryType(), required=False, doc="Binary -> UUID"),
NestedField(field_id=3, name="name", field_type=StringType(), required=False),
NestedField(field_id=4, name="created_at", field_type=TimestamptzType(), required=False),
],
)
catalog = load_catalog("s3_warehouse")
# Comment if Table does not exist
catalog.drop_table("default.custom_table_pyiceberg_fields")
catalog.create_table(
identifier="default.custom_table_pyiceberg_fields",
schema=schema,
)
Для того чтобы создать таблицу с использованием pa.field
необходимо:
Импортировать
pyarrow
.Создать схему.
Создать таблицу. Пример:
import pyarrow as pa
from pyiceberg.catalog import load_catalog
schema = pa.schema(
[
pa.field(name="id", type=pa.int64(), nullable=True),
pa.field(name="uuid", type=pa.binary(), nullable=True),
pa.field(name="name", type=pa.string(), nullable=True),
pa.field(name="created_at", type=pa.timestamp(unit="s", tz="UTC"), nullable=True),
],
)
catalog = load_catalog("s3_warehouse")
# Comment if Table does not exist
catalog.drop_table("default.custom_table_pyarrow_fields")
catalog.create_table(
identifier="default.custom_table_pyarrow_fields",
schema=schema,
)
Пример попроще:
import pyarrow as pa
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
table = pa.table(
{
"year": [2028, 2022, 2021, 2022, 2019, 2021],
"n_legs": [2, 2, 4, 4, 5, 100],
"animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],
},
)
tbl = catalog.create_table(
identifier="default.animals", schema=table.schema,
)
tbl.append(table)
Вставка данных в таблицы
Для вставки данных в таблицу их необходимо привести к формату pyarrow.lib.Table
.
Пример:
import datetime
import uuid
from random import randint
import pyarrow as pa
from faker import Faker
from pyiceberg.catalog import load_catalog
fake = Faker(locale="ru_RU")
catalog = load_catalog("s3_warehouse")
tbl = catalog.load_table("default.custom_table_pyiceberg_fields")
pa_table = pa.table(
{
"id": [randint(a=1, b=9223372036854775806)], # noqa: S311
"uuid": [uuid.uuid4().bytes],
"name": [fake.first_name()],
"created_at": [fake.date_time_ad(tzinfo=datetime.UTC)],
},
)
tbl.append(pa_table)
print(
tbl.scan().to_arrow(),
)
Также, если вы привыкли работать с pandas.DataFrame
, то его также необходимо трансформировать в pyarrow.lib.Table
.
Пример:
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
catalog.drop_table("default.yellow_taxi")
df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet")
table = pa.Table.from_pandas(df=df)
tbl = catalog.create_table(
identifier="default.yellow_taxi", schema=table.schema,
)
tbl.append(table)
Удаление данных из таблицы
Для удаления данных из таблицы необходимо воспользоваться методом tbl.delete()
предварительно инициализировав tbl
.
Пример:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
tbl = catalog.load_table("default.animals")
tbl.delete(delete_filter="animal == 'Bear'")
Чтение таблицы
Для чтения необходимо вызвать метод tbl.scan()
предварительно инициализировав catalog
и tbl
.
Пример:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
tbl = catalog.load_table(
"default.animals",
)
print(
tbl.scan().to_arrow(),
)
Тут стоит обратить внимание, что после вызова метода tbl.scan()
мы получаем объект типа <class 'pyiceberg.table.DataScan'>
и можем преобразовать в удобный нам формат:
.to_pandas()
.to_arrow()
etc
Удаление таблицы
Для удаления таблицы необходимо вызвать метод catalog.drop_table()
предварительно инициализировав catalog
.
Пример:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
catalog.drop_table("default.animals")
Важно: При удалении таблицы происходит удаление из мета-стора, а файлы остаются на месте. Таблица в данном случае – это просто ссылка на какой-то объект в S3.
Если вы не знаете что такое S3 или хотите самостоятельно удалить файлы из S3, то можете ознакомиться с моей статьей – Инфраструктура для data engineer S3.
Time Travel
Отличительной особенностью Iceberg является – это создание snapshots
на каждое изменение таблицы и поэтому мы можем между ними перемещаться.
Для того, чтобы получить текущий список snapshots
для таблицы необходимо выполнить следующий код:
from pyiceberg.catalog import load_catalog
import pandas as pd
pd.set_option("display.max_columns", None)
catalog = load_catalog("s3_warehouse")
table = catalog.load_table("default.custom_table_pyarrow_fields")
print(table.inspect.snapshots())
И в выводе мы увидим все id
наших snapshots
, которые сможем считать следующим образом:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("s3_warehouse")
table = catalog.load_table("default.custom_table_pyarrow_fields")
print(table.scan(snapshot_id=3826389439422852561))
Spark
Вот мы добрались и до Spark. На самом деле Spark – это на текущий момент единственный инструмент, который позволяет производить DDL и DML операции в Apache Iceberg и также позволяет использовать привычный для всех SQL-интерфейс и также Spark-синтаксис.
Вообще, все примеры работы Spark + Iceberg находятся в блокнотах, которые были предварительно созданы командой Iceberg. Для того чтобы посмотреть примеры на Spark перейдите по ссылке http://localhost:8888/.
Я постараюсь не дублировать весь код, а показать только самое важное, чтобы это можно было использовать в дальнейшем как "справочник".
Доступ к каталогу
Во время сборки контейнера уже произошла инициализация подключения к нашему хранилищу, поэтому мы можем сразу же обращаться запросами к нему, исключая инициализацию.
Для начала давайте посмотрим какие таблицы нам доступны в namespace
default
, который мы создавали ранее. Для этого выполним следующий скрипт:
%%sql
show tables in default;
Создание namespace
Для создания namespace
необходимо выполнить команду:
%%sql
CREATE DATABASE IF NOT EXISTS foo;
После этого в наш мета-стор запишется информация о создании данного namespace
, но физически он не создастся. Данный namespace
появится только после создания в нём первой таблицы.
Создание таблицы
Как я говорил выше после создания namespace
foo
мы его можем видеть в нашем мета-сторе, но физически его ещё не существует, поэтому давайте создадим таблицу следующим кодом:
%%sql
CREATE TABLE foo.bar(
id bigint
);
Также мы можем создать таблицу с партиционированием по выбранному полю:
CREATE TABLE foo.bar_partition (
id bigint,
created_at timestamp,
)
USING iceberg
PARTITIONED BY (days(created_at))
Данный код нам создаст таблицу и при каждой вставке данных в неё он будет создавать партицию за нужный день. Вот так это выглядит в самом хранилище:
.
├── data
│ ├── created_at=YYYY-MM-DD
│ ├── created_at=YYYY-MM-DD
│ ├── created_at=YYYY-MM-DD
│ ├── ...
│ ├── created_at=YYYY-MM-DD
Вы можете создавать партиции не только по дням, но и по часам, а также по перечислениям. К примеру по продавцам/покупателям/гендеру и прочее.
Важно: если вы некорректно создали партиции или вам не достаточно текущего уровня партиционирования, то можете его изменить через удаление партиций и создание новых.
Удаление текущих партиций:
%%sql
ALTER TABLE foo.bar_partition DROP PARTITION FIELD days(created_at)
Создание новых партиций:
%%sql
ALTER TABLE foo.bar_partition ADD PARTITION FIELD hours(created_at)
И после этого необходимо вызвать метод rewrite_data_files
, чтобы перезаписать партиции.
%%sql
CALL system.rewrite_data_files('foo.bar_partition')
Вставка данных в таблицы
Так как мы работаем через Spark, то у нас есть два варианта вставки данных:
через
dataframe
:
df = spark.read.parquet(".parquet")
df.writeTo("foo.bar").append()
Через привычный
INSERT
:
%%sql
INSERT INTO foo.bar(id)
VALUES (1)
Для примера не важен формат вставки, но вы можете без проблем совершать вставки на основании данных из Data Lake или из внешних источников.
Если вы хотите сгенерировать данные или поработать с реальными данными, то можете ознакомиться с моей статьей – Pet-проекты и данные для Data-Engineer.
Удаление данных из таблицы
Для удаления данных из таблицы необходимо выполнить следующий код:
%%sql
DELETE FROM foo.bar
WHERE id = 1
Чтение таблицы
Чтобы просто прочитать данные из таблицы foo.bar
необходимо выполнить код:
%%sql
SELECT * FROM foo.bar
Для того чтобы прочитать данные для дальнейшей работы над ними, необходимо выполнить код:
df = spark.sql('SELECT * FROM foo.bar')
df.show()
Удаление таблицы
Удаление таблицы происходит стандартной командой:
%%sql
DROP TABLE IF EXISTS foo.bar;
Time Travel
Как я говорил ранее, отличительной способность Iceberg является – путешествие "во времени". Поэтому каждый раз, когда мы изменяли нашу таблицу foo.bar
, при помощи INSERT
, DELETE
, UPDATE
, то у нас создавался snapshot
, к которому мы можем обратиться или даже "откатиться".
Давайте рассмотрим это по порядку.
В начале давайте посмотрим список доступных нам snapshots
по таблице:
%%sql
SELECT *
FROM foo.bar.snapshots
ORDER BY committed_at DESC
Теперь мы знаем все snapshot_id
для нашей таблицы и можем прочитать любое состояние таблицы по snapshot_id
:
%%sql
SELECT
count(*) as c
FROM
foo.bar
FOR VERSION AS OF 3331575308018494635
И также можем это сделать по полю committed_at
:
%%sql
SELECT
count(*) as c
FROM
foo.bar
FOR TIMESTAMP AS OF TIMESTAMP 'YYYY-MM-DD HH:MM:SS.000000'
Ну и конечно же мы можем "откатиться" на любое прошлое состояние. Для этого необходимо выполнить команду:
CALL system.rollback_to_snapshot('foo.bar', 3331575308018494635)
Я считаю, что это одна из уникальных особенностей Iceberg, который позволяет не только путешествовать во времени, но и соблюдать идемпотентность при построении pipeline.
Branching and Tagging DDL
Также важным свойством Iceberg является – возможность создавать ветки и теги.
Создание веток позволяет нам версионировать данные, не меняя продовые данные.
Давайте рассмотрим это на примере. Мы ранее с вами создали таблицу foo.bar
, теперь если вставить в неё несколько значений, от одного до пяти при помощи следующего кода:
%%sql
INSERT INTO foo.bar(id)
VALUES (1)
Затем мы можем проверить количество данных в нашей таблице:
SELECT count(*) AS cnt
FROM foo.bar
Теперь нам необходимо установить возможность версионирования через ветки для таблицы, для этого выполним следующий код:
%%sql
ALTER TABLE foo.bar
SET TBLPROPERTIES (
'write.wap.enabled'='true'
)
Теперь мы можем создавать ветки для нашей таблицы, давайте создадим ветку delete_one_value
:
%%sql
ALTER TABLE foo.bar
CREATE BRANCH delete_one_value
После этого нам нужно переключиться на неё:
spark.conf.set('spark.wap.branch', 'delete_one_value')
И теперь выполним нашу задачу в нужной ветке:
%%sql
DELETE FROM foo.bar
WHERE id = 1
И для проверки того, что мы всё сделали корректно можем выполнить следующий код:
%%sql
SELECT count(*) AS cnt
-- FROM foo.bar VERSION AS OF 'main'
FROM foo.bar.branch_main
И здесь мы получим то количество строк, который у нас находятся в main
.
А при выполнении следующего кода мы сможем получить то количество строк, которое соответствует ветке delete_one_value
.
%%sql
SELECT count(*) AS cnt
-- FROM foo.bar VERSION AS OF 'delete_one_value'
FROM foo.bar.branch_delete_one_value
Важно: В коде указано две версии того, как мы можем обращаться к нашим веткам.
Через указание ветки:
...
FROM foo.bar VERSION AS OF 'main'
...
Через точку и добавлением префикса:
...
FROM foo.bar.branch_main
...
Ну и теперь самое главное – это публикация изменений в main
.
Для начала нам нужно узнать какой snapshot_id
использовать для публикации.
%%sql
SELECT * FROM foo.bar.refs
Важно: мы можем опубликовать любой snapshot_id
из нашей ветки delete_one_value
, но рекомендуется использоваться последний, чтобы было всё логично.
И чтобы сделать саму публикацию необходимо выполнить код:
%%sql
CALL system.cherrypick_snapshot('foo.bar', 5235792610289419748)
Ну и в конце нужно убрать за собой, поэтому удалим ветку delete_one_value
при помощи следующего кода:
%%sql
ALTER TABLE foo.bar DROP BRANCH delete_one_value
И чтобы проверить, что всё удалено корректно мы можем выполнить данный код:
%%sql
SELECT * FROM foo.bar.refs
Заключение
Я постарался показать основные возможности для построения Data Lake на базе Apache Iceberg.
Вообще, у этого инструмента довольно много применений. Можно как строить полноценный аналитический продукт на его базе, так и использовать как "холодное" или "тёплое" хранилище. Над "горячим" я бы подумал, как его организовать, с учётом постоянных изменений модели данных и частых изменений dds, dm слоя.
Iceberg по своей сути является только абстракцией, предлагая удобный интерфейс для взаимодействия с данными. Он не является серебряной пулей, он имеет свои ограничения и тонкости.
На что я бы обратил внимание при работе с Iceberg:
Качественный DDL ваших таблиц: правильная модель данных, правильное партиционирование, название таблицы и нужная схема (
namespace
).Следить за изменениями таблицы:
INSERT
,UPDATE
,DELETE
,DROP
. В частности надDROP
, потому что при удалении таблицы вы удаляете ссылку на файл из мета-стора, а физически файлы хранятся в вашем S3. Если для вас вопрос storage важен, то это стоит контролировать.Вопрос мета-данных на уровне дата-каталога, а не на уровне DDL. Важно за данными следить, отслеживать источники, pipeline загрузки и про данные не забывать. Стоит продумать варианты того, как информировать пользователей о наличии тех или иных данных.
Бэкап meta-store, чтобы не потерять ссылки на текущие "таблицы".
Если резюмировать, то Iceberg является хорошим продуктом, который позволит вам разделить compute и storage при работе с данными. Его стоит попробовать, если в вашей инфраструктуре есть Apache Spark или есть опыт работы с Spark, чтобы его правильно "сварить" с Apache Iceberg.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.