Не зря технология называется Iceberg, потому что с первого взгляда кажется – простой инструмент, а оказывается что столько всего скрыто, что можно изучать и изучать.

В данной статье я хотел бы пробежаться по основным моментам, которые будут интересны дата-инженерам.

В статье мы рассмотрим зачем это нужно, когда это нужно, как это можно применить на практике и конечно же рассмотрим несколько примеров использования Apache Iceberg.

Введение

Apache Iceberg является более лучшей версией Hive Meta Store (HMS).

HMS имеет множество проблем, которые уже давно известны в дата-инженерии:

  • Привязка к HDFS

  • Имеет трудности при изменении модели данных

  • etc

Так как Apache Iceberg является улучшением концепции HMS, то он имеет ряд преимуществ:

  • Изменение модели данных "на лету", без лишних перегрузок данных.

  • Версионирование данных после каждого изменения.

  • Ветки при работе с данными.

  • Путешествие "во времени" в данных.

  • Можно использовать почти любой вычислитель (compute).

Я думаю, что на этом моменте стоить закончить с теорией и перейти к практике. Но если у вас есть какие-то вопросы по инструменту, то рекомендую обращаться как всегда к документации.

Стоит держать в уме, что там не описаны все краевые случаи, но точно сможете найти ответы на общие вопросы.

Также стоит отметить, что Iceberg является продуктом Apache и поэтому все инструменты, которые используются в нём тоже поддерживаются Apache (ну почти): .avro, .parquet, Spark, etc.

По каждому из инструментов вы сможете также легко найти документацию. А я постараюсь ниже обратить внимание на какие-то моменты, которые нашёл при исследовании Apache Iceberg.

Поднятие сервисов

Для поднятия всех сервисов я использовал docker-compose из - Quickstart. Я его немного модифицировал под себя, но об этом ниже.

Весь код также доступен в моём репозитории.

docker-compose.yaml
version: '3.8'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: iceberg
    ports:
      - "5432:5432"
    networks:
      - iceberg_net

  rest:
    image: tabulario/iceberg-rest:1.6.0
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - "8181:8181"
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres
      - CATALOG_JDBC_DRIVER=org.postgresql.Driver
      - CATALOG_JDBC_USER=postgres
      - CATALOG_JDBC_PASSWORD=postgres
    depends_on:
      - postgres

  minio:
    image: minio/minio:RELEASE.2024-07-04T14-25-45Z
    restart: always
    command: server /data --console-address ":9001"
    volumes:
      - ./data:/data
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
      - MINIO_DOMAIN=minio
    ports:
      - "9000:9000"  # MinIO S3 API
      - "9001:9001"  # MinIO Console
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio

  mc:
    depends_on:
      - minio
    image: minio/minio:RELEASE.2024-07-04T14-25-45Z
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minioadmin minioadmin) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
  spark-iceberg:
    image: tabulario/spark-iceberg:3.5.1_1.5.0
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION=us-east-1
    ports:
      - "8888:8888"
      - "8080:8080"
      - "10000:10000"
      - "10001:10001"

networks:
  iceberg_net:

На что здесь стоит обратить внимание:

  • Я указал версию для образов (на октябрь 2024).

  • AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY заменил на minioadmin для удобства.

  • Убрал из скрипта команду: /usr/bin/mc rm -r --force minio/warehouse; Если данную строку вернуть в docker-compose, то при каждому перезапуске проекта будет очищаться хранилище.

  • Добавил сохранение мета-данных в PostgreSQL.

Meta-store

Как я указал в последнем пункте: "Добавил сохранение мета-данных в PostgreSQL". В оригинальном docker-compose создания мета-стора выглядит так:

...
rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
...

Если создать с указанным environment, то ваш meta-store будет создаваться в формате memory.

Данная команда будет выполнена в оригинальном образе:

CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory

И ваш meta-store будет доступен внутри контейнера в папке tmp/

iceberg@e2d60003b684:/tmp$ ls -lah
total 1.1M
drwxrwxrwt 1 root    root    4.0K Oct  7 10:58  .
drwxr-xr-x 1 root    root    4.0K Oct  7 06:43  ..
drwxr-xr-x 2 iceberg iceberg 4.0K Oct  7 06:43  hsperfdata_iceberg
-rw-r--r-- 1 iceberg iceberg  20K Oct  7 10:58 'iceberg_rest_mode=memory'
-rwxr--r-- 1 iceberg iceberg 1.1M Oct  7 06:43  sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so
-rw-r--r-- 1 iceberg iceberg    0 Oct  7 06:43  sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so.lck

