Всем привет от команды Greengage!

В этой статье мы поделимся с вами новостями, касающимися:

  • прогресса разработки фичи для сокращения числа сегментов кластера (далее для краткости я буду называть этот процесс шринком), которая "подросла" до полноценного ребаланса кластера, включающего в себя либо шринк, либо экспанд кластера;

  • новой фичи перераспределения вставки по заданным колонкам в foreign-таблицы;

  • некоторых спойлеров о будущей интеграции с Apache Iceberg.

Как говорится, устраивайтесь поудобнее, мы начинаем.

ggrebalance

В начале 2025 года наша команда заявила о старте работ по реализации решения задачи шринка кластера. Практически сразу стало понятно, что шринк в общем случае приводит к необходимости последующего приведения кластера к сбалансированному виду, т.е. к ребалансу кластера. Таким образом, утилита шринка кластера преобразилась до утилиты ребаланса кластера ggrebalance.

В этой статье я сосредоточусь на пользовательских характеристиках решения и не буду углубляться в технические аспекты. Детали мы раскроем в отдельных статьях.

Итак, ggrebalance решает сразу несколько задач:

  • удаление сегментов с последующим ребалансом оставшихся сегментов между хостами;

  • удаление/добавление хостов с последующим ребалансом сегментов.

Также ggrebalance будет включать в себя знакомые всем возможности, предоставляемые утилитой gpexpand.

Основной набор сценариев, который рассматривался при проектировании:

  1. Добавление в кластер новых хостов без изменения числа сегментов.

  2. Добавление в кластер новых хостов с добавлением новых сегментов. Это может быть как добавление некоторого фиксированного числа новых сегментов в кластер с последующим ребалансом, так и добавление такого числа сегментов, которое определяется текущей конфигурацией кластера (сценарий аналогичен привычному gpexpand).

  3. Удаление хостов из кластера без изменения числа сегментов.

  4. Удаление хостов из кластера с удалением существующих сегментов.

  5. Перенос сегментов на некоторый набор хостов без изменения числа сегментов.

  6. Перенос сегментов на некоторый набор хостов с изменением числа сегментов (шринк или экспанд).

  7. Добавление новых сегментов без изменения множества хостов.

  8. Удаление существующих сегментов без изменения множества хостов.

  9. Изменение стратегии зеркалирования и приведение текущей схемы к одной из стандартных: grouped или spread.

Основные принципы и идеи, в соответствии с которыми мы разрабатываем ggrebalance:

  1. Одна утилита для реализации разных сценариев, связанных с изменением "топологии" кластера.

  2. Особое внимание уделялось аспектам возобновления начатого сценария ребаланса после приостановки работы (например, по причине аварийного завершения).

  3. Связанная с предыдущим пунктом возможность предоставления администратору средств принятия решения в интерактивном режиме:

    • Выбор дальнейших действий в случае возникновения ошибок (отмена попыток перемещения сегментов, откат перемещения к предыдущему состоянию).

    • Выбора момента переключения ролей, который связан с отменой текущих запросов.

  4. Сокращение времени недоступности зеркала при выполнении перемещения сегмента.

Среди основных фичей:

  1. Шринк таблиц на базе INSERT SELECT, а не CREATE TABLE AS SELECT (перенести строки с удаляемых сегментов в общем случае занимает меньше времени, чем пересоздать таблицу заново).

  2. Поддержка возможности задать списки добавляемых или удаляемых хостов, в том числе задать новый список хостов, на котором необходимо осуществить ребаланс кластера (опции, принимающие список хостов: --target-hosts, --add-hosts, --remove-hosts и ряд других).

  3. Возможность задать целевое число сегментов, на которых нужно осуществить ребаланс кластера (сценарии шринка или экпанда кластера, опция --target-segment-count).

  4. Возможность определить временные интервалы выполнения операций ребаланса (по аналогии с параметрам --end и --duration в утилите gpexpand).

  5. Возможность посмотреть план ребаланса (dry run) без физического переноса сегментов (опция --show-plan).

  6. Возможность в некоторых сценариях осуществить откат перемещений (опция --rollback).

  7. Поддержка возможности осуществить ребаланс кластера в соответствии с выбранной стратегией зеркалирования (grouped или spread).

Покажем работу ggrebalance на примере нескольких сценариев (в "боевой" версии вывод на консоль и какие-то другие нюансы могут поменяться).

Допустим, наш кластер представляет собой такой набор сегментов, распределенных на следующем множестве хостов:

postgres=# SELECT hostname, COUNT(content), role
FROM gp_segment_configuration GROUP BY hostname, role ORDER BY hostname;
      hostname      | count | role
--------------------+-------+------
 mdw                |     1 | p
 sdw1               |     5 | m
 sdw1               |     2 | p
 sdw2               |     2 | m
 sdw2               |     4 | p
 sdw3               |     3 | p
 sdw3               |     2 | m
(7 rows)

Как мы видим, сейчас кластер не сбалансирован:

Несбалансированный кластер
Несбалансированный кластер

Поручим ggrebalance задачу приведения кластера в сбалансированное состояние без изменения множества хостов в соответствии со стратегией grouped.

Сначала запросим план перемещений, запустив ggrebalance с опцией --show-plan (для стратегии зеркалирования --mirror-mode выберем grouped):

$ ggrebalance --show-plan --mirror-mode grouped
20251127:13:19:56:3990504 ggrebalance:mdw:gpadmin-[INFO]:-Init gparray from catalog
20251127:13:19:56:3990504 ggrebalance:mdw:gpadmin-[INFO]:-Validation of rebalance possibility
20251127:13:19:56:3990504 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance moves. Can take up to 60s.
20251127:13:19:56:3990504 ggrebalance:mdw:gpadmin-[INFO]:-Running randomized plan improvement with seed:155941400336372386876681048343334492835
20251127:13:19:56:3990504 ggrebalance:mdw:gpadmin-[INFO]:-Final plan:

---------------------------------BALANCE MOVES----------------------------------
Total moves planned: 4

Move #1:
Move Segment(content=1, dbid=5, role=m)
      From: sdw3:50310 → /data1/mirror/gpseg1
      To:   sdw2:10223 → /data1/mirror/gpseg1

Move #2:
Move Segment(content=3, dbid=7, role=m)
      From: sdw1:50130 → /data1/mirror/gpseg3
      To:   sdw3:10367 → /data1/mirror/gpseg3

Move #3:
Move Segment(content=4, dbid=8, role=m)
      From: sdw1:50140 → /data1/mirror/gpseg4
      To:   sdw3:10369 → /data1/mirror/gpseg4

Move #4:
Move Segment(content=5, dbid=14, role=p)
      From: sdw2:10350 → /data1/primary/gpseg5
      To:   sdw1:50141 → /data1/primary/gpseg5

Как мы видим, ggrebalance планирует распределить сегменты следующим образом:

План балансировки кластера
План балансировки кластера

Перераспределим сегменты в соответствии с планом:

$ ggrebalance --mirror-mode grouped
20251117:16:39:40:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Init gparray from catalog
20251117:16:39:40:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Validation of rebalance possibility
20251117:16:39:40:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance moves. Can take up to 60s.
20251117:16:39:40:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Running randomized plan improvement with seed:83285156933668510164345869375561513208
20251117:16:39:41:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance done
20251117:16:39:41:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Created rebalance schema ggrebalance
20251117:16:39:41:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Executor started
20251117:16:39:44:2261495 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 8, content = 4) sdw1|50140|/data1/mirror/gpseg4 sdw3|10369|/data1/mirror/gpseg4
20251117:16:39:44:2261495 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 5, content = 1) sdw3|50310|/data1/mirror/gpseg1 sdw2|10223|/data1/mirror/gpseg1
20251117:16:39:44:2261495 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 7, content = 3) sdw1|50130|/data1/mirror/gpseg3 sdw3|10367|/data1/mirror/gpseg3
20251117:16:40:54:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 7): /data1/mirror/gpseg3
20251117:16:40:54:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 8): /data1/mirror/gpseg4
20251117:16:40:55:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 5): /data1/mirror/gpseg1
20251117:16:40:59:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Executing role swaps for 1 segments
20251117:16:41:33:2261495 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 14, content = 5) sdw2|10350|/data1/primary/gpseg5 sdw1|50141|/data1/primary/gpseg5
20251117:16:42:44:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 14): /data1/primary/gpseg5
20251117:16:42:48:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Executing role swaps for 1 segments
20251117:16:43:07:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Executor done
20251117:16:43:08:2261495 ggrebalance:mdw:gpadmin-[INFO]:-Rebalance done

В итоге получаем сбаланисированный кластер:

postgres=# SELECT hostname, COUNT(content), role
FROM gp_segment_configuration GROUP BY hostname, role ORDER BY hostname;
      hostname      | count | role
--------------------+-------+------
 mdw                |     1 | p
 sdw1               |     3 | m
 sdw1               |     3 | p
 sdw2               |     3 | m
 sdw2               |     3 | p
 sdw3               |     3 | p
 sdw3               |     3 | m
(7 rows)

Усложним задачу — запросим шринк кластера до 6 сегментов, но так как шринк (т.е. удаление последних 3 сегментов) в нашем случае приводит к несбалансированному кластеру, то планировщиком ggrebalance будет учтен этап ребаланса. Заодно запросим вывести хосты sdw1 и sdw3 из кластера и заменить их на sdw4 и sdw5.

Для начала запросим план:

$ ggrebalance --mirror-mode grouped \
    --show-plan \
    --target-segment-count 6 \
    --add-hosts="sdw4,sdw5" \
    --remove-hosts="sdw1,sdw3" \
    --target-datadirs="/data1/primary/gpseg{content}","/data1/mirror/gpseg{content}"

20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Init gparray from catalog
20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Planning shrink
20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Validation of rebalance possibility
20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance moves. Can take up to 60s.
20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Running randomized plan improvement with seed:182202519713633465208461660014454186954
20251127:12:16:04:3981145 ggrebalance:mdw:gpadmin-[INFO]:-Final plan:
================================================================================
                                  SHRINK PLAN
================================================================================

Target Segment Count: 6

-------------------------------SEGMENTS TO REMOVE-------------------------------
Total segments to shrink: 3

  [1] Segment Pair:
      Primary:
        Content:  6
        DbId:     17
        Host:     sdw3
        Datadir:  /data1/primary/gpseg6
        Port:     10360
      Mirror:
        Content:  6
        DbId:     12
        Host:     sdw1
        Datadir:  /data1/mirror/gpseg6
        Port:     50160

  [2] Segment Pair:
      Primary:
        Content:  7
        DbId:     18
        Host:     sdw3
        Datadir:  /data1/primary/gpseg7
        Port:     10370
      Mirror:
        Content:  7
        DbId:     15
        Host:     sdw1
        Datadir:  /data1/mirror/gpseg7
        Port:     50170

  [3] Segment Pair:
      Primary:
        Content:  8
        DbId:     19
        Host:     sdw3
        Datadir:  /data1/primary/gpseg8
        Port:     10380
      Mirror:
        Content:  8
        DbId:     16
        Host:     sdw1
        Datadir:  /data1/mirror/gpseg8
        Port:     50180

---------------------------------BALANCE MOVES----------------------------------
Total moves planned: 9

Move #1:
Move Segment(content=1, dbid=5, role=m)
      From: sdw3:50310 → /data1/mirror/gpseg1
      To:   sdw5:10382 → /data1/primary/gpseg1

