В прошлой статье мини-цикла о работе с агрегатами я рассказывал, как организовать эффективное многопоточное преобразование потока первичных данных в данные агрегированные. Там мы рассматривали задачу "свертки" продаж в агрегаты вида товар/дата/кол-во.

Сегодня мы рассмотрим более сложный вариант, который зачастую начинается со слов "А заказчик захотел…" и приводит нас к иерархичным агрегатам в нескольких одновременных разрезах, которые позволяют нам в СБИС практически мгновенно строить оперативные отчеты в подсистемах организации торговли, бухгалтерского учета и даже управления активными продажами.

Бизнес-требования

И швец, и жнец, и на дуде игрец...
И швец, и жнец, и на дуде игрец...
  • уметь быстро получить информацию не только по товарам, но и по складам

  • в том числе и сводка-TOP продаж товаров на интервале

  • в том числе с фильтром по складу... или без

  • а еще график динамики продаж за месяц по дням... и за год по месяцам... и за все время по годам

  • ... и с любым из фильтров склад/товар

  • ... и чтобы все быстро работало!

Итак, вычленяем ключевое для нас относительно предыдущей задачи:

  • появляется новый разрез агрегации - по складу

  • необходима агрегация "без учета" или "по всем" - например, сводные продажи всех товаров склада или продажи товара по всем складам

  • нужны агрегаты для динамики (дневные/месячные/годовые) в разрезе любого фильтра

Структура новых агрегатов

Добавим в таблицу агрегатов новое поле - тип интервала агрегации (D/M/Y) и новый аналитический разрез - склад.

CREATE TABLE agg(
  it    -- товар
    integer
, wh    -- склад
    integer
, dt    -- дата продажи/начала кванта
    date
, quant -- тип кванта D/M/Y
    "char"
, qty   -- количество
    double precision
);

Замечу, что тут для экономии размера данных мы использовали однобайтный спецтип "char". Например, такой тип имеет поле relkind (тип объекта) в системной таблице pg_class.

Неудобный NULL и удобный ноль

Для аналитики "по всем" используем значение = 0 (не NULL) соответствующего разреза. Это позволит нам всегда передавать значения в один и тот же запрос, без изменения его модели на … IS [NOT] NULL.

То есть для запроса "какие товары продавались лучше всего в таком-то месяце" будем использовать запрос вида:

SELECT
  it
, qty
FROM
  agg
WHERE
  wh = $1::integer AND -- передадим 0 для разреза "по всем складам"
  (quant, dt) = ($2::"char", $3::date) -- передадим 'M' для обращения к "месячному" агрегату
ORDER BY
  qty DESC
LIMIT ...;

А если нам понадобится этот же рейтинг по конкретному складу, то просто передадим сюда же ID склада! Очевидно, для такого запроса подходящим будет индекс (quant, dt, wh, qty DESC).

В результате, за единственный Index Scan мы получим сразу все, что хотим, без какой-либо динамической агрегации на моменте получения данных.

Динамика в разрезе фильтра

Давайте теперь сконструируем запрос, который поможет нам нарисовать красивый график по динамике:

SELECT
  dt
, qty
FROM
  agg
WHERE
  (quant, it, wh) = ($1::"char", $2::integer, $3::integer) AND
  dt BETWEEN $4::date AND $5::date -- период графика
ORDER BY
  dt;

Индекс под него - (quant, it, wh, dt). Почему на первом месте именно quant? Потому что у него очень маленькая селективность, и индекс будет занимать меньше места.

Сборка агрегатов

Итак, вернемся к предыдущей статье и проблемам, которые мы успешно решали там - разделению кросс-блокировок между параллельно работающими потоками. В нынешней ситуации, разделив обработку по ключу (it, wh), мы заведомо устраним конфликты между обработчиками.

Но возникнет небольшая проблема - давайте посмотрим, как именно эффективнее всего добиться формирования агрегатов:

При проходе по курсору над flow-таблицей мы формируем в памяти "дифф" для инкремента записей соответствующих агрегатов по обрабатываемому этим потоком ключу (it, wh) - сразу для каждого из типов интервалов.

Вместе с этим мы вставляем во flow новую "первичку" для последующих "надагрегатов", заменяя нулем каждый из вариантов разрезов анализа.

Задача: "Вскипятить воду в чайнике"

И физик и математик: налить воду в чайник, зажечь плиту, поставить чайник на огонь и подогреть до 100*С.

Новая задача: "Вскипятить воду в чайнике. Чайник уже налит, огонь горит"

Физик: поставить чайник на огонь и подогреть.

Математик: выливаем воду из чайника на плиту - чайник пуст, огонь не горит - задача сведена к предыдущей!

(c) народный анекдот

Понятно, что при последующей обработке такого ключа, содержащего хотя бы один ноль, записи "надагрегатов" формировать уже не нужно.

Обходим блокировки

Единственная точка, в которой у нас могут пересечься два параллельно работающих потока, это формирование/обновление записи queue, соответствующей этому ключу "надагрегата" - например, (0, 0).

К счастью, это достаточно просто обходится вставкой новой записи в queue, если pg_try_advisory_xact_lock(it, wh) для такого ключа вернула нам FALSE. То есть да, в разрезе ключа распределения записи в очереди могут быть неуникальны. Но в этом нет ничего страшного, потому что они всего лишь выполняют функцию сигнализатора "во flow что-то может быть по этому ключу". И если нет - не страшно, при обработке этой записи очереди мы заглянем во flow, ничего не найдем, и спокойно завершим обработку.


Итого - мы получили в БД все нужные агрегаты во всех требуемых разрезах, которые помогут нам обеспечить быстрый показ отчета/графика в любой комбинации фильтров: