В этой статье мы посмотрим, как в двух системах создавать пользовательские агрегатные и оконные (в терминологии Oracle — аналитические) функции. Несмотря на различия в синтаксисе и в целом в подходе к расширяемости, механизм этих функций очень похож. Но и различия тоже имеются.
Надо признать, что собственные агрегатные и оконные функции встречается довольно редко. Оконные функции вообще по каким-то причинам традиционно относят к разряду «продвинутого» SQL и считают сложными для понимания и освоения. Тут бы разобраться с теми функциями, которые уже имеются в СУБД!
Зачем тогда вообще вникать в этот вопрос? Могу назвать несколько причин:
- Хотя оконные функции объективно сложнее обычных агрегатных, но ничего запредельного в них нет; это абсолютно необходимый инструмент для SQL-разработчика. А создание собственной оконной функции, даже совсем простой, позволяет лучше разобраться с тем, как работают стандартные.
- Оконные и агрегатные функции — прекрасный способ совместить процедурную обработку с декларативной логикой. В некоторых ситуациях получается выполнить сложные действия, оставаясь в рамках парадигмы решения задачи одним SQL-запросом.
- Да и просто интересная тема, а уж тем более интересно сравнить две системы.
Пример, на котором будем тренироваться — подсчет среднего, аналог стандартной функции avg для типа numeric (number в Oracle). Мы напишем такую функцию и посмотрим, как она работает в агрегатном и оконном режимах и может ли она вычисляться несколькими параллельными процессами. А в заключение поглядим на пример из реальной жизни.
Агрегатные функции
Будем двигаться от простого к сложному, переключаясь между PostgreSQL и Oracle.
Вначале некоторые общие соображения. Любая агрегатная функция вызывается для каждой строки таблицы по очереди и в конечном итоге обрабатывает их все. Между вызовами ей требуется сохранять внутреннее состояние, определяющее контекст ее выполнения. В конце работы она должна вернуть итоговое значение.
Итак, нам потребуется четыре составляющие:
- Состояние (контекст),
- Функция обработки очередной строки,
- Функция выдачи итогового результата,
- Указание на то, что предыдущие три пункта составляют агрегатную функцию.
PostgreSQL
Для хранения состояния нужно выбрать подходящий тип данных. Можно взять стандартный, а можно определить свой. Для функции, вычисляющей среднее, нужно отдельно суммировать значения и отдельно подсчитывать их количество. Поэтому создадим свой составной тип с двумя полями:
CREATE TYPE average_state AS (
accum numeric,
qty numeric
);
Теперь определим функцию для обработки очередного значения. В PostgreSQL она называется функцией перехода:
CREATE OR REPLACE FUNCTION average_transition(
state average_state,
val numeric
) RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) + %', state.accum, state.qty, val;
RETURN ROW(state.accum+val, state.qty+1)::average_state;
END;
$$ LANGUAGE plpgsql;
Функция принимает текущее состояние и очередное значение, а возвращает новое состояние: значения суммируются, а к количеству добавляется единица.
Кроме этого, мы выводим (RAISE NOTICE) параметры функции — это позволит нам увидеть, как выполняется работа. Старый добрый отладочный PRINT, нет ничего тебя лучше.
Следующая функция — возвращение финального значения:
CREATE OR REPLACE FUNCTION average_final(
state average_state
) RETURNS numeric AS $$
BEGIN
RAISE NOTICE '= %(%)', state.accum, state.qty;
RETURN CASE WHEN state.qty > 0 THEN
trim(trailing '0' from (state.accum/state.qty)::text)::numeric
END;
END;
$$ LANGUAGE plpgsql;
Функция принимает состояние и возвращает результирующее число. Для этого просто делим накопленную сумму на количество. Но при нулевом количестве возвращаем NULL (так поступает и avg).
«Финт ушами» с функцией trim нужен исключительно для аккуратности вывода: таким образом мы избавляемся от незначащих нулей, которые иначе будут загромождать экран и мешать восприятию. Примерно вот так:
SELECT 1::numeric / 2::numeric;
?column?
------------------------
0.50000000000000000000
(1 row)
В реальной жизни эти фокусы, конечно, не нужны.
И, наконец, определяем собственно агрегатную функцию. Для этого используется специальная команда CREATE AGGREGATE:
CREATE AGGREGATE average(numeric) (
sfunc = average_transition,
stype = average_state,
finalfunc = average_final,
initcond = '(0,0)'
);
В этой команде указывается тип данных для состояния (stype), две наши функции (sfunc и finalfunc) и еще начальное значение состояния (initcond) в виде строковой константы.
Можно пробовать. Почти все примеры в этой статье будут использовать простую таблицу с пятью строками: раз, два, три, четыре, пять. Таблицу создаем на лету функцией generate_series, незаменимым помощником генерации тестовых данных:
SELECT average(g.x) FROM generate_series(1,5) AS g(x);
NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: 3(2) + 3
NOTICE: 6(3) + 4
NOTICE: 10(4) + 5
NOTICE: = 15(5)
average
---------
3
(1 row)
Результат верный, а вывод функций позволяет проследить ход выполнения:
- Состояние было установлено в (0,0),
- Функция average_transition последовательно вызывалась пять раз, постепенно изменяя состояние,
- В конце была вызвана функция average_final, которая и получила 3 = 15/5.
Еще одна проверка — на пустом множестве:
SELECT average(g.x) FROM generate_series(1,0) AS g(x);
NOTICE: = 0(0)
average
---------
(1 row)
Oracle
В Oracle вся расширяемость обеспечивается механизмом Data Cartridge. Говоря по-простому, нам потребуется создать объектный тип, реализующий необходимый для агрегации интерфейс. Контекст естественным образом представляется атрибутами этого объекта.
CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty number,
STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number
RETURN number,
MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number
);
/
Начальное значение контекста определяется здесь не константой, а отдельной (статической, то есть не привязанной к конкретному экземпляру объекта) функцией ODCIAggregateInitialize.
Функция, вызываемая для каждой строки — это ODCIAggregateIterate.
Результат возвращает функция ODCIAggregateTerminate, и ей, заметьте, передаются некие флаги, с которыми мы разберемся чуть позже.
Интерфейс включает еще одну обязательную функцию: ODCIAggregateMerge. Мы ее определим — куда ж деваться, — но разговор о ней пока отложим.
Теперь создадим тело объекта с реализацией перечисленных методов.
CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
END;
/
Реализация, по большей части, повторяет все то, что мы делали для PostgreSQL, но в немного другом синтаксисе.
Trim-пляски вокруг возвращаемого значения не нужны: Oracle самостоятельно отрезает незначащие нули при выводе значения.
Обратите внимание, что все функции возвращают признак успешности выполнения (значение ODCIConst.Success), а смысловые значения передаются через параметры OUT и IN OUT (которые в PL/SQL никак не связаны с собственно возвращаемым значением, как в PL/pgSQL). В частности, любая функция, в том числе и ODCIAggregateTerminate, может изменять атрибуты своего объекта, ссылка на который передается ей в первом параметре (self).
Определение агрегатной функции выглядит следующим образом:
CREATE OR REPLACE FUNCTION average(val number) RETURN number
AGGREGATE USING AverageImpl;
/
Проверяем. Для генерации значений используем идиоматическую конструкцию с рекурсивным запросом CONNECT BY level:
SELECT average(level) FROM dual CONNECT BY level <= 5;
AVERAGE(LEVEL)
--------------
3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:0
Можно обратить внимание на то, что вывод сообщений в PostgreSQL появляется до результата, а в Oracle — после. Это из-за того, что RAISE NOTICE работает асинхронно, а пакет dbms_output буферизует вывод.
Как мы видим, в функцию ODCIAggregateTerminate был передан нулевой флаг. Это означает, что контекст больше не требуется и его — при желании — можно забыть.
И проверка на пустом множестве:
SELECT average(rownum) FROM dual WHERE 1 = 0;
AVERAGE(ROWNUM)
---------------
= 0(0) flags:0
Оконные функции: OVER()
Хорошая новость: написанная нами агрегатная функция может без всяких изменений работать и как оконная (аналитическая).
Оконная функция отличается от агрегатной тем, что не сворачивает выборку в одну (агрегированную) строку, а вычисляется как бы отдельно для каждой строки. Синтаксически вызов оконной функции отличается наличием конструкции OVER с указанием рамки, которая определяет множество строк для обработки. В простейшем случае она так и записывается: OVER(), и это означает, что функция должна обработать все строки. Результат получается такой, как будто мы посчитали обычную агрегатную функцию и записали результат (один и тот же) напротив каждой строки выборки.
Иными словами, рамка статична и охватывает все строки:
1. 2. 3. 4. 5. +---+ +---+ +---+ +---+ +---+ | 1 | | 1 | | 1 | | 1 | | 1 | | 2 | | 2 | | 2 | | 2 | | 2 | | 3 | | 3 | | 3 | | 3 | | 3 | | 4 | | 4 | | 4 | | 4 | | 4 | | 5 | | 5 | | 5 | | 5 | | 5 | +---+ +---+ +---+ +---+ +---+
PostgreSQL
Попробуем:
SELECT g.x, average(g.x) OVER ()
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: 3(2) + 3
NOTICE: 6(3) + 4
NOTICE: 10(4) + 5
NOTICE: = 15(5)
x | average
---+---------
1 | 3
2 | 3
3 | 3
4 | 3
5 | 3
(5 rows)
По выводу NOTICE видно, что все происходит точно так же, как и ранее при вычислении обычной агрегатной функции. Получив результат от функции average_final, PostgreSQL проставляет его в каждой строке.
Oracle
SELECT average(level) OVER() average
FROM dual CONNECT BY level <= 5;
LEVEL AVERAGE
---------- -----------
1 3
2 3
3 3
4 3
5 3
0(0) + 1
1(1) + 2
3(2) + 3
6(3) + 4
10(4) + 5
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:1
= 15(5) flags:0
Неожиданно. Вместо того, чтобы вычислить результат один раз, Oracle вызывает функцию ODCIAggregateTerminate N+1 раз: сначала для каждой строки с флагом 1 (что означает, что контекст еще пригодится) и затем еще один раз в конце. Значение, полученное при последнем вызове, просто игнорируется.
Вывод такой: если в функции ODCIAggregateTerminate используется вычислительно сложная логика, надо подумать о том, чтобы не делать одну и ту же работу несколько раз.
Оконные функции: OVER(PARTITION BY)
Предложение PARTITION BY в определении рамки похоже на обычную агрегатную конструкцию GROUP BY. Оконная функция с указанием PARTITION BY вычисляется отдельно для каждой группы строк, и результат приписывается к каждой строке выборки.
В таком варианте рамка тоже статична, но для каждой группы она разная. Например, если определены две группы строк (с первой по вторую и с третьей по пятую), то рамку можно представить себе так:
1. 2. 3. 4. 5. +---+ +---+ | 1 | | 1 | | 2 | | 2 | +---+ +---+ +---+ +---+ +---+ | 3 | | 3 | | 3 | | 4 | | 4 | | 4 | | 5 | | 5 | | 5 | +---+ +---+ +---+
PostgreSQL
SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3)
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 0(0) + 3
NOTICE: 3(1) + 4
NOTICE: 7(2) + 5
NOTICE: = 12(3)
part | x | average
------+---+---------
0 | 1 | 1.5
0 | 2 | 1.5
1 | 3 | 4
1 | 4 | 4
1 | 5 | 4
(5 rows)
Вычисление снова происходит последовательно, но теперь при переходе к другой группе строк состояние сбрасывается в начальное значение (initcond).
Oracle
SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3)) average
FROM dual CONNECT BY level <= 5;
PART LEVEL AVERAGE
---------- ---------- ----------
0 2 1.5
0 1 1.5
1 4 4
1 5 4
1 3 4
0(0) + 2
2(1) + 1
= 3(2) flags:1
= 3(2) flags:1
0(0) + 4
4(1) + 5
9(2) + 3
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:1
= 12(3) flags:0
Занятно, что Oracle решил переставить строки местами. Это может что-то сказать о деталях реализации, но в любом случае — имеет право.
Оконные функции: OVER(ORDER BY)
Если в определение рамки добавить предложение ORDER BY, указывающее порядок сортировки, функция начнет работать в режиме нарастания (для функции sum мы бы так и сказали — нарастающим итогом).
Для первой строки рамка будет состоять из одной этой строки; для второй — из первой и второй; для третьей — из первой, второй и третьей и так далее. Иными словами, в рамку будут входить строки с первой до текущей.
На самом деле, это можно ровно так и записать: OVER(ORDER BY… ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), но, поскольку это многословие подразумеваются по умолчанию, его обычно опускают.
Итак, рамка перестает быть статичной: ее голова движется вниз, а хвост остается на месте:
1. 2. 3. 4. 5. +---+ +---+ +---+ +---+ +---+ | 1 | | 1 | | 1 | | 1 | | 1 | +---+ | 2 | | 2 | | 2 | | 2 | +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+
PostgreSQL
SELECT g.x, average(g.x) OVER (ORDER BY g.x)
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 6(3) + 4
NOTICE: = 10(4)
NOTICE: 10(4) + 5
NOTICE: = 15(5)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 2.5
5 | 3
(5 rows)
Как видим, строки все так же добавляются к контексту по одной, но теперь функция average_final вызывается после каждого добавления, выдавая промежуточный итог.
Oracle
SELECT level, average(level) OVER(ORDER BY level) average
FROM dual CONNECT BY level <= 5;
LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 2.5
5 3
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) + 4
= 10(4) flags:1
10(4) + 5
= 15(5) flags:1
= 15(5) flags:0
На этот раз обе системы работают одинаково.
Оконные функции: OVER(PARTITION BY ORDER BY)
Предложения PARTITION BY и ORDER BY можно комбинировать. Тогда внутри каждой группы строк функция будет работать в режиме нарастания, а при переходе от группы к группе состояние будет сбрасываться в начальное.
1. 2. 3. 4. 5. +---+ +---+ | 1 | | 1 | +---+ | 2 | +---+ +---+ +---+ +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+
PostgreSQL
SELECT g.x/3 part,
g.x,
average(g.x) OVER (PARTITION BY g.x/3 ORDER BY g.x)
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 0(0) + 3
NOTICE: = 3(1)
NOTICE: 3(1) + 4
NOTICE: = 7(2)
NOTICE: 7(2) + 5
NOTICE: = 12(3)
part | x | average
------+---+---------
0 | 1 | 1
0 | 2 | 1.5
1 | 3 | 3
1 | 4 | 3.5
1 | 5 | 4
(5 rows)
Oracle
SELECT trunc(level/3) part,
level,
average(level) OVER(PARTITION BY trunc(level/3) ORDER BY level) average
FROM dual CONNECT BY level <= 5;
PART LEVEL AVERAGE
---------- ---------- ----------
0 1 1
0 2 1.5
1 3 3
1 4 3.5
1 5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
0(0) + 3
= 3(1) flags:1
3(1) + 4
= 7(2) flags:1
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0
Оконные функции со скользящей рамкой
Во всех примерах, которые мы посмотрели, рамка либо была статической, либо двигалась только ее голова (при использовании предложения ORDER BY). Это давало нам возможность вычислять состояние последовательно, добавляя к контексту строку за строкой.
Но рамку оконной функции можно задать и таким образом, что ее хвост тоже будет смещаться. В нашем примере это будет соответствовать понятию скользящего среднего. Например, указание OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) говорит о том, что для каждой строки результата будут усредняться текущее и два предыдущих значений.
1. 2. 3. 4. 5. +---+ | | +---+ | | | | +---+ | 1 | | 1 | | 1 | +---+ +---+ | 2 | | 2 | | 2 | +---+ +---+ | 3 | | 3 | | 3 | +---+ | 4 | | 4 | +---+ | 5 | +---+
Сможет ли вычисляться оконная функция в таком случае? Оказывается, сможет, правда неэффективно. Но, написав еще немного кода, можно улучшить ситуацию.
PostgreSQL
Посмотрим:
SELECT g.x,
average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 0(0) + 2
NOTICE: 2(1) + 3
NOTICE: 5(2) + 4
NOTICE: = 9(3)
NOTICE: 0(0) + 3
NOTICE: 3(1) + 4
NOTICE: 7(2) + 5
NOTICE: = 12(3)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 3
5 | 4
(5 rows)
Вплоть до третьей строки все идет хорошо, потому что хвост фактически не двигается: мы просто добавляем к уже имеющемуся контексту очередное значение. Но, поскольку мы не умеем убирать значение из контекста, для четвертой и пятой строк все приходится пересчитывать полностью, каждый раз возвращаясь к начальному состоянию.
Итак, было бы здорово иметь не только функцию добавления очередного значения, но и функцию удаления значения из состояния. И действительно, такую функцию можно создать:
CREATE OR REPLACE FUNCTION average_inverse(state average_state, val numeric)
RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) - %', state.accum, state.qty, val;
RETURN ROW(state.accum-val, state.qty-1)::average_state;
END;
$$ LANGUAGE plpgsql;
Чтобы оконная функция смогла ей воспользоваться, нужно пересоздать агрегат следующим образом:
DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
-- обычный вариант
sfunc = average_transition,
stype = average_state,
finalfunc = average_final,
initcond = '(0,0)',
-- вариант с “обратной” функцией
msfunc = average_transition,
minvfunc = average_inverse,
mstype = average_state,
mfinalfunc = average_final,
minitcond = '(0,0)'
);
Проверим:
SELECT g.x,
average(g.x) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM generate_series(1,5) as g(x);
NOTICE: 0(0) + 1
NOTICE: = 1(1)
NOTICE: 1(1) + 2
NOTICE: = 3(2)
NOTICE: 3(2) + 3
NOTICE: = 6(3)
NOTICE: 6(3) - 1
NOTICE: 5(2) + 4
NOTICE: = 9(3)
NOTICE: 9(3) - 2
NOTICE: 7(2) + 5
NOTICE: = 12(3)
x | average
---+---------
1 | 1
2 | 1.5
3 | 2
4 | 3
5 | 4
(5 rows)
Вот теперь все в порядке: для четвертой и пятой строк мы удаляем из состояния хвостовое значение и добавляем новое.
Oracle
Тут ситуация аналогична. Созданный вариант аналитической функции работает, но неэффективно:
SELECT level,
average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;
LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 3
5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
0(0) + 2
2(1) + 3
5(2) + 4
= 9(3) flags:1
0(0) + 3
3(1) + 4
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0
Функция удаления значения из контекста определяется следующим образом:
MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;
CREATE OR REPLACE TYPE AverageImpl AS OBJECT(
accum number,
qty number,
STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl) RETURN number,
MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number) RETURN number,
MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number) RETURN number
);
/
CREATE OR REPLACE TYPE BODY AverageImpl IS
STATIC FUNCTION ODCIAggregateInitialize(actx IN OUT AverageImpl)
RETURN number IS
BEGIN
actx := AverageImpl(0,0);
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateIterate(self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') + '||val);
self.accum := self.accum + val;
self.qty := self.qty + 1;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateMerge(self IN OUT AverageImpl, ctx2 IN AverageImpl)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') & '||ctx2.accum||'('||ctx2.qty||')');
self.accum := self.accum + ctx2.accum;
self.qty := self.qty + ctx2.qty;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateTerminate(self IN OUT AverageImpl, returnValue OUT number, flags IN number)
RETURN number IS
BEGIN
dbms_output.put_line('= '||self.accum||'('||self.qty||') flags:'||flags);
returnValue := CASE WHEN self.qty > 0 THEN self.accum / self.qty END;
RETURN ODCIConst.Success;
END;
MEMBER FUNCTION ODCIAggregateDelete(self IN OUT AverageImpl, val IN number)
RETURN number IS
BEGIN
dbms_output.put_line(self.accum||'('||self.qty||') - '||val);
self.accum := self.accum - val;
self.qty := self.qty - 1;
RETURN ODCIConst.Success;
END;
END;
/
Пересоздавать саму функцию не нужно. Проверим:
SELECT level,
average(level) OVER(ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) average
FROM dual CONNECT BY level <= 5;
LEVEL AVERAGE
---------- ----------
1 1
2 1.5
3 2
4 3
5 4
0(0) + 1
= 1(1) flags:1
1(1) + 2
= 3(2) flags:1
3(2) + 3
= 6(3) flags:1
6(3) - 1
5(2) + 4
= 9(3) flags:1
9(3) - 2
7(2) + 5
= 12(3) flags:1
= 12(3) flags:0
Параллельность
И PostgreSQL, и Oracle (Enterprise Edition) умеют вычислять агрегатные функции в параллельном режиме. При этом каждый из параллельных процессов выполняет свою часть работы, формируя промежуточное состояние. Затем основной процесс-координатор получает эти несколько состояний и должен объединить их в одно итоговое.
Для этого нужна еще одна функция объединения, которую мы сейчас и напишем. В нашем случае она просто складывает и суммы, и количество значений.
PostgreSQL
Функция выглядит следующим образом:
CREATE OR REPLACE FUNCTION average_combine(state1 average_state, state2 average_state)
RETURNS average_state AS $$
BEGIN
RAISE NOTICE '%(%) & %(%)', state1.accum, state1.qty, state2.accum, state2.qty;
RETURN ROW(state1.accum+state2.accum, state1.qty+state2.qty)::average_state;
END;
$$ LANGUAGE plpgsql;
Еще мы уберем наш отладочный вывод из функции average_transition. При параллельном выполнении мы будем суммировать не пять значений, а больше, так что если этого не сделать, мы получим слишком много бесполезной информации.
Поскольку мы убираем вывод, то отпадает и необходимость использовать процедурный язык — напишем функцию на чистом SQL:
CREATE OR REPLACE FUNCTION average_transition(state average_state, val numeric)
RETURNS average_state AS $$
SELECT ROW(state.accum+val, state.qty+1)::average_state;
$$ LANGUAGE sql;
Осталось пересоздать агрегат с учетом новой функции и указать, что его можно безопасно использовать в параллельном режиме:
DROP AGGREGATE average(numeric);
CREATE AGGREGATE average(numeric) (
-- обычный вариант
sfunc = average_transition,
stype = average_state,
finalfunc = average_final,
combinefunc = average_combine,
initcond = '(0,0)',
-- вариант с “обратной” функцией
msfunc = average_transition,
minvfunc = average_inverse,
mstype = average_state,
mfinalfunc = average_final,
minitcond = '(0,0)',
-- параллельность
parallel = safe
);
Теперь создадим таблицу и заполним ее данными. Тысячи строк будет достаточно.
CREATE TABLE t(n) AS SELECT generate_series(1,1000)::numeric;
С настройками по умолчанию PostgreSQL не построит параллельный план для такой таблицы — слишком она мала, — но его несложно уговорить:
SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;
EXPLAIN(costs off) SELECT average(n) FROM t;
QUERY PLAN
------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 2
-> Partial Aggregate
-> Parallel Seq Scan on t
В плане запроса видим:
- два запланированных рабочих процесса, выполняющих частичное агрегирование (Partial Aggregate),
- узел Gather, собирающий информацию,
- и итоговое объединение состояний (Finalize Aggregate).
Проверим:
SELECT average(n) FROM t;
NOTICE: 0(0) & 281257(678)
NOTICE: 281257(678) & 127803(226)
NOTICE: 409060(904) & 91440(96)
NOTICE: = 500500(1000)
average
---------
500.5
(1 row)
Почему функция average_combine вызывается три раза, а не два? Дело в том, что в PostgreSQL координирующий процесс тоже выполняет часть работы. Поэтому, хотя было запущено два рабочих процесса, реально работа выполнялась в трех. Один из них успел обработать 678 строк, другой 226 и третий — 96 (хотя эти цифры ничего не значат и при другом запуске могут отличаться).
Oracle
Если помните, функцию ODCIAggregateMerge мы уже написали в самом начале, поскольку в Oracle она является обязательной. Документация настаивает, что эта функция необходима не только для параллельной работы, но и для последовательной — хотя мне трудно понять, зачем (и на практике не приходилось сталкиваться с ее выполнением при последовательной обработке).
Все, что остается сделать — объявить функцию безопасной для параллельной работы:
CREATE OR REPLACE FUNCTION average(val number) RETURN number
PARALLEL_ENABLE
AGGREGATE USING AverageImpl;
/
Создаем таблицу:
CREATE TABLE t(n) AS SELECT to_number(level) FROM dual CONNECT BY level <= 1000;
Уговорить Oracle еще проще, чем PostgreSQL — достаточно написать хинт. Вот какой получается план (вывод сильно урезан для простоты):
EXPLAIN PLAN FOR SELECT /*+ PARALLEL(2) */ average(n) FROM t;
SELECT * FROM TABLE(dbms_xplan.display);
---------------------------------
| Id | Operation |
---------------------------------
| 0 | SELECT STATEMENT |
| 1 | SORT AGGREGATE |
| 2 | PX COORDINATOR |
| 3 | PX SEND QC (RANDOM) |
| 4 | SORT AGGREGATE |
| 5 | PX BLOCK ITERATOR |
| 6 | TABLE ACCESS FULL |
---------------------------------
План также содержит:
- частичную агрегацию (4),
- координатора, получающего частичные контексты (2),
- и итоговое объединение контекстов (1).
SELECT /*+ PARALLEL(2) */ average(n) FROM t;
AVERAGE(N)
----------
500.5
0(0) & 216153(657)
216153(657) & 284347(343)
= 500500(1000) flags:0
В Oracle координатор не участвует в частичной агрегации. Поэтому объединяются только два контекста и по этой же причине мы видим только вывод функции ODCIAggregateMerge.
Документация
Самое время привести ссылки на документацию, в том числе и на агрегатные и оконные функции, уже включенным в СУБД. Там можно найти много интересного.
PostgreSQL:
- Пользовательские агрегатные функции
- CREATE AGGREGATE
- Стандартные агрегатные и оконные функции
Oracle:
- Пользовательские агрегатные функции
- Интерфейс агрегатных функций
- Стандартные агрегатные и аналитические функции
Пример про округление копеек
И обещанный пример из жизни. Эту функцию я придумал, когда приходилось писать отчеты для бухгалтерии, работающей по РСБУ (правилам российского бухучета).
Самая простая задача, в которой возникает необходимость округления — распределение общих расходов (скажем, 100 рублей) на отделы (скажем, 3 штуки) по какому-то принципу (скажем, поровну):
WITH depts(name) AS (
VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round(amount,2) FROM report;
dept | round
------+-------
A | 33.33
B | 33.33
C | 33.33
(3 rows)
Этот запрос показывает проблему: суммы надо округлять, но при этом теряется копейка. А РСБУ этого не прощает.
Задачу можно решать по-разному, но на мой вкус наиболее элегантный способ — оконная функция, которая работает в нарастающем режиме и берет всю борьбу с копейками на себя:
WITH depts(name) AS (
VALUES ('A'), ('B'), ('C')
), report(dept,amount) AS (
SELECT name, 100.00 / count(*) OVER() FROM depts
)
SELECT dept, round2(amount) OVER (ORDER BY dept) FROM report;
dept | round2
------+--------
A | 33.33
B | 33.34
C | 33.33
(3 rows)
Состояние такой функции включает ошибку округления (r_error) и текущее округленное значение (amount). Функция обработки очередного значения увеличивает ошибку округления, и, если она уже превышает полкопейки, добавляет к округленной сумме копеечку:
state.r_error := state.r_error + val - round(val,2);
state.amount := round(val,2) + round(state.r_error,2);
state.r_error := state.r_error - round(state.r_error,2);
А функция, выдающая результат, просто возвращает уже готовый state.amount.
Полный код функции приводить не буду: используя уже приведенные примеры написать ее не представляет сложности.
Если вам встречались интересные примеры использования собственных агрегатных или оконных функций — поделитесь ими в комментариях.
Комментарии (17)
zoroda
14.03.2018 06:15Буквально на прошлой неделе решал задачу корректного отображения, например, доли от итоговой суммы. Решил делать через агрегатные функции.
Код-- Объявление агрегатной функции CREATE AGGREGATE calc.percent(numeric[]) ( SFUNC=calc.part_add, STYPE=numeric[], FINALFUNC="calc.part_divide_percent", INITCOND='{0, 0}' ); -- Обработка очередной строки CREATE OR REPLACE FUNCTION calc.part_add( numeric[], numeric[]) RETURNS numeric[] AS $BODY$ select array[$1[1] + coalesce($2[1],0), $1[2] + coalesce($2[2],0)] $BODY$ LANGUAGE sql VOLATILE COST 100; -- Выдача результата CREATE OR REPLACE FUNCTION calc.part_divide_percent(numeric[]) RETURNS numeric AS $BODY$ select case when not coalesce($1[2],0) = 0 then $1[1] / $1[2] * 100 else null end $BODY$ LANGUAGE sql VOLATILE COST 100;
zoroda
14.03.2018 06:40Точнее, в этом примере считается итоговый процент отношения одного показателя к другому.
erogov Автор
14.03.2018 12:37Ага, можно так. Мне, правда, не очень нравится этот трюк с массивом: экономим на создании типа (а зачем?), но ведь код получается непонятный.
Кстати, как вариант, можно было бы обойтись обычной суммой:sum(...) / nullif(sum(...), 0)
.
А для отношения доли к сумме в Oracle даже есть стандартная функция
ratio_to_report
.zoroda
15.03.2018 05:54Поясню. Трюк с массивом и использование самописной агрегатной функции нужны были для того, чтобы сделать интерфейс администратора для настройки этого приложения. Таким образом, в админке можно написать формулу расчёта показателя:
=percent(PCurrent, PTotal)
Эта формула легко регулярными выражениями приводится к виду
calc.percent(array[indicators->>'PCurrent', indicators->>'PTotal'])
, который подставляется в текст создаваемого представления.
Так как подобных функций набирается как минимум с десяток и в них может быть более 2 параметров (присутствуют более сложные методики расчёта), решил не создавать типы для каждой из функций (впрочем, теперь это моё решение не кажется таким однозначным).
В результате:
- даём пользователю возможность ввести человекопонятную формулу без необходимости проверки на 0/null
- можем проверить её корректность на этапе при сохранении
- разрешаем использовать только те агрегатные функции, которые доступны в схеме calc
erogov Автор
15.03.2018 11:54Идея понятная, цель благая.
В плане «поворчать» могу обратить внимание на пару моментов.
Во-первых, все абстракции текут. Например, стоит ошибиться с названием параметра и получим странное поведение (молчаливый null вместо сообщения об ошибке) — если, конечно, это не было предусмотрено в дополнение к регулярке.
Ну и во-вторых, с отдельными типами эта конструкция работала бы ничуть не сложнее и не хуже (:
ruslan_shv
14.03.2018 11:25как потом дебажить эти функции?
erogov Автор
14.03.2018 12:40А какие сложности? Отдельные функции же можно вызывать самостоятельно, вне агрегата. Так что при необходимости для них легко написать любые тесты.
asmm
14.03.2018 13:19Заметил особенность реализации аналитических функций в Oracle. Если в OVER() используем ORDER BY, то в ODCIAggregateTerminate зайдёт только при смене значения в ORDER BY. Надо об этом помнить, а то можно получить совсем неожиданные результаты.
Пример:
Функция GROUP_NUMBER — назначает номер группы при смене значения
GROUP_NUMBERCREATE OR REPLACE TYPE GROUP_NUMBER_IMPL as OBJECT ( PREV_VAL VARCHAR2(100) , CUR_GROUP_NUMBER NUMBER , STATIC FUNCTION ODCIAggregateInitialize(SCTX IN OUT GROUP_NUMBER_IMPL) RETURN NUMBER , MEMBER FUNCTION ODCIAggregateIterate(SELF IN OUT GROUP_NUMBER_IMPL, VALUE IN VARCHAR2) RETURN NUMBER , MEMBER FUNCTION ODCIAggregateTerminate(SELF IN GROUP_NUMBER_IMPL, RETURN_VAL OUT NUMBER, FLAGS IN NUMBER) RETURN NUMBER , MEMBER FUNCTION ODCIAggregateMerge(self IN OUT GROUP_NUMBER_IMPL, CTX2 IN GROUP_NUMBER_IMPL) RETURN NUMBER ); / create or replace type body GROUP_NUMBER_IMPL IS static FUNCTION ODCIAggregateInitialize(SCTX IN OUT GROUP_NUMBER_IMPL) RETURN NUMBER is begin SCTX := GROUP_NUMBER_IMPL('', 0); return ODCIConst.Success; end; MEMBER FUNCTION ODCIAggregateIterate(SELF IN OUT GROUP_NUMBER_IMPL, VALUE IN VARCHAR2) RETURN NUMBER is begin if NVL(value, 'ЪъЪ') != NVL(self.PREV_VAL, 'ЪъЪ') OR self.CUR_GROUP_NUMBER = 0 THEN self.CUR_GROUP_NUMBER := self.CUR_GROUP_NUMBER + 1; self.PREV_VAL := value; end if; return ODCIConst.Success; end; MEMBER FUNCTION ODCIAggregateTerminate(SELF IN GROUP_NUMBER_IMPL, RETURN_VAL OUT NUMBER, FLAGS IN NUMBER) RETURN NUMBER is begin RETURN_VAL := self.CUR_GROUP_NUMBER; return ODCIConst.Success; end; MEMBER FUNCTION ODCIAggregateMerge(self IN OUT GROUP_NUMBER_IMPL, CTX2 IN GROUP_NUMBER_IMPL) RETURN NUMBER is begin self.CUR_GROUP_NUMBER := self.CUR_GROUP_NUMBER + CTX2.CUR_GROUP_NUMBER; end; end; / CREATE OR REPLACE FUNCTION GROUP_NUMBER(INPUT VARCHAR2) RETURN NUMBER PARALLEL_ENABLE AGGREGATE USING GROUP_NUMBER_IMPL; /
asmm
14.03.2018 13:27Копипаст подвёл. Первый запрос должен выглядеть так. Т.е. без ROWNUM в ORDER BY
SELECT GROUP_NUMBER(O.OBJECT_TYPE) OVER (ORDER BY O.LAST_DDL_TIME) ГР , O.OBJECT_TYPE , TO_CHAR(O.LAST_DDL_TIME, 'DD.MM.YYYY HH24:MI:SS') LAST_DDL_TIME FROM ( SELECT O.* FROM ALL_OBJECTS O WHERE ROWNUM < 20 ) O ;
erogov Автор
14.03.2018 14:35Есть такое дело, тоже натыкался. Спасибо, что напомнили, это важный момент.
И PostgreSQL себя точно так же ведет, кстати.
asmm
14.03.2018 14:05Если помните, функцию ODCIAggregateMerge мы уже написали в самом начале, поскольку в Oracle она является обязательной. Документация настаивает, что эта функция необходима не только для параллельной работы, но и для последовательной — хотя мне трудно понять, зачем (и на практике не приходилось сталкиваться с ее выполнением при последовательной обработке).
ODCIAggregateMerge используется для GROUP BY ROLLUP
The ODCIAggregateMerge() interface is invoked to compute super aggregate values in such roll-up operations.
erogov Автор
14.03.2018 14:49Вот оно что, оказывается! Действительно, это логично. Спасибо, теперь буду знать.
PostgreSQL так пока не умеет, он по нескольку раз считает.
little-brother
Спасибо за статью.
Еще бы пример как с производительностью у пользовательских функций, хотя бы в сравнении с какой либо стандартной.
erogov Автор
Если сравнивать свою функцию, написанную на PL/SQL (тем более на PL/pgSQL) со стандартной, то все будет довольно печально. Так что если есть стандартная функция, то изобретать велосипед не надо (:
Если написать свою на C в PostgreSQL, то будет ровно одно и то же.
А вот если написать свою в Oracle на чем-нибудь компилируемом, то не знаю, не пробовал. Предполагаю, что будет чуть проигрывать.
RPG18
Вроде же можно PL/V8 на случай математики с их JIT? Ваш коллега Иван Панченко на PGDay 15 и PGConf 18 да же бенчмарки давал.
erogov Автор
Можно и так, там все равно, на чем функции написаны. Но надо пробовать, конечно, потому что от подводных граблей никто не застрахован.