Всем привет! Меня зовут Андрей, я работаю дата аналитиком в Data Team продукта Dialog.X5/Insights в X5 Tech. Мы предоставляем аналитику по продажам и покупательскому поведению на данных X5 Group.  Для обработки больших объёмов данных в продукте используется  СУБД (система управления базами данных) Greenplum.

Greenplum – это распределённая СУБД с массово-параллельной архитектурой (Massive Parallel Processing). Она построена на основе PostgreSQL и хорошо подходит для хранения и обработки больших объёмов данных.

В статье рассмотрим ресурсоёмкую операцию для распределённых системCOUNT(DISTINCT)и два способа оптимизации. Для предварительного погружения в планы запросов можно прочитать вот эту хорошую статью.

Список определений, используемых в статье:

Сегмент. Сегменты в Greenplum представляют собой экземпляры PostgreSQL. Каждый сегмент представляет собой независимую базу данных PostgreSQL, где хранится часть данных. Сегмент обрабатывает локальные данные, отдавая результаты мастеру. Мастер, в свою очередь, – это сервер, где развёрнут главный экземпляр PostgreSQL, к которому подключаются клиенты, отправляя SQL-запросы. Сами данные хранятся на серверах-сегментах.

Перераспределение данных. Операция в плане запроса (Redistribute Motion), когда каждый сегмент Greenplum повторно хеширует данные и отправляет строки в разные сегменты в соответствии с хеш-ключом.

Распределение таблицы по полю/списку полей. Хранение таблицы на различных сегментах кластера.Конкретный сегмент для хранения записей выбирается на основе хешей, которые рассчитываются по указанным полям.

Структура данных

Таблица чековых фактов:

fct_receipts (
  receipt_id    - идентификатор чека 
, receipt_dttm  - дата+время чека
, calendar_dk   - числовое представление даты чека например 20240101
, store_id      - идентификатор магазина
, plu_id        - идентификатор товара
)

Таблица распределена по полю receipt_id достаточно равномерно и партицирована по полю receipt_dttm. Объём данных исчисляется в терабайтах.

Немного о природе данных:

  • receipt_dttm для чека уникально;

  • receipt_id относится только к одному магазину;

  • исходя из вышеперечисленных утверждений, метрика количество чеков аддитивна по времени и по группам магазинов.

Запрос для расчёта количества чеков

Рассмотрим расчёт количества чеков для групп магазинов (это может быть сегментация по регионам, сетям и т. д) и для групп товаров (это может быть сегментация по брендам, производителям и т. д).

На вход запроса подаётся следующий список параметров:

  • Период (в статье везде указан август 2023 года)

  • Таблица-параметр с группами магазинов selected_stores:

    • store_group_id – группа магазинов

    • store_id – магазин

  • Таблица-параметр с группами товаров selected_plu:

    • plu_group_id – группа товаров

    • plu_id – товар

Так как таблицы-параметры небольшого размера относительно таблицы чековых фактов, для таблиц-параметров выбран тип распределенияREPLICATED. Таблицы с распределением REPLICATED дублируются в полном объёме на всех сегментах кластера и при соединении с нимиJOIN происходит локально.

Запрос на расчёт количества чеков в разрезе группы магазинов/товаров выглядит следующим образом:

INSERT INTO receipts_cnt_baskets_draft
SELECT
  sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
FROM fct_receipts            AS fcre
  INNER JOIN selected_stores AS sest
    USING (store_id)
  INNER JOIN selected_plu    AS sepl
    USING (plu_id)
WHERE 1 = 1
  AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
  AND fcre.receipt_dttm <  '2023-09-01 00:00:00'::TIMESTAMP
GROUP BY
  GROUPING SETS (
    (store_group_id, plu_group_id)
  , (store_group_id              )
  )
;

Немного контекста по запросу:

  • DISTINCT нужен, так как разные plu_id одной и той же группы товаров могут встречаться в одном чеке.

  • GROUPING SETS используется, чтобы не делать несколько обращений к чековой таблице фактов для расчёта разных группировок.