Move #2:
Move Segment(content=2, dbid=6, role=m)
      From: sdw1:50320 → /data1/mirror/gpseg2
      To:   sdw4:10385 → /data1/mirror/gpseg2

Move #3:
Move Segment(content=3, dbid=7, role=m)
      From: sdw3:50130 → /data1/mirror/gpseg3
      To:   sdw4:10387 → /data1/mirror/gpseg3

Move #4:
Move Segment(content=4, dbid=8, role=m)
      From: sdw1:50140 → /data1/mirror/gpseg4
      To:   sdw4:10388 → /data1/primary/gpseg4

Move #5:
Move Segment(content=5, dbid=11, role=m)
      From: sdw2:50250 → /data1/mirror/gpseg5
      To:   sdw4:10390 → /data1/primary/gpseg5

Move #6:
Move Segment(content=0, dbid=2, role=p)
      From: sdw1:10100 → /data1/primary/gpseg0
      To:   sdw5:10381 → /data1/mirror/gpseg0

Move #7:
Move Segment(content=1, dbid=3, role=p)
      From: sdw1:10110 → /data1/primary/gpseg1
      To:   sdw2:10223 → /data1/mirror/gpseg1

Move #8:
Move Segment(content=4, dbid=13, role=p)
      From: sdw2:10340 → /data1/primary/gpseg4
      To:   sdw5:10389 → /data1/primary/gpseg4

Move #9:
Move Segment(content=5, dbid=14, role=p)
      From: sdw2:10350 → /data1/primary/gpseg5
      To:   sdw5:10391 → /data1/mirror/gpseg5

В этом сценарии ggrebalance планирует перенос сегментов так:

Перенос сегментов в более сложном сценарии
Перенос сегментов в более сложном сценарии

Запрашиваем ребаланс, совмещенный с сокращением числа сегментов и переносом части сегментов на другие хосты:

$ ggrebalance --mirror-mode grouped \
    --target-segment-count 6 \
    --add-hosts="sdw4,sdw5" \
    --remove-hosts="sdw1,sdw3" \
    --target-datadirs="/data1/primary/gpseg{content}","/data1/mirror/gpseg{content}"

20251118:14:19:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Init gparray from catalog
20251118:14:19:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Planning shrink
20251118:14:19:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Validation of rebalance possibility
20251118:14:19:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance moves. Can take up to 60s.
20251118:14:19:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Running randomized plan improvement with seed:40453766366548528338949156091668265999
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Planning rebalance done
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Created rebalance schema ggrebalance
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Executor started
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Updated target segment count to 6
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Initiated ggrebalance.table_shrink_status_detail
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Start tables rebalance
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Tables to process 1
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-0.00% of jobs completed
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Tables rebalance complete
20251118:14:19:18:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Start catalog shrink
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Catalog shrink complete
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Stopping shrinked segments...
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-0.00% of jobs completed
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Shrinked segments were stopped
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Shrink is complete
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 5, content = 1) sdw3|50310|/data1/mirror/gpseg1 sdw5|10382|/data1/primary/gpseg1
20251118:14:19:19:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 8, content = 4) sdw1|50140|/data1/mirror/gpseg4 sdw4|10388|/data1/primary/gpseg4
20251118:14:19:20:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 7, content = 3) sdw1|50130|/data1/mirror/gpseg3 sdw4|10387|/data1/mirror/gpseg3
20251118:14:19:20:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 6, content = 2) sdw3|50320|/data1/mirror/gpseg2 sdw4|10385|/data1/mirror/gpseg2
20251118:14:21:06:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 5): /data1/mirror/gpseg1
20251118:14:21:06:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 8): /data1/mirror/gpseg4
20251118:14:21:06:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 6): /data1/mirror/gpseg2
20251118:14:21:06:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 7): /data1/mirror/gpseg3
20251118:14:21:07:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 11, content = 5) sdw2|50250|/data1/mirror/gpseg5 sdw4|10390|/data1/primary/gpseg5
20251118:14:22:11:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 11): /data1/mirror/gpseg5
20251118:14:22:16:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Executing role swaps for 4 segments
20251118:14:22:16:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 3, content = 1) sdw1|10110|/data1/primary/gpseg1 sdw2|10223|/data1/sdw2/mirror/gpseg1
20251118:14:22:16:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 13, content = 4) sdw2|10340|/data1/primary/gpseg4 sdw5|10389|/data1/mirror/gpseg4
20251118:14:22:16:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 2, content = 0) sdw1|10100|/data1/primary/gpseg0 sdw5|10381|/data1/primary/gpseg0
20251118:14:22:17:2750825 ggrebalance:mdw:gpadmin-[INFO]:-About to run gprecoverseg for mirror move (dbid = 14, content = 5) sdw2|10350|/data1/primary/gpseg5 sdw5|10391|/data1/mirror/gpseg5
20251118:14:23:53:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 2): /data1/primary/gpseg0
20251118:14:23:53:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 13): /data1/primary/gpseg4
20251118:14:23:53:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 14): /data1/primary/gpseg5
20251118:14:23:53:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Removing old segment's datadir (dbid = 3): /data1/primary/gpseg1
20251118:14:23:57:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Executing role swaps for 1 segments
20251118:14:24:20:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Executor done
20251118:14:24:20:2750825 ggrebalance:mdw:gpadmin-[INFO]:-Rebalance done
$ psql postgres
psql (12.12)
Type "help" for help.

postgres=# SELECT hostname, COUNT(content), role
FROM gp_segment_configuration GROUP BY hostname, role ORDER BY hostname;
      hostname      | count | role
--------------------+-------+------
 mdw                |     1 | p
 sdw2               |     2 | p
 sdw2               |     2 | m
 sdw4               |     2 | m
 sdw4               |     2 | p
 sdw5               |     2 | p
 sdw5               |     2 | m
(7 rows)