Я хотел контролировать meta-store и беспрепятственно его просматривать, поэтому изменил сборку образа на такую:

...
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: iceberg
    ports:
      - "5432:5432"
    networks:
      - iceberg_net

  rest:
    image: tabulario/iceberg-rest:1.6.0
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - "8181:8181"
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres
      - CATALOG_JDBC_DRIVER=org.postgresql.Driver
      - CATALOG_JDBC_USER=postgres
      - CATALOG_JDBC_PASSWORD=postgres
    depends_on:
      - postgres
...

Важно: Логин и пароль в сборке открыты, но для pet-задач этого будет достаточно.

Взаимодействие с Apache Iceberg

Основным способом для взаимодействия с Apache Iceberg является Apache Spark, но я хотел бы уделить немного времени ещё парочки инструментов.

Информация про Spark будет ниже.

API

Начнём с того что с Iceberg можно взаимодействовать при помощи официального API.

Вся спецификация API доступна по ссылке – Iceberg REST Open API specification.

Получить список namespaces:

curl http://localhost:8181/v1/namespaces

Получить список таблиц:

curl http://localhost:8181/v1/namespaces/default/tables

DuckDB

У DuckDB существует расширение Iceberg Extension.

Стоит сразу отметить несколько моментов при работе DuckDB с Iceberg:

  • Расширение находится в "сыром" виде и имеет множество багов при работе.

  • Расширение не позволяет в "полную" силу работать с Data Lake, разрешено проводить только операцию SELECT и то с ограничениями. Надеюсь, что это доработают для дальнейшей интеграции DuckDB в Data Lake, а пока (октябрь 2024) не доступны операции DDL, DML. Но если вы всё таки решитесь на использование DuckDB + Iceberg, то рекомендую обращаться к официальному репозиторию duckdb_iceberg.

Ниже пару примеров того с чем я столкнулся и как это можно исправить.

Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB

В начале предлагаю сконфигурировать нашу сессию:

INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;
SET s3_url_style = 'path';
SET s3_endpoint = 'localhost:9000';
SET s3_access_key_id = 'minioadmin';
SET s3_secret_access_key = 'minioadmin';
SET s3_use_ssl = FALSE;

Теперь если выполнить простой запрос как указано в документации:

SELECT
	*
FROM
	iceberg_metadata(
		's3://warehouse/default/animals/',
		allow_moved_paths = TRUE
);

То мы получим ошибку:

SQL Error: java.sql.SQLException: HTTP Error: HTTP GET error on 'http://localhost:9000/warehouse/default/animals//metadata/version-hint.text' (HTTP 400)

Решение данной проблемы можно найти в данном issues. Но есть НО.

Можно читать определённый кусок мета-данных или проще говоря снэпшот:

SELECT
	*
FROM
	iceberg_scan('s3://warehouse/default/animals/metadata/00001-f0d7c171-b179-4857-8eec-078f0108c1a9.metadata.json');

Но если сделать операцию INSERT/UPDATE/DELETE, то прошлые мета-даннные будут ссылаться на прошлый снэпшот и чтобы получить "актуальные" данные необходимо найти новые мета-данные и выполнить запрос снова:

SELECT
	*
FROM
	iceberg_scan('s3://warehouse/default/animals/metadata/00002-ff3b9eaf-3952-4494-a2c0-b29a78cc6bb7.metadata.json');

Важно: существует "маска" для создания новых снепшотов: 00001-*, 00002-*, etc

И теперь НО, про которое я сказал ранее. Если читать первый файл метаднных, то получим ошибку:

SQL Error: java.sql.SQLException: IO Error: No snapshots found

Чтобы корректно работать с Iceberg через DuckDB необходимо ждать исправления багов или вставлять костыли.

ClickHouse

ClickHouse тоже поддерживает Iceberg, но также только на уровне чтения. Об этом можно почитать в официальной документации.

Если вы ничего не знаете про ClickHouse, то рекомендую ознакомиться с моей статьей – Инфраструктура для Data-Engineer ClickHouse.

PyIceberg

Теперь давайте перейдём к одному из основных способов взаимодействия с Iceberg – это PyIceberg.

Доступ к каталогу

