Всем привет! Меня зовут Андрей, я работаю дата аналитиком в Data Team продукта Dialog.X5/Insights в X5 Tech. Мы предоставляем аналитику по продажам и покупательскому поведению на данных X5 Group. Для обработки больших объёмов данных в продукте используется СУБД (система управления базами данных) Greenplum.
Greenplum – это распределённая СУБД с массово-параллельной архитектурой (Massive Parallel Processing). Она построена на основе PostgreSQL и хорошо подходит для хранения и обработки больших объёмов данных.
В статье рассмотрим ресурсоёмкую операцию для распределённых системCOUNT(DISTINCT)
и два способа оптимизации. Для предварительного погружения в планы запросов можно прочитать вот эту хорошую статью.
Список определений, используемых в статье:
Сегмент. Сегменты в Greenplum представляют собой экземпляры PostgreSQL. Каждый сегмент представляет собой независимую базу данных PostgreSQL, где хранится часть данных. Сегмент обрабатывает локальные данные, отдавая результаты мастеру. Мастер, в свою очередь, – это сервер, где развёрнут главный экземпляр PostgreSQL, к которому подключаются клиенты, отправляя SQL-запросы. Сами данные хранятся на серверах-сегментах.
Перераспределение данных. Операция в плане запроса (Redistribute Motion), когда каждый сегмент Greenplum повторно хеширует данные и отправляет строки в разные сегменты в соответствии с хеш-ключом.
Распределение таблицы по полю/списку полей. Хранение таблицы на различных сегментах кластера.Конкретный сегмент для хранения записей выбирается на основе хешей, которые рассчитываются по указанным полям.
Структура данных
Таблица чековых фактов:
fct_receipts (
receipt_id - идентификатор чека
, receipt_dttm - дата+время чека
, calendar_dk - числовое представление даты чека например 20240101
, store_id - идентификатор магазина
, plu_id - идентификатор товара
)
Таблица распределена по полю receipt_id достаточно равномерно и партицирована по полю receipt_dttm. Объём данных исчисляется в терабайтах.
Немного о природе данных:
receipt_dttm для чека уникально;
receipt_id относится только к одному магазину;
исходя из вышеперечисленных утверждений, метрика количество чеков аддитивна по времени и по группам магазинов.
Запрос для расчёта количества чеков
Рассмотрим расчёт количества чеков для групп магазинов (это может быть сегментация по регионам, сетям и т. д) и для групп товаров (это может быть сегментация по брендам, производителям и т. д).
На вход запроса подаётся следующий список параметров:
Период (в статье везде указан август 2023 года)
-
Таблица-параметр с группами магазинов selected_stores:
store_group_id – группа магазинов
store_id – магазин
-
Таблица-параметр с группами товаров selected_plu:
plu_group_id – группа товаров
plu_id – товар
Так как таблицы-параметры небольшого размера относительно таблицы чековых фактов, для таблиц-параметров выбран тип распределенияREPLICATED
. Таблицы с распределением REPLICATED
дублируются в полном объёме на всех сегментах кластера и при соединении с нимиJOIN
происходит локально.
Запрос на расчёт количества чеков в разрезе группы магазинов/товаров выглядит следующим образом:
INSERT INTO receipts_cnt_baskets_draft
SELECT
sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id) AS cnt_baskets
FROM fct_receipts AS fcre
INNER JOIN selected_stores AS sest
USING (store_id)
INNER JOIN selected_plu AS sepl
USING (plu_id)
WHERE 1 = 1
AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
AND fcre.receipt_dttm < '2023-09-01 00:00:00'::TIMESTAMP
GROUP BY
GROUPING SETS (
(store_group_id, plu_group_id)
, (store_group_id )
)
;
Немного контекста по запросу:
DISTINCT
нужен, так как разные plu_id одной и той же группы товаров могут встречаться в одном чеке.GROUPING SETS
используется, чтобы не делать несколько обращений к чековой таблице фактов для расчёта разных группировок.
Анализ запроса
На вход запроса подадим параметры:
5 групп магазинов:
Группа магазинов |
Количество магазинов |
1 |
22287 |
2 |
1209 |
3 |
1001 |
4 |
162 |
5 |
14 |
35 групп товаров (для сокращения приведён ТОП-5)
Группа товаров |
Количество товаров |
1 |
25702 |
2 |
65 |
3 |
31 |
4 |
27 |
5 |
26 |
Давайте посмотрим на план запроса, построенный оптимизатором GPORCA:
EXPLAIN ANALYZE
INSERT INTO receipts_cnt_baskets
SELECT
sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id) AS cnt_baskets
-- 1 Часть запроса
FROM fct_receipts AS fcre
INNER JOIN selected_stores AS sest
USING (store_id)
INNER JOIN selected_plu AS sepl
USING (plu_id)
WHERE 1 = 1
AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
AND fcre.receipt_dttm < '2023-09-01 00:00:00'::TIMESTAMP
-- 2 часть запроса
GROUP BY
GROUPING SETS (
(store_group_id, plu_group_id)
, (store_group_id )
)
;
Упрощённый план запроса. Комментарии к узлам плана пронумерованы.
Читаем снизу вверх:
1 Часть плана (Получение данных)
Итого:
Данные подготовлены и лежат на каждом сегменте
по ключу распределения fct_receipts
Shared Scan (share slice:id 4:0)
3) Соединения с таблицами-параметрами (JOIN локальный)
-> Hash Join
Hash Cond: (fct_receipts.plu_id = selected_plu.plu_id)
-> Hash Join
Hash Cond: (fct_receipts.store_id = selected_stores.store_id)
2) Выборка 1 партиции согласно условию по датам
-> Partition Selector for fct_receipts
Partitions selected: 1
1) Хэширование таблиц параметров
-> Hash
-> Seq Scan on selected_stores
-> Hash
-> Seq Scan on selected_plu
2 Часть плана - расчет COUNT(DISTINCT receipt_id)
Объединение результатов
-> Append
Ключ группировки (store_group_id)
3) COUNT(receipt_id)
-> HashAggregate
Group Key: share0_ref2.store_group_id
2) DISTINCT ключ группировки + receipt_id
-> HashAggregate
Group Key: share0_ref2.store_group_id, share0_ref2.receipt_id
1) Перераспределение данных по ключу группировки
-> Redistribute Motion
Hash Key: share0_ref2.store_group_id
Считывание данных из 1 части плана
-> Shared Scan (share slice:id 1:0)
Ключ группировки (store_group_id, plu_group_id)
3) COUNT(receipt_id)
-> HashAggregate
Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
2) DISTINCT ключ группировки + receipt_id
-> HashAggregate
Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id
1) Перераспределение данных по ключу группировки
-> Redistribute Motion
Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
Считывание данных из 1 части плана
-> Shared Scan (share slice:id 2:0)
Итого, судя по плану запроса, расчёт количества чеков выполняется в три шага:
Перераспределение данных по ключу группировки.
DISTINCT
ключ группировки + receipt_id.COUNT(receipt_id).
Переданные в запрос группы товаров и группы магазинов явно не равномерны. После перераспределения данных (шаг 1) на одном или нескольких сегментах может оказаться слишком много данных, возникнет так называемый перекос. Соответственно, некоторые сегменты будут более нагружены, и выполнение запроса будет упираться в обработку данных на этих сегментах.
Чтобы посмотреть, сколько строк пришло на сегмент или сколько строк обработал сегмент, можно включить параметрSET gp_enable_explain_allstat = ON
; передEXPLAIN ANALYZE
.
Тогда в плане появится дополнительная информация под каждым узлом:
Путём нехитрого парсинга можно получить список сегментов. В элементах списка последнее значение будет равным количеству строк, обработанных сегментом.
Приведена часть списка сегментов:
Ключ группировки распределился по 58 сегментам, виден явный перекос на одном из сегментов. На 179 сегмент поступило около 269 млн строк, а на 129 сегмент поступило около 31 млн. На 179 сегмент поступило в 9 раз больше строк, а если сравнивать с другими сегментами, то разница будет ещё более ощутимой.
Вышеуказанный запрос выполняется около одной минуты на периоде 1 месяц, в зависимости от нагрузки на кластере.
Оптимизация запроса
Рассмотрим пару вариантов оптимизации такого запроса.
1 вариант. Использование параметра.
Для текущей версии нашего кластера параметр optimizer_force_multistage_agg
установлен в off
. Значение от версии к версии может изменяться. Для просмотра значения параметра можно воспользоваться командой:
SHOW optimizer_force_multistage_agg;
В документации написано, что данный параметр указывает оптимизатору GPORCA
на выбор многоступенчатого агрегатного плана для операций типа COUNT(DISTINCT)
.
Когда это значение выключено (по умолчанию), оптимизатор GPORCA
выбирает между одноэтапным и двухэтапным агрегированным планом в зависимости от стоимости SQL
-запроса.
Включаем параметр SET optimizer_force_multistage_agg = on
;
Приказываем оптимизатору выбирать двухэтапный агрегированный план.
План на примере ключа группировки (store_group_id, plu_group_id):
Ключ группировки (year_granularity, store_group_id, plu_group_id)
4) COUNT(receipt_id)
-> HashAggregate
Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
3) Перераспределение данных по ключу группировки
-> Redistribute Motion
Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
2) DISTINCT ключ группировки + receipt_id
-> HashAggregate
Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id
1) Перераспределение данных по ключу группировки + receipt_id, receipt_id
-> Redistribute Motion
Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id, share0_ref3.receipt_id
-> Shared Scan (share slice:id 3:0)
В данном плане расчёт количества чеков выполняется в четыре шага:
Перераспределение по ключу группировки + receipt_id.
Это уменьшает перекос, так как количество уникальных значений receipt_id велико.DISTINCT
по ключу группировки + receipt_id.
Это уменьшает количество данных для следующего оператора перераспределения.Перераспределение по ключу группировки.
COUNT(receipt_id)
.
В этом плане двухэтапность выражается в шагах 1, 2. Происходит дополнительное перераспределение иDISTINCT
по ключу группировки + receipt_id.
Вышеуказанный запрос отрабатывает от 3,5 до 4,5 раз быстрее, в зависимости от нагрузки на кластере.
Не стоит включать параметр на всю базу – это может изменить поведение других запросов. Локально на уровне сессии можно ускорить проблемный запрос, а далее вернуть значение параметра в исходное состояние командойRESET
.
Вывод
При использовании параметра (хинта) появляется дополнительное перераспределение с более оптимальным ключом распределения, в расчёте начинают участвовать все сегменты кластера. Это повышает нагрузку на сеть в кластере, но кратно уменьшает время исполнения запроса.
2 вариант. Алгоритмический, расширение ключа группировки.
Метрика количество чеков аддитивна по времени. Можно посчитать количество чеков по дням и потом сделать доагрегацию. Добавление в ключ группировки дня позволит увеличить количество ключей группировки до 30 раз. Если равномерности данных в ключе всё ещё не будет хватать, можно задействовать другие поля таблицы, отвечающие аддитивности метрики количества чеков.
Переписанный запрос:
INSERT INTO receipts_cnt_baskets
WITH draft AS (
SELECT
sest.store_group_id
, fcre.calendar_dk
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id) AS cnt_baskets
FROM fct_receipts AS fcre
INNER JOIN selected_stores AS sest
USING (store_id)
INNER JOIN selected_plu AS sepl
USING (plu_id)
WHERE 1 = 1
AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
AND fcre.receipt_dttm <= '2023-09-01 00:00:00'::TIMESTAMP
GROUP BY
GROUPING SETS (
(store_group_id, calendar_dk, plu_group_id)
, (store_group_id, calendar_dk )
)
)
SELECT
store_group_id
, plu_group_id
, SUM(cnt_baskets)
FROM draft
GROUP BY
store_group_id
, plu_group_id
;
Для данного запроса оптимизатор выбрал план, как и в начале статьи (на примере ключа группировки (store_group_id, calendar_dk, plu_group_id)):
3) COUNT(receipt_id)
-> HashAggregate
Group Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id
2) DISTINCT ключ группировки + receipt_id
-> HashAggregate
Group Key: share1_ref3.store_group_id,
share1_ref3.calendar_dk, share1_ref3.plu_group_id, share1_ref3.receipt_id
1) Перераспределение данных по ключу группировки
-> Redistribute Motion
Hash Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id
-> Shared Scan (share slice:id 2:1)
Существенного перекоса не возникает, так как расширенный ключ группировки задействует все сегменты кластера, и нагрузка становится более равномерной.
Данный запрос выполняется от 7 до 9 раз быстрее по сравнению с изначальным запросом без хинта, в зависимости от нагрузки на кластере.
Если предположить, что каждый оператор перераспределения перемещает 100% строк, то в данном запросе происходит меньшее по объёму перераспределение данных в сравнении с запросом с хинтом, и нагрузка на сеть уменьшается до двух раз.
Вывод
Расширяя ключ группировки за счет аддитивности метрики “количество чеков” по времени, мы уменьшаем перекос в данных в ключе группировки и задействуем все сегменты кластера для расчёта.
Итоги
При разработке важно понимать природу данных. Хороший алгоритм в большинстве случаев лучше, чем использование параметров оптимизатора.
Если ключ группировки отличается от ключа распределения таблиц, то операция группировки ведёт к перераспределению данных на кластере. В данных ключа группировки может быть перекос, и важно уметь диагностировать такие случаи.
Малое количество ключей группировки ведёт к неполному задействованию сегментов кластера. Расширяя ключ группировки, можно увеличить использование сегментов, чтобы снизить время расчёта
Мы погрузились немного в мир Greenplum, рассмотрели, как СУБД выполняет запросы. Узнали о перекосе и методах борьбы с данным явлением. Надеюсь, было полезно и интересно.
Хочу поблагодарить за консультации Даниила Недумова и за помощь в подготовке статьи – Антона Денисова.
os9
Поясните пож-та, что значит "receipt_dttm для чека уникально" - то есть для всех товаров в одном чеке время одинаково? Также по "receipt_id относится только к одному магазину" - все товары в одном чеке относятся к одному магазину?
Я про greenplum ничего не знаю) Перераспределение внутри запроса - это считается нормальная практика? Терабайты гоняются по сети каждый раз при запросе, не выглядит оптимальным.
А если другой человек другой запрос выполнит, снова перераспределение? А если это параллельно.
realnumber3012 Автор
Добрый день
"receipt_dttm для чека уникально" - то есть для всех товаров в одном чеке время одинаково? Да, одно значение даты времени для одного чека
Также по "receipt_id относится только к одному магазину" - это означает, что значения receipt_id не пересекается в рамках нескольких магазинов. По сути (receipt_id, plu_id) - PK
Перераспределение внутри запроса - это считается нормальная практика? Если перераспределение не удается избежать делается перераспределение.
Практика нормальная Greenplum это про OLAP нагрузку и количество активных пользователей обычно ограничено.