Как мы видим, ggrebalance успешно справился и с этой задачей. Кластер перешел в сбалансированное состояние, учитывая частичное изменение множества хостов, на которых потребовалось осуществить ребаланс.

Перераспределение данных при вставке в foreign-таблицы

Следующую фичу, вошедшую в релиз Greengage 6.29.1, мы сначала рассмотрим отдельно, но в следующем разделе покажем ее применимость в контексте интеграции с Apache Iceberg.

Некоторым пользователям foreign-таблиц требуется возможность задать распределение внешней таблицы по аналогии с конструкцией DISTRIBUTED BY в external-таблицах с типом WRITABLE EXTERNAL TABLE. Такая необходимость возникает с желанием либо осуществлять вставку напрямую с сегментов, при совпадении типа ключа(ей) распределения с локальным распределением (т.е. без узла Redistribute Motion), либо для решения задачи группировки строк, которая бы позволила осуществлять вставку во внешний источник некоторого, непересекающегося между сегментами, подмножества строк (согласно значению Hash Key в Redistribute Motion).

Второй вариант позволяет получать экономию сканирования файлов внешнего приемника, например при записи Parquet-файлов, такими движками как Apache Impala при отбросе файлов для сканов на основе статистик (min/max, словари).

В качестве решения нашей командой была реализована возможность задать настройки ключей распределения на уровне объявления колонок FOREIGN TABLE:

CREATE FOREIGN TABLE [ IF NOT EXISTS ] <table_name> ( [
    <column_name> <data_type> [ OPTIONS ( insert_dist_by_key { 'true' | 'false' }, [ insert_dist_by_key_weight '<weight>' ], [, ... ] ) ] [ COLLATE <collation> ] [ <column_constraint> [ ... ] ]
      [, ... ]
] )
SERVER <server_name>
[ OPTIONS ( [ mpp_execute { 'master' | 'any' | 'all segments' } [, ] ] <option> '<value>' [, ... ] ) ]

где параметры:

  • insert_dist_by_key — включает/выключает учет данной колонки в выборе ключа распределения;

  • insert_dist_by_key_weight — задает "вес" данной колонки при формировании списка из нескольких ключей (задает порядок ключей).

Рассмотрим несколько примеров вставки данных, при заданном ключе(ах) в случае необходимости перераспределения данных:

  • локальная таблица local имеет распределение по колонке local.col2;

  • foreign-таблица имеет распределение по колонке ft.col1.

Перераспределение по хэшу
Перераспределение по хэшу

План запроса в этом случае будет выглядеть так:

                      QUERY PLAN
------------------------------------------------------
 Insert on public.ft
   ->  Redistribute Motion 3:3  (slice1; segments: 3)
         Output: local.col1, local.col2, local.col3
         Hash Key: ft.col1 
         ->  Seq Scan on public.local
               Output: local.col1, local.col2, local.col3
 Optimizer: Postgres query optimizer
(7 rows)

Если мы обратим внимание на строку 6, то увидим, что для перераспределения строк между сегментами Redistribute Motion применяется промежуточная группировка по полю ft.col1.

При совпадении ключей (распределение внешней таблицы меняется на ft.c2):

postgres=# ALTER FOREIGN TABLE ft ALTER COLUMN col1 OPTIONS (SET insert_dist_by_key 'false');
ALTER FOREIGN TABLE

postgres=# ALTER FOREIGN TABLE ft ALTER COLUMN col2 OPTIONS (SET insert_dist_by_key 'true');
ALTER FOREIGN TABLE
"Прямая" вставка без перераспределения
"Прямая" вставка без перераспределения

Для данной конфигурации колонок распределения план запроса становится таким:

             QUERY PLAN
-------------------------------------
 Insert on public.ft
   ->  Seq Scan on public.local
         Output: local.c1, local.c2, local.c3
 Optimizer: Postgres query optimizer
(4 rows)

В случае отсутствия новых настроек планы строятся прежним образом:

postgres=# ALTER FOREIGN TABLE ft ALTER COLUMN col2 OPTIONS (DROP insert_dist_by_key);
ALTER FOREIGN TABLE
Исходный вариант с random-перераспределением
Исходный вариант с random-перераспределением
postgres=# EXPLAIN (COSTS off, VERBOSE on) INSERT INTO ft SELECT * FROM t;
                      QUERY PLAN
------------------------------------------------------
 Insert on public.ft
   ->  Redistribute Motion 3:3  (slice1; segments: 3)
         Output: local.col1, local.col2, local.col3
         ->  Seq Scan on public.local
               Output: local.col1, local.col2, local.col3
 Optimizer: Postgres query optimizer
(6 rows)

Особенности реализации:

  • Требуемая политика распределения запроса строится "на лету" — планировщик учитывает заданный набор колонок распределения при построении плана.

  • Если выбранные пользователем колонки не позволяют осуществить распределение согласно ограничениям типов для колонок, которые можно использовать для DISTRIBUTED BY обычных таблиц, то планировщик сообщает пользователю об этом в виде WARNING и далее использует политику по умолчанию, возвращаемую функцией createRandomPartitionedPolicy.

  • Если колонок распределения нет, то используется createRandomPartitionedPolicy.

  • Если используется отличное от mpp_execute 'all segments' значение, то план в случае необходимости сбора данных с сегментов остается таким же, как если бы опции колонок распределения не были заданы (т.е. в общем случае используется Gather Motion и заданные опции распределения не учитываются).

В следующем разделе мы увидим, как эти опции влияют на операции вставки во внешние источники на примере Iceberg-коннектора.

Интеграция с Apache Iceberg

В данной статье я напомню базовые понятия формата Iceberg-таблиц, которые будут необходимы для понимания особенностей реализации коннектора. Если точнее, то со связанными с такими таблицами файлами метаданных и подходами к работе с этим метаданными в соответствии с Apache Iceberg спецификацией.