В начале рекомендую настроить доступ к нашему каталогу.

Из документации: This information must be placed inside a file called .pyiceberg.yaml located either in the $HOME or %USERPROFILE% directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME directory (if the corresponding environment variable is set).

Я создал файл в .pyiceberg.yaml в корне своей учётки:

catalog:
  s3_warehouse:
    uri: http://127.0.0.1:8181
    py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO
    s3.endpoint: http://127.0.0.1:9000
    s3.access-key-id: minioadmin
    s3.secret-access-key: minioadmin

И теперь мы можем код вызывать таким образом:

from pyiceberg.catalog import load_catalog  
    
catalog = load_catalog("s3_warehouse")  
  
catalog.create_namespace("default")

Если не создать .pyiceberg.yaml, то необходимо будет каждый раз инициализировать конфиг для подключения к каталогу таким образом:

from pyiceberg.catalog import load_catalog  
  
catalog = load_catalog(  
    name="warehouse",  
    **{  
        "uri": "http://127.0.0.1:8181",  
        "s3.endpoint": "http://127.0.0.1:9000",  
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",  
        "s3.access-key-id": "minioadmin",  
        "s3.secret-access-key": "minioadmin",  
    },  
)
  
catalog.create_namespace("default")

Я думаю, что лучше создать .pyiceberg.yaml, чем дублировать код.

Создание namespace

Для дальнейшей работы нам необходимо создать namespace – это как схема в базе данных. Для этого выполним код:

from pyiceberg.catalog import load_catalog  
    
catalog = load_catalog("s3_warehouse")  
  
catalog.create_namespace("default")

Создание таблицы

Таблицы можно создавать несколькими способами:

  • С использованием pyiceberg.types.

  • С использованием pa.field. Второй вариант как по мне удобнее и проще. Но давайте рассмотрим каждый.

Для того чтобы создать таблицу с использованием pyiceberg.types необходимо:

  1. Импортировать нужные типы из пакета pyiceberg.types.

  2. Создать схему.

  3. Создать таблицу с созданной схемой. Пример:

from pyiceberg.catalog import load_catalog  
from pyiceberg.schema import Schema  
from pyiceberg.types import BinaryType, LongType, NestedField, StringType, TimestamptzType  
  
schema = Schema(  
    fields=[  
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),  
        NestedField(field_id=2, name="uuid", field_type=BinaryType(), required=False, doc="Binary -> UUID"),  
        NestedField(field_id=3, name="name", field_type=StringType(), required=False),  
        NestedField(field_id=4, name="created_at", field_type=TimestamptzType(), required=False),  
    ],  
)  
  
catalog = load_catalog("s3_warehouse")  
  
# Comment if Table does not exist  
catalog.drop_table("default.custom_table_pyiceberg_fields")  
  
catalog.create_table(  
    identifier="default.custom_table_pyiceberg_fields",  
    schema=schema,  
)

Для того чтобы создать таблицу с использованием pa.field необходимо:

  1. Импортировать pyarrow.

  2. Создать схему.

  3. Создать таблицу. Пример:

import pyarrow as pa  
from pyiceberg.catalog import load_catalog  
  
schema = pa.schema(  
    [  
        pa.field(name="id", type=pa.int64(), nullable=True),  
        pa.field(name="uuid", type=pa.binary(), nullable=True),  
        pa.field(name="name", type=pa.string(), nullable=True),  
        pa.field(name="created_at", type=pa.timestamp(unit="s", tz="UTC"), nullable=True),  
    ],  
)  
  
catalog = load_catalog("s3_warehouse")  
  
# Comment if Table does not exist  
catalog.drop_table("default.custom_table_pyarrow_fields")  
  
catalog.create_table(  
    identifier="default.custom_table_pyarrow_fields",  
    schema=schema,  
)

Пример попроще:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("s3_warehouse")

table = pa.table(
    {
        "year": [2028, 2022, 2021, 2022, 2019, 2021],
        "n_legs": [2, 2, 4, 4, 5, 100],
        "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],
    },
)

tbl = catalog.create_table(
    identifier="default.animals", schema=table.schema,
)

tbl.append(table)

Вставка данных в таблицы

Для вставки данных в таблицу их необходимо привести к формату pyarrow.lib.Table.

Пример:

import datetime
import uuid
from random import randint

import pyarrow as pa
from faker import Faker
from pyiceberg.catalog import load_catalog