Анализ запроса

На вход запроса подадим параметры:

5 групп магазинов:

Группа магазинов

Количество магазинов

1

22287

2

1209

3

1001

4

162

5

14

35 групп товаров (для сокращения приведён ТОП-5)

Группа товаров

Количество товаров

1

25702

2

65

3

31

4

27

5

26

Давайте посмотрим на план запроса, построенный оптимизатором GPORCA:

EXPLAIN ANALYZE
INSERT INTO receipts_cnt_baskets
SELECT
  sest.store_group_id
, COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
, COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
-- 1 Часть запроса
FROM fct_receipts            AS fcre
  INNER JOIN selected_stores AS sest
    USING (store_id)
  INNER JOIN selected_plu    AS sepl
    USING (plu_id)
WHERE 1 = 1
  AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
  AND fcre.receipt_dttm <  '2023-09-01 00:00:00'::TIMESTAMP
-- 2 часть запроса
GROUP BY
  GROUPING SETS (
    (store_group_id, plu_group_id)
  , (store_group_id              )
  )
;

Упрощённый план запроса.  Комментарии к узлам плана пронумерованы.
Читаем снизу вверх:

1 Часть плана (Получение данных)

Итого:
Данные подготовлены и лежат на каждом сегменте 
по ключу распределения fct_receipts

Shared Scan (share slice:id 4:0)
 3) Соединения с таблицами-параметрами (JOIN локальный)
->  Hash Join
    Hash Cond: (fct_receipts.plu_id = selected_plu.plu_id)

->  Hash Join
    Hash Cond: (fct_receipts.store_id = selected_stores.store_id)

    2) Выборка 1 партиции согласно условию по датам
->  Partition Selector for fct_receipts
       Partitions selected: 1

    1) Хэширование таблиц параметров
->  Hash
    ->  Seq Scan on selected_stores 
->  Hash 
    ->  Seq Scan on selected_plu
2 Часть плана - расчет COUNT(DISTINCT receipt_id)

    Объединение результатов
->  Append

Ключ группировки (store_group_id)
    3) COUNT(receipt_id)
    ->  HashAggregate
      Group Key: share0_ref2.store_group_id
      
    2)  DISTINCT ключ группировки + receipt_id
    ->  HashAggregate
        Group Key: share0_ref2.store_group_id, share0_ref2.receipt_id
          
      1) Перераспределение данных по ключу группировки
      ->  Redistribute Motion
          Hash Key: share0_ref2.store_group_id
		  Считывание данных из 1 части плана
            ->  Shared Scan (share slice:id 1:0)

  
Ключ группировки (store_group_id, plu_group_id)  
  3) COUNT(receipt_id)
  ->  HashAggregate
      Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
      
     2) DISTINCT ключ группировки + receipt_id
     ->  HashAggregate
          Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id
        
        1) Перераспределение данных по ключу группировки
        ->  Redistribute Motion
            Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
            Считывание данных из 1 части плана
            ->  Shared Scan (share slice:id 2:0) 

Итого, судя по плану запроса, расчёт количества чеков выполняется в три шага:

  1. Перераспределение данных по ключу группировки.

  2. DISTINCT ключ группировки + receipt_id.

  3. COUNT(receipt_id).

Переданные в запрос группы товаров и группы магазинов явно не равномерны. После перераспределения данных (шаг 1) на одном или нескольких сегментах может оказаться слишком много данных, возникнет так называемый перекос. Соответственно, некоторые сегменты будут более нагружены, и выполнение запроса будет упираться в обработку данных на этих сегментах.

Чтобы посмотреть, сколько строк пришло на сегмент или сколько строк обработал сегмент, можно включить параметрSET gp_enable_explain_allstat = ON; передEXPLAIN ANALYZE.

Тогда в плане появится дополнительная информация под каждым узлом:

Путём нехитрого парсинга можно получить список сегментов. В элементах списка последнее значение будет равным количеству строк, обработанных сегментом.

Приведена часть списка сегментов:

Ключ группировки распределился по 58 сегментам, виден явный перекос на одном из сегментов. На 179 сегмент поступило около 269 млн строк, а на 129 сегмент поступило около 31 млн. На 179 сегмент поступило в 9 раз больше строк, а если сравнивать с другими сегментами, то разница будет ещё более ощутимой.

Вышеуказанный запрос выполняется около одной минуты на периоде 1 месяц, в зависимости от нагрузки на кластере.

Оптимизация запроса

Рассмотрим пару вариантов оптимизации такого запроса.

1 вариант. Использование параметра.

Для текущей версии нашего кластера параметр optimizer_force_multistage_agg установлен в off. Значение от версии к версии может изменяться. Для просмотра значения параметра можно воспользоваться командой:

SHOW optimizer_force_multistage_agg;

В документации написано, что данный параметр указывает оптимизатору GPORCA на выбор многоступенчатого агрегатного плана для операций типа COUNT(DISTINCT).
Когда это значение выключено (по умолчанию), оптимизатор GPORCA выбирает между одноэтапным и двухэтапным агрегированным планом в зависимости от стоимости SQL-запроса.

Включаем параметр SET optimizer_force_multistage_agg = on;
Приказываем оптимизатору выбирать двухэтапный агрегированный план.

План на примере ключа группировки (store_group_id, plu_group_id):

Ключ группировки (year_granularity, store_group_id, plu_group_id)

4) COUNT(receipt_id)
->  HashAggregate
    Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
	
    3) Перераспределение данных по ключу группировки
    ->  Redistribute Motion 
        Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id
		
        2) DISTINCT ключ группировки + receipt_id
        ->  HashAggregate
            Group Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id
			
	      1) Перераспределение данных по ключу группировки + receipt_id, receipt_id
            ->  Redistribute Motion
		        Hash Key: share0_ref3.store_group_id, share0_ref3.plu_group_id, share0_ref3.receipt_id, share0_ref3.receipt_id
                ->  Shared Scan (share slice:id 3:0)

В данном плане расчёт количества чеков выполняется в четыре шага:

  1. Перераспределение по ключу группировки + receipt_id.
    Это уменьшает перекос, так как количество уникальных значений receipt_id велико.

  2. DISTINCT по ключу группировки + receipt_id.
    Это уменьшает количество данных для следующего оператора перераспределения.

  3. Перераспределение по ключу группировки.

  4. COUNT(receipt_id).

В этом плане двухэтапность выражается в шагах 1, 2. Происходит дополнительное перераспределение иDISTINCTпо ключу группировки + receipt_id.

Вышеуказанный запрос отрабатывает от 3,5 до 4,5 раз быстрее, в зависимости от нагрузки на кластере.

Не стоит включать параметр на всю базу – это может изменить поведение других запросов. Локально на уровне сессии можно ускорить проблемный запрос, а далее вернуть значение параметра в исходное состояние командойRESET.

Вывод

При использовании параметра (хинта) появляется дополнительное перераспределение с более оптимальным ключом распределения, в расчёте начинают участвовать все сегменты кластера. Это повышает нагрузку на сеть в кластере, но кратно уменьшает время исполнения запроса.

2 вариант. Алгоритмический, расширение ключа группировки.

Метрика количество чеков аддитивна по времени. Можно посчитать количество чеков по дням и потом сделать доагрегацию. Добавление в ключ группировки дня позволит увеличить количество ключей группировки до 30 раз. Если равномерности данных в ключе всё ещё не будет хватать, можно задействовать другие поля таблицы, отвечающие аддитивности метрики количества чеков.

Переписанный запрос:

INSERT INTO receipts_cnt_baskets
  WITH draft AS (
    SELECT
      sest.store_group_id
    , fcre.calendar_dk
    , COALESCE(sepl.plu_group_id, 0::INT4) AS plu_group_id
    , COUNT(DISTINCT fcre.receipt_id)      AS cnt_baskets
    FROM fct_receipts            AS fcre
      INNER JOIN selected_stores AS sest
        USING (store_id)
      INNER JOIN selected_plu    AS sepl
        USING (plu_id)
    WHERE 1 = 1
      AND fcre.receipt_dttm >= '2023-08-01 00:00:00'::TIMESTAMP
      AND fcre.receipt_dttm <= '2023-09-01 00:00:00'::TIMESTAMP
   GROUP BY
     GROUPING SETS (
       (store_group_id, calendar_dk, plu_group_id)
     , (store_group_id, calendar_dk              )
  )
)
SELECT
  store_group_id