Для коннектора Greengage-Iceberg для нас важны следующие аспекты и ключевые особенности Apache Iceberg:

  • Частью метаданных таблиц (пространства имен, маппинг таблиц на их метаданные в виде файлов) управляет сервис, который называется каталог (catalog). Есть различные реализации каталогов: Hive Metastore (HiveCatalog), RESTCatalog, JdbcCatalog, HadoopTables, HadoopCatalog и ряд других вплоть до кастомных, реализующих соответствующие интерфейсы. Также в области ответственности каталога находятся DDL-операции, но в случае read-write коннекторов, которые работают с существующими таблицами, сценарии создания/удаления таблиц, как и любые другие сервисные операции находятся вне нашей области интересов.

  • Данные таблиц могут храниться в файлах с типами Parquet, Avro и ORC.

  • Среди базовых понятий формата:

    • Снимок таблицы (snapshot) — состояние таблицы на определенный момент времени. Если в случае PostgreSQL/Greengage понятие снимка прежде всего связано с идеей видимости транзакциями строк таблицы и тесно связано с идентификаторами этих самых транзакций (и в большей степени является runtime-характеристикой), то снимок Iceberg-таблицы определяет, какие файлы с данными будут использоваться при запросе для требуемого снимка (самый свежий или историчный). Что может быть крайне важно для написания коннекторов, особенно в распределенной среде — каждая запись в Iceberg-таблицу создает новый снимок, который сохраняется в новом файле метаданных таблицы (вместе с предыдущими снимками). Каталог ссылается на самый свежий файл с метаданными (metadata file), который в свою очередь хранит лог (список) снимков.

    • Список файлов мантифестов (manifest list) представляет собой набор манифест-файлов, который составляет конкретный снимок таблицы и связанные с этими файлами метаданные, в том числе определения (спецификации) партиций (partition spec).

    • Файл манифеста (manifest file) в свою очередь уже содержит информацию о конкретных файлах, относящихся к снимку. Помимо пути к файлу с данными, в нем также хранится метаинформация для оптимизации запросов получения данных (статистики, данные для механизма трансформаций партицирования, об этом далее).

  • Отличительная особенность Iceberg-партицирования от, например, партицирования Hive заключается в реализации схем (спецификаций) партицирования на уровне метаданных в соответствии с так называемым hidden-партицированием, когда управление партициями осуществляется на логическом уровне (метаданных таблицы), без привязки к физическому хранению (структуре каталогов, как это, например, делает Apache Hive или Apache Impala).

    • Спецификация партиции описывает каким образом осуществлять разбиение таблиц на партиции. Основные элементы этого описания — колонка (column) и трансформация (transform). Если колонка привычное для мира PostgreSQL/Greengage понятие, то понятие трансформации требует пояснения. Трансформация описывает способ извлечения значения партиции (partition value) — значения, которое нас приведет к конкретному набору файлов, составляющих партицию из значения колонки (source value). Например, трансформация month извлекает порядковый номер месяца из значения колонки event_ts:

{
  "spec-id": 1,
  "fields": [
    {
      "source-id": 1,
      "field-id": 1000,
      "name": "event_ts_month",
      "transform": "month"
    }
  ]
}
  • Данный формат поддерживает удаление записей (в том числе в рамках UPDATE-выражений), но так как исходно Parquet и другие подобные форматы не предполагают возможности их изменений после записи, то эти операции реализуются либо через отдельные файлы удаления (начиная с версии 2, подход merge-on-read), либо через copy-on-write, что приводит к перезаписи файлов с учетом изменений и может создавать проблемы при частых обновлениях. В первом случае коннектору, работающему на уровне физических файлов, так или иначе нужно уметь работать с delete-файлами.

У этого формата есть ряд других интересных возможностей — исторические запросы (time travel), ветки и теги (branch, tag) — все это на основе снимков (в случае веток и тегов — именованных снимков со своим временем жизни). Эти фичи интересны пользователями, и коннектор должен предоставлять средства работы с такими сценариями.

По мере продвижения в разработке соответствующего коннектора мы планируем рассказывать о текущем прогрессе, но некоторые аспекты можно раскрыть и сейчас.

Во-первых, нужно подсветить, какие возможности предлагает в распоряжение разработчиков Java-реализация Iceberg-спецификации. Во-вторых, раскрыть некоторые аспекты возможной реализации коннектора в рамках PXF. В-третьих, проиллюстрировать обещанную связку фичи распределения foreign-таблиц с Iceberg-коннектором.

SELECT from Iceberg

Одним из ключевых элементов Java-реализации с точки зрения организации выборки данных (SELECT-запросы) является класс TableScan. Данный класс позволяет делегировать Iceberg "планирование" выполнения запроса в части пред-фильтрации строк, выборки отдельных колонок, проброса параметров партицирования.

Таким образом, зная параметры фильтрации, имея под рукой список интересующих нас колонок, можно сделать, например, так:

TableScan scan = table.newScan();

TableScan filteredScan = scan
        .filter(Expressions.lessThan("event_id", 1000))
        .select("event_id", "event_ts", "subsystem_id", "event_details");

scan.select();

CloseableIterable<FileScanTask> filesToScan = scan.planFiles();

По сути, в последней строке в нашем распоряжении окажется список путей к файлам (в нашем случае на HDFS), которые нужно прочитать, чтобы вернуть пользователю интересующие его данные.

Если посмотреть на содержимое filesToScan в условиях хранения в Parquet-файлах в HDFS, то результат будет такой:

hdfs://hdfs-node1:20500/test-warehouse/ggdb.db/events_ice/data/subsystem_id=101/event_ts_day=2026-01-20/9f477aec1d8dd71c-fed0ba8600000000_1498292252_data.0.parq

hdfs://hdfs-node1:20500/test-warehouse/ggdb.db/events_ice/data/subsystem_id=110/event_ts_day=2025-12-20/b641d716435f6be4-7fd42c1400000000_1600164749_data.0.parq