fake = Faker(locale="ru_RU")

catalog = load_catalog("s3_warehouse")

tbl = catalog.load_table("default.custom_table_pyiceberg_fields")

pa_table = pa.table(
        {
            "id": [randint(a=1, b=9223372036854775806)],  # noqa: S311
            "uuid": [uuid.uuid4().bytes],
            "name": [fake.first_name()],
            "created_at": [fake.date_time_ad(tzinfo=datetime.UTC)],
        },
    )

tbl.append(pa_table)

print(
    tbl.scan().to_arrow(),
)

Также, если вы привыкли работать с pandas.DataFrame, то его также необходимо трансформировать в pyarrow.lib.Table.

Пример:

import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("s3_warehouse")

catalog.drop_table("default.yellow_taxi")

df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet")
table = pa.Table.from_pandas(df=df)

tbl = catalog.create_table(
    identifier="default.yellow_taxi", schema=table.schema,
)

tbl.append(table)

Удаление данных из таблицы

Для удаления данных из таблицы необходимо воспользоваться методом tbl.delete() предварительно инициализировав tbl.

Пример:

from pyiceberg.catalog import load_catalog  
  
catalog = load_catalog("s3_warehouse")  
  
tbl = catalog.load_table("default.animals")  
  
tbl.delete(delete_filter="animal == 'Bear'")

Чтение таблицы

Для чтения необходимо вызвать метод tbl.scan() предварительно инициализировав catalog и tbl.

Пример:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("s3_warehouse")

tbl = catalog.load_table(
    "default.animals",
)

print(
    tbl.scan().to_arrow(),
)

Тут стоит обратить внимание, что после вызова метода tbl.scan() мы получаем объект типа <class 'pyiceberg.table.DataScan'> и можем преобразовать в удобный нам формат:

  • .to_pandas()

  • .to_arrow()

  • etc

Удаление таблицы

Для удаления таблицы необходимо вызвать метод catalog.drop_table() предварительно инициализировав catalog.

Пример:

from pyiceberg.catalog import load_catalog  
  
catalog = load_catalog("s3_warehouse")  
  
catalog.drop_table("default.animals")

Важно: При удалении таблицы происходит удаление из мета-стора, а файлы остаются на месте. Таблица в данном случае – это просто ссылка на какой-то объект в S3.

Если вы не знаете что такое S3 или хотите самостоятельно удалить файлы из S3, то можете ознакомиться с моей статьей – Инфраструктура для data engineer S3.

Time Travel

Отличительной особенностью Iceberg является – это создание snapshots на каждое изменение таблицы и поэтому мы можем между ними перемещаться.

Для того, чтобы получить текущий список snapshots для таблицы необходимо выполнить следующий код:

from pyiceberg.catalog import load_catalog
import pandas as pd

pd.set_option("display.max_columns", None)
catalog = load_catalog("s3_warehouse")

table = catalog.load_table("default.custom_table_pyarrow_fields")

print(table.inspect.snapshots())

И в выводе мы увидим все id наших snapshots, которые сможем считать следующим образом:

from pyiceberg.catalog import load_catalog  

catalog = load_catalog("s3_warehouse")  
  
table = catalog.load_table("default.custom_table_pyarrow_fields")  

print(table.scan(snapshot_id=3826389439422852561))

Spark

Вот мы добрались и до Spark. На самом деле Spark – это на текущий момент единственный инструмент, который позволяет производить DDL и DML операции в Apache Iceberg и также позволяет использовать привычный для всех SQL-интерфейс и также Spark-синтаксис.

Вообще, все примеры работы Spark + Iceberg находятся в блокнотах, которые были предварительно созданы командой Iceberg. Для того чтобы посмотреть примеры на Spark перейдите по ссылке http://localhost:8888/.

Я постараюсь не дублировать весь код, а показать только самое важное, чтобы это можно было использовать в дальнейшем как "справочник".

Доступ к каталогу

Во время сборки контейнера уже произошла инициализация подключения к нашему хранилищу, поэтому мы можем сразу же обращаться запросами к нему, исключая инициализацию.

Для начала давайте посмотрим какие таблицы нам доступны в namespace default, который мы создавали ранее. Для этого выполним следующий скрипт:

%%sql

show tables in default;

Создание namespace

Для создания namespace необходимо выполнить команду:

%%sql

CREATE DATABASE IF NOT EXISTS foo;