, plu_group_id
, SUM(cnt_baskets)
FROM draft
GROUP BY
  store_group_id
, plu_group_id
;

Для данного запроса оптимизатор выбрал план, как и в начале статьи (на примере ключа группировки (store_group_id, calendar_dk, plu_group_id)):

3) COUNT(receipt_id)
->  HashAggregate
    Group Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id

    2) DISTINCT ключ группировки + receipt_id
    ->  HashAggregate
        Group Key: share1_ref3.store_group_id,
share1_ref3.calendar_dk, share1_ref3.plu_group_id, share1_ref3.receipt_id

        1) Перераспределение данных по ключу группировки
        ->  Redistribute Motion
            Hash Key: share1_ref3.store_group_id, share1_ref3.calendar_dk, share1_ref3.plu_group_id
            ->  Shared Scan (share slice:id 2:1)

Существенного перекоса не возникает, так как расширенный ключ группировки задействует все сегменты кластера, и нагрузка становится более равномерной.

Данный запрос выполняется от 7 до 9 раз быстрее по сравнению с изначальным запросом без хинта, в зависимости от нагрузки на кластере. 

Если предположить, что каждый оператор перераспределения перемещает 100% строк, то в данном запросе происходит меньшее по объёму перераспределение данных в сравнении с запросом с хинтом, и нагрузка на сеть уменьшается до двух раз.

Вывод

Расширяя ключ группировки за счет аддитивности метрики “количество чеков” по времени, мы уменьшаем перекос в данных в ключе группировки и задействуем все сегменты кластера для расчёта.

Итоги

  • При разработке важно понимать природу данных. Хороший алгоритм в большинстве случаев лучше, чем использование параметров оптимизатора.

  • Если ключ группировки отличается от ключа распределения таблиц, то операция группировки ведёт к перераспределению данных на кластере. В данных ключа группировки может быть перекос, и важно уметь диагностировать такие случаи.

  • Малое количество ключей группировки ведёт к неполному задействованию сегментов кластера. Расширяя ключ группировки, можно увеличить использование сегментов, чтобы снизить время расчёта

Мы погрузились немного в мир Greenplum, рассмотрели, как СУБД выполняет запросы. Узнали о перекосе и методах борьбы с данным явлением. Надеюсь, было полезно и интересно.

Хочу поблагодарить за консультации Даниила Недумова и за помощь в подготовке статьи – Антона Денисова.

Комментарии (2)


  1. os9
    17.10.2024 13:26

    1. Поясните пож-та, что значит "receipt_dttm для чека уникально" - то есть для всех товаров в одном чеке время одинаково? Также по "receipt_id относится только к одному магазину" - все товары в одном чеке относятся к одному магазину?

    2. Я про greenplum ничего не знаю) Перераспределение внутри запроса - это считается нормальная практика? Терабайты гоняются по сети каждый раз при запросе, не выглядит оптимальным.
      А если другой человек другой запрос выполнит, снова перераспределение? А если это параллельно.


    1. realnumber3012 Автор
      17.10.2024 13:26

      Добрый день

      1. "receipt_dttm для чека уникально" - то есть для всех товаров в одном чеке время одинаково? Да, одно значение даты времени для одного чека

      Также по "receipt_id относится только к одному магазину" - это означает, что значения receipt_id не пересекается в рамках нескольких магазинов. По сути (receipt_id, plu_id) - PK

      1. Перераспределение внутри запроса - это считается нормальная практика? Если перераспределение не удается избежать делается перераспределение.

      Практика нормальная Greenplum это про OLAP нагрузку и количество активных пользователей обычно ограничено.