hdfs://hdfs-node1:20500/test-warehouse/ggdb.db/events_ice/data/event_ts_month=2025-09/subsystem_id=110/bd4e94ee19f89fe5-
ddfbdc8700000000_782720467_data.0.parq

hdfs://hdfs-node1:20500/test-warehouse/ggdb.db/events_ice/data/event_ts_month=2025-10/subsystem_id=101/d640261450465339-6fc6d78b00000000_1549314526_data.0.parq

hdfs://hdfs-node1:20500/test-warehouse/ggdb.db/events_ice/data/event_ts_month=2025-11/subsystem_id=100/a1485ebd281a0665-b003f13700000000_458826306_data.0.parq

Разумеется, в данном случае о полной фильтрации записей согласно заданному фильтру речь не идет (если только это не колонка, входящая в спецификацию партицирования). Конкретный физический файл может включать и другие записи, например, попадающей по min-max или словарной фильтрации исходя из статистик и других метаданных таблицы. SQL-движок в любом случае должен произвести дополнительную фильтрацию полученных кортежей, но экономия на подобном отбросе "лишних" для исполняемого запроса файлов может являться ощутимой.

Один из главных вопросов, возникающих после планирования на стороне Iceberg — каким образом распределить обработку этих файлов между обработчиками? PXF предлагает лишь интерфейс раздачи "заданий" (так называемых фрагментов) между обработчиками (интерфейс Fragmenter), но фрагментами могут быть как файлы целиком, так и части (split) одного файла.

Выбор конкретного варианта разбиения может зависеть от результата планирования — количества файлов, которые нужно прочитать, их размера, числа обработчиков и т.д. Возможно, в каких-то случаях потребуется использовать комбинацию вариантов разбиения.

INSERT into Iceberg

Вторым интересным аспектом является способ организации записи, где важным элементом является упомянутая выше спецификация партицирования. Опять-таки в рамках данной статьи рассмотрим только интересующие нас вопросы записи строк в партицированные Iceberg-таблицы, которую должен поддерживать двунаправленный коннектор.

Представим, что нужная нам таблица, в которую нужно вставить новые записи, имеет следующую схему (в синтаксисе Apache Impala):

[hdfs-node1:21050] ggdb> CREATE TABLE ggdb.events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (MONTH(event_ts), IDENTITY(subsystem_id))
STORED AS ICEBERG;

Как мы видим, в строке 2 объявлена следующая схема партицирования: сначала идет трансформация MONTH, которая, как нетрудно догадаться, извлекает порядковый номер месяца из поля event_ts. Следом идет партицирование по некоторому целочисленному идентификатору (в данном случае, не по "хешу от", а по значению).

Если посмотреть на структуру каталогов, то мы увидим следующее дерево для некоторого множества строк в таблице, распределенных по двум уровням:

 |---events_ice
 |-----data
 |-------event_ts_month=2025-09
 |---------subsystem_id=110
 |-----------bd4e94ee19f89fe5-ddfbdc8700000000_782720467_data.0.parq
 |-------event_ts_month=2025-10
 |---------subsystem_id=101
 |-----------d640261450465339-6fc6d78b00000000_1549314526_data.0.parq
 |-------event_ts_month=2025-11
 |---------subsystem_id=100
 |-----------a1485ebd281a0665-b003f13700000000_458826306_data.0.parq

Схему партицирования можно изменить. Например, мы поняли, что с ростом числа событий гранулярность с точностью до месяца нас перестала устраивать и мы решили уточнить ее до календарного дня. Обновим параметры партицирования и рассмотрим, что же произошло с данными:

[hdfs-node1:21050] ggdb> ALTER TABLE ggdb.events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id));
Query: ALTER TABLE events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id))
+-------------------------+
| summary                 |
+-------------------------+
| Updated partition spec. |
+-------------------------+

 |---events_ice
 |-----data
 |-------event_ts_month=2025-09
 |---------subsystem_id=110
 |-----------bd4e94ee19f89fe5-ddfbdc8700000000_782720467_data.0.parq
 |-------event_ts_month=2025-10
 |---------subsystem_id=101
 |-----------d640261450465339-6fc6d78b00000000_1549314526_data.0.parq
 |-------event_ts_month=2025-11
 |---------subsystem_id=100
 |-----------a1485ebd281a0665-b003f13700000000_458826306_data.0.parq
 |-------subsystem_id=101
 |---------event_ts_day=2026-01-20
 |-----------9f477aec1d8dd71c-fed0ba8600000000_1498292252_data.0.parq
 |-------subsystem_id=110
 |---------event_ts_day=2025-12-20
 |-----------b641d716435f6be4-7fd42c1400000000_1600164749_data.0.parq

Начиная с даты 2026-01-20 схема каталогов была перестроена, и на первый уровень партицирования Iceberg вынес поле subsystem_id, а из поля event_ts начал извлекаться календарный день.

Из этого следует первый очевидный вывод: с точки зрения записи, вопросы организации структуры физического хранения проще делегировать Iceberg-реализации (хотя всегда остается вариант записи напрямую в Parquet-файлы с последующим append этих файлов, но при этом много чего придется "изобрести" заново).

Какие средства нам предлагает базовая библиотека?

Библиотека реализует 4 стратегии записи, представленные следующими классами:

  • FanoutWriter;

  • PartitionedFanoutWriter;

  • ClusteredDataWriter;

  • PartitionedWriter.

Стратегия FanoutWriter
Стратегия FanoutWriter

Данная стратегия записи подразумевает наличие нескольких классов-писателей для каждой спецификации партицирования (PartitionSpec). Поддерживается сценарии записи в таблицы с несколькими спецификациями. Например, в случае если пользователь со временем определил несколько схем, то это называется эволюцией спецификации партицирования.