После этого в наш мета-стор запишется информация о создании данного namespace, но физически он не создастся. Данный namespace появится только после создания в нём первой таблицы.

Создание таблицы

Как я говорил выше после создания namespace foo мы его можем видеть в нашем мета-сторе, но физически его ещё не существует, поэтому давайте создадим таблицу следующим кодом:

%%sql

CREATE TABLE foo.bar(
    id bigint
);

Также мы можем создать таблицу с партиционированием по выбранному полю:

CREATE TABLE foo.bar_partition (
    id bigint,
    created_at timestamp,
)
USING iceberg
PARTITIONED BY (days(created_at))

Данный код нам создаст таблицу и при каждой вставке данных в неё он будет создавать партицию за нужный день. Вот так это выглядит в самом хранилище:

.
├── data
│  ├── created_at=YYYY-MM-DD
│  ├── created_at=YYYY-MM-DD
│  ├── created_at=YYYY-MM-DD
│  ├── ...
│  ├── created_at=YYYY-MM-DD

Вы можете создавать партиции не только по дням, но и по часам, а также по перечислениям. К примеру по продавцам/покупателям/гендеру и прочее.

Важно: если вы некорректно создали партиции или вам не достаточно текущего уровня партиционирования, то можете его изменить через удаление партиций и создание новых.

Удаление текущих партиций:

%%sql

ALTER TABLE foo.bar_partition DROP PARTITION FIELD days(created_at)

Создание новых партиций:

%%sql

ALTER TABLE foo.bar_partition ADD PARTITION FIELD hours(created_at)

И после этого необходимо вызвать метод rewrite_data_files, чтобы перезаписать партиции.

%%sql

CALL system.rewrite_data_files('foo.bar_partition')

Вставка данных в таблицы

Так как мы работаем через Spark, то у нас есть два варианта вставки данных:

  1. через dataframe:

df = spark.read.parquet(".parquet")
df.writeTo("foo.bar").append()
  1. Через привычный INSERT:

%%sql 

INSERT INTO foo.bar(id)
VALUES (1)

Для примера не важен формат вставки, но вы можете без проблем совершать вставки на основании данных из Data Lake или из внешних источников.

Если вы хотите сгенерировать данные или поработать с реальными данными, то можете ознакомиться с моей статьей – Pet-проекты и данные для Data-Engineer.

Удаление данных из таблицы

Для удаления данных из таблицы необходимо выполнить следующий код:

%%sql

DELETE FROM foo.bar
WHERE id = 1

Чтение таблицы

Чтобы просто прочитать данные из таблицы foo.bar необходимо выполнить код:

%%sql

SELECT * FROM foo.bar

Для того чтобы прочитать данные для дальнейшей работы над ними, необходимо выполнить код:

df = spark.sql('SELECT * FROM foo.bar')
df.show()

Удаление таблицы

Удаление таблицы происходит стандартной командой:

%%sql

DROP TABLE IF EXISTS foo.bar;

Time Travel

Как я говорил ранее, отличительной способность Iceberg является – путешествие "во времени". Поэтому каждый раз, когда мы изменяли нашу таблицу foo.bar, при помощи INSERT, DELETE, UPDATE, то у нас создавался snapshot, к которому мы можем обратиться или даже "откатиться".

Давайте рассмотрим это по порядку.

В начале давайте посмотрим список доступных нам snapshots по таблице:

%%sql

SELECT *
FROM foo.bar.snapshots
ORDER BY committed_at DESC

Теперь мы знаем все snapshot_id для нашей таблицы и можем прочитать любое состояние таблицы по snapshot_id:

%%sql

SELECT
    count(*) as c
FROM
    foo.bar
FOR VERSION AS OF 3331575308018494635

И также можем это сделать по полю committed_at:

%%sql

SELECT
    count(*) as c
FROM
    foo.bar
FOR TIMESTAMP AS OF TIMESTAMP 'YYYY-MM-DD HH:MM:SS.000000'

Ну и конечно же мы можем "откатиться" на любое прошлое состояние. Для этого необходимо выполнить команду:

CALL system.rollback_to_snapshot('foo.bar', 3331575308018494635)

Я считаю, что это одна из уникальных особенностей Iceberg, который позволяет не только путешествовать во времени, но и соблюдать идемпотентность при построении pipeline.

Branching and Tagging DDL

Также важным свойством Iceberg является – возможность создавать ветки и теги.