Для каждой комбинации спецификаций при выполнении записи создается свой экземпляр класса. Запись распараллеливается.

Как можно предположить, в случае разнородных данных (строки попадают в разные партиции) данная стратегия характеризуется большими накладными расходами, так как для каждого экземпляра класса требуется своя часть ресурсов (например, память под буферы и т.п.).

С другой стороны, для данной стратегии отпадает необходимость предагрегации данных, строки на вход можно подать любые — FanoutWriter самостоятельно "раскидает" их по нужным партициям.

Стратегия PartitionedFanoutWriter
Стратегия PartitionedFanoutWriter

Эта стратегия во многом пересекается с FanoutWriter-стратегией, но в отличие от последней не поддерживает схему с эволюцией партиций. Поддерживается только некоторая фиксированная спецификация с перенаправлением в экземпляр класса-писателя на базе некоторого ключа, представленного экземпляром класса PartitionKey.

Стратегия ClusteredDataWriter
Стратегия ClusteredDataWriter

ClusteredWriter предполагает, что строки сгруппированы согласно спецификации партицирования, однако поддерживает множественные спецификации. В противном случае (если строки не сгруппированы согласно спецификации) вставка завершается ошибкой.

Данная стратегия подходит для сгруппированных для вставки данных, при этом для множественных спецификаций вставка записей "вразнобой" не поддерживается — то есть строки должны быть сгруппированы по ключу(ам) в рамках каждой спецификации и идти последовательно. Характеризуется экономным потреблением ресурсов и высокой пропускной способностью.

Стратегия PartitionedWriter
Стратегия PartitionedWriter

Предусловием использования стратегии PartitionedWriter является сгруппированные согласно одной спецификации партицирования данные, что и отличает ее от ClusteredDataWriter.

От стратегий реализации записи согласно спецификациям партицирования вернемся к структуре каталогов и, точнее, к параметрам трансформаций.

Второй вывод базируется на том, что на данный момент в Greengage нет возможности смоделировать трансформации, которые пользователи могут задействовать в Iceberg-таблицах.

Вывод такой: в общем случае, с точки зрения подготовки данных для загрузки, Greengage может частично облегчить жизнь классам, отвечающим за запись в Iceberg-таблицы, но полностью подготовить данные, которые бы "легли" в нужную партицию без каких-либо ухищрений (создание колонок, моделирующих трансформации) пока выглядит нереализуемым.

И вот тут нам в помощь выступает та самая фича распределения вставки foreign-таблиц. Redistribute Motion может в случае трансформаций наподобие IDENTITY сгруппировать по нужному ключу (по сути, по значению в колонке), тем самым помочь, например, классу PartitionedFanoutWriter тем, что ему на вход придут уже сгруппированные данные. На выходе есть возможность получить ощутимо меньшее число файлов.

Проиллюстрируем это на примере, но перед этим создадим таблицу только с IDENTITY-трансформацией:

[hdfs-node1:21050] ggdb> CREATE TABLE events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (IDENTITY(subsystem_id))
STORED AS ICEBERG;

Сначала объявим таблицы на стороне Greengage без учета спецификации партицирования на стороне Iceberg:

postgres=# CREATE FOREIGN TABLE events_ice_ft(event_id INT,  event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT)
SERVER iceberg_server OPTIONS (
    catalog_impl 'org.apache.iceberg.hive.HiveCatalog',
    catalog_uri 'thrift://hdfs-node1:9083',
    warehouse_location '/test-warehouse',
    resource 'ggdb.events_ice'
);

postgres=# CREATE TABLE events_ice_local (event_id INT,  event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT) DISTRIBUTED BY (event_id);
CREATE TABLE
postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
             QUERY PLAN
------------------------------------
 Insert on events_ice_ft 
   ->  Seq Scan on events_ice_local 
 Optimizer: Postgres-based planner
(3 rows)

Если посмотреть на план (строки 14 и 15), то можно увидеть вставку с каждого сегмента напрямую, но вставка не будет учитывать ожидаемое распределение на стороне Iceberg.

Сделаем вставку 1 миллиона строк из 8 сегментов, где домен атрибута subsystem_id состоит из 4 уникальных значений. Какая картина будет на HDFS после вставки?

 |---events_ice
 |-----data
 |-------subsystem_id=1
 |---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00001.parquet
 |---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00001.parquet
 |---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00001.parquet
 |---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00001.parquet
 |---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00001.parquet
 |---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00001.parquet
 |---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00001.parquet
 |---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00001.parquet
 |-------subsystem_id=2
 |---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00002.parquet
 |---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00002.parquet
 |---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00002.parquet
 |---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00002.parquet
 |---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00002.parquet
 |---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00002.parquet
 |---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00002.parquet
 |---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00002.parquet
 |-------subsystem_id=3
 |---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00003.parquet
 |---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00003.parquet
 |---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00003.parquet
 |---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00003.parquet
 |---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00003.parquet
 |---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00003.parquet
 |---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00003.parquet
 |---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00003.parquet
 |-------subsystem_id=4
 |---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00004.parquet
 |---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00004.parquet
 |---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00004.parquet
 |---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00004.parquet
 |---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00004.parquet
 |---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00004.parquet
 |---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00004.parquet
 |---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00004.parquet

Для каждого Parquet-файла пятизначный префикс от 00000 до 00007 означает номер сегмента (поле content), который осуществлял вставку в файл. Можно увидеть, что для каждой партиции записью "отметились" все 8 сегментов.

Очистим таблицу-приемник, поменяем настройки перераспределения для foreign-таблицы и повторим вставку:

postgres=# ALTER FOREIGN TABLE events_ice_ft ALTER COLUMN subsystem_id OPTIONS (ADD insert_dist_by_key 'true');
ALTER FOREIGN TABLE

postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
                      QUERY PLAN