Создание веток позволяет нам версионировать данные, не меняя продовые данные.

Давайте рассмотрим это на примере. Мы ранее с вами создали таблицу foo.bar, теперь если вставить в неё несколько значений, от одного до пяти при помощи следующего кода:

%%sql 

INSERT INTO foo.bar(id)
VALUES (1)

Затем мы можем проверить количество данных в нашей таблице:

SELECT count(*) AS cnt
FROM foo.bar

Теперь нам необходимо установить возможность версионирования через ветки для таблицы, для этого выполним следующий код:

%%sql

ALTER TABLE foo.bar
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

Теперь мы можем создавать ветки для нашей таблицы, давайте создадим ветку delete_one_value:

%%sql

ALTER TABLE foo.bar 
CREATE BRANCH delete_one_value

После этого нам нужно переключиться на неё:

spark.conf.set('spark.wap.branch', 'delete_one_value')

И теперь выполним нашу задачу в нужной ветке:

%%sql

DELETE FROM foo.bar
WHERE id = 1

И для проверки того, что мы всё сделали корректно можем выполнить следующий код:

%%sql
    
SELECT count(*) AS cnt
-- FROM foo.bar VERSION AS OF 'main'
FROM foo.bar.branch_main

И здесь мы получим то количество строк, который у нас находятся в main.

А при выполнении следующего кода мы сможем получить то количество строк, которое соответствует ветке delete_one_value.

%%sql
    
SELECT count(*) AS cnt
-- FROM foo.bar VERSION AS OF 'delete_one_value'
FROM foo.bar.branch_delete_one_value

Важно: В коде указано две версии того, как мы можем обращаться к нашим веткам.

  1. Через указание ветки:

...
FROM foo.bar VERSION AS OF 'main'
...
  1. Через точку и добавлением префикса:

...
FROM foo.bar.branch_main
...

Ну и теперь самое главное – это публикация изменений в main.

Для начала нам нужно узнать какой snapshot_id использовать для публикации.

%%sql

SELECT * FROM foo.bar.refs

Важно: мы можем опубликовать любой snapshot_id из нашей ветки delete_one_value, но рекомендуется использоваться последний, чтобы было всё логично.

И чтобы сделать саму публикацию необходимо выполнить код:

%%sql
    
CALL system.cherrypick_snapshot('foo.bar', 5235792610289419748)

Ну и в конце нужно убрать за собой, поэтому удалим ветку delete_one_value при помощи следующего кода:

%%sql
    
ALTER TABLE foo.bar DROP BRANCH delete_one_value

И чтобы проверить, что всё удалено корректно мы можем выполнить данный код:

%%sql

SELECT * FROM foo.bar.refs

Заключение

Я постарался показать основные возможности для построения Data Lake на базе Apache Iceberg.

Вообще, у этого инструмента довольно много применений. Можно как строить полноценный аналитический продукт на его базе, так и использовать как "холодное" или "тёплое" хранилище. Над "горячим" я бы подумал, как его организовать, с учётом постоянных изменений модели данных и частых изменений dds, dm слоя.

Iceberg по своей сути является только абстракцией, предлагая удобный интерфейс для взаимодействия с данными. Он не является серебряной пулей, он имеет свои ограничения и тонкости.

На что я бы обратил внимание при работе с Iceberg:

  • Качественный DDL ваших таблиц: правильная модель данных, правильное партиционирование, название таблицы и нужная схема (namespace).

  • Следить за изменениями таблицы: INSERT, UPDATE, DELETE, DROP. В частности над DROP, потому что при удалении таблицы вы удаляете ссылку на файл из мета-стора, а физически файлы хранятся в вашем S3. Если для вас вопрос storage важен, то это стоит контролировать.

  • Вопрос мета-данных на уровне дата-каталога, а не на уровне DDL. Важно за данными следить, отслеживать источники, pipeline загрузки и про данные не забывать. Стоит продумать варианты того, как информировать пользователей о наличии тех или иных данных.

  • Бэкап meta-store, чтобы не потерять ссылки на текущие "таблицы".

Если резюмировать, то Iceberg является хорошим продуктом, который позволит вам разделить compute и storage при работе с данными. Его стоит попробовать, если в вашей инфраструктуре есть Apache Spark или есть опыт работы с Spark, чтобы его правильно "сварить" с Apache Iceberg.


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.

Комментарии (0)