------------------------------------------------------
 Insert on events_ice_ft
   ->  Redistribute Motion 8:8  (slice1; segments: 8)
         Hash Key: events_ice_local.subsystem_id
         ->  Seq Scan on events_ice_local
 Optimizer: Postgres-based planner
(5 rows)

Как поменялась ситуация на HDFS:

 |---events_ice
 |-----data
 |-------subsystem_id=1
 |---------00004-1-58954502-1c91-457a-b7bb-2c945e8229c1-00001.parquet
 |-------subsystem_id=2
 |---------00003-1-bf3cf4e5-7507-4f3e-ac0e-52f9932ba495-00001.parquet
 |-------subsystem_id=3
 |---------00007-1-5c2ba531-77a9-4240-b639-37522eaefb37-00001.parquet
 |-------subsystem_id=4
 |---------00000-1-dd1445b5-d5d8-4d04-9248-ff70cfa490fb-00001.parquet

В случае выборки данных, при прочих равных, придется читать меньшее количество файлов, загрузка выполняется быстрее. Да, есть некоторые накладные расходы на узел перераспределения Redistribute Motion, но, похоже, выгода от предварительного агрегирования перевешивает потери на перераспределении.

В дальнейшем, по мере прогресса разработки коннектора и его нагрузочного тестирования, мы поделимся с вами результатами сравнительных тестов.

Однако открытым вопросом остаются "вычислительные" трансформации Iceberg-спецификаций партицирования. Возможным вариантом таких трансформаций является расширение списка опций foreign-таблиц функциями трансформации.

Последним, крайне важным вопросом реализации вставки я бы назвал вопрос организации запроса на commit и фактической фиксации вставки.

Как мы помним, каждая вставка порождает новый файл с данными, новый снимок, новые файлы метаданных. Например, три операции вставки дадут нам следующий набор сущностей.

Связь метаданных и данных в Parquet-файлах
Связь метаданных и данных в Parquet-файлах

Если представить, что сегментов в кластере сотни, как это бывает в больших кластерах, то число снимков во-первых, будет зависеть от числа сегментов (при условии, что каждому достались его строки для вставки). Чем больше таких вставок, тем больше будет снимков в списке в файле метаданных, это приведет к разрастанию метаданных, что в будущем может создать проблемы.

Например, для запроса из примера без использования промежуточной группировки по полю events_ice_local.subsystem_id при вставке будет создано 8 новых снимков (изначально таблица была пуста):

[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM events_ice.snapshots;
+---------------------+-------------------------------+-----------+---------------------+
| snapshot_id         | committed_at                  | operation | parent_id           |
+---------------------+-------------------------------+-----------+---------------------+
| 6936299145864004987 | 2025-11-22 16:29:58.197000000 | append    | NULL                |
| 4635881949092382796 | 2025-11-22 16:29:58.788000000 | append    | 6936299145864004987 |
| 6620167082491361265 | 2025-11-22 16:29:59.370000000 | append    | 4635881949092382796 |
| 9127144169729220665 | 2025-11-22 16:29:59.793000000 | append    | 6620167082491361265 |
| 2335157739194732622 | 2025-11-22 16:30:00.214000000 | append    | 9127144169729220665 |
| 7107683723762847370 | 2025-11-22 16:30:00.554000000 | append    | 2335157739194732622 |
| 8876670218399049190 | 2025-11-22 16:30:00.933000000 | append    | 7107683723762847370 |
| 7213472037626658727 | 2025-11-22 16:30:01.198000000 | append    | 8876670218399049190 |
+---------------------+-------------------------------+-----------+---------------------+

Во-вторых, неприятным моментом является видимость отдельных снимков по мере того, как какие-то сегменты завершили вставку, а какие-то еще в процессе. Да, сама архитектура Iceberg в плане организации работы через снимки подразумевает возможность видеть разные срезы, но в данном случае подобная частичная вставка вместе с созданием снимков под каждую загрузку может быть не совсем тем, что ожидает пользователь.

Добавим искусственную задержку для одного из сегментов при вставке. Остальные семь сегментов закончат вставку раньше. Однако в какой-то момент мы может увидеть не 1 миллион строк, а меньшее количество:

[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM ggdb.events_ice.snapshots;
+---------------------+-------------------------------+-----------+---------------------+
| snapshot_id         | committed_at                  | operation | parent_id           |
+---------------------+-------------------------------+-----------+---------------------+
| 7768755069134084319 | 2025-11-22 16:40:05.673000000 | append    | NULL                |
| 1280842592617033420 | 2025-11-22 16:40:06.563000000 | append    | 7768755069134084319 |
| 3036780713488633888 | 2025-11-22 16:40:07.693000000 | append    | 1280842592617033420 |
| 1477877215760239613 | 2025-11-22 16:40:07.944000000 | append    | 3036780713488633888 |
| 1377726043692182832 | 2025-11-22 16:40:08.324000000 | append    | 1477877215760239613 |
| 8464938146211865619 | 2025-11-22 16:40:08.596000000 | append    | 1377726043692182832 |
| 662259372451030286  | 2025-11-22 16:40:09.017000000 | append    | 8464938146211865619 |
+---------------------+-------------------------------+-----------+---------------------+
Fetched 7 row(s) in 0.15s

[hdfs-node1:21050] ggdb> SELECT COUNT(*) FROM ggdb.events_ice;
+----------+
| count(*) |
+----------+
| 874576   |
+----------+
Fetched 1 row(s) in 0.11s

Коннектор, в целях поддержания свойства атомарности вставки, должен решать и эту проблему. И не исключено, что не без помощи самого PXF…​

На этом на сегодня предлагаю завершить обзор того, что уже вышло в свежем релизе Greengage и находится в процессе активной разработки для выхода в одном из следующих релизов.

Оставайтесь с нами, будет интересно.

Комментарии (0)