Вас когда-либо просили посчитать количество чего-то на основании данных в бд за последний месяц, сгруппировав результат по каким-то значениям и разбив всё это ещё по дням/часам?
Если да — то вы уже представляете, что вам придётся написать что-то вроде такого, только хуже

SELECT hour(datetime), somename, count(*), sum(somemetric)
from table
where datetime > :monthAgo
group by 1, 2
order by 1 desc, 2

Время от времени самые разнообразные подобные запросы начинают появляться, и если один раз стерпишь и поможешь — увы, обращения будут поступать и в будущем.

А плохи такие запросы тем, что хорошо отнимают ресурсы системы на время выполнения, да и данных может быть так много, что даже реплику для таких запросов будет жаль (и своего времени).

А что если я скажу, что прямо в PostgreSQL можно создать вьюху, которая на лету будет учитывать только новые поступающие данные в прямо подобном запросе, как выше?

Так вот — это умеет делать расширение PipelineDB

Демо с их сайта, как это работает


Ранее PipelineDB был отдельным проектом, но теперь доступен как расширение для PG 10.1 и выше.

И хотя предоставляемые возможности уже давно существуют в иных продуктах, специально созданных для сбора метрик в режиме реального времени — у PipelineDB есть существенный плюс: меньший порог вхождения для разработчиков, которые уже умеют SQL).

Возможно для кого-то это несущественно. Лично я не ленюсь пробовать всё, что кажется подходящим для решения той или иной задачи, но и не двину тут же юзать одно новое решение на все случаи. Потому в этой заметке я не призываю тут же бросать всё и устанавливать PipelineDB, это лишь обзор основного функционала, т.к. штука мне показалась любопытной.

И так, в целом документация у них хорошая, но я хочу поделиться опытом, как это дело попробовать на практике и вывести результаты в Grafana.

Чтобы не захламлять локальную машину всё разворачиваю в докере.
Используемые образы: postgres:latest, grafana/grafana

Установка PipelineDB на Postgres


На машине с postgres выполнить последовательно:

  1. apt update
  2. apt install curl
  3. curl -s http://download.pipelinedb.com/apt.sh | bash
  4. apt install pipelinedb-postgresql-11
  5. cd /var/lib/postgresql/data
  6. Открыть в любом редакторе файл postgresql.conf
  7. Найти ключ shared_preload_libraries, расскоментить и установить значение pipelinedb
  8. Ключ max_worker_processes установить в значение 128 (рекомендация доки)
  9. Ребутнуть сервер

Создание потока и вьюх в PipelineDB


После ребута пг - наблюдай логи, чтобы там было такое


  1. Бд в которой будем работать: CREATE DATABASE testpipe;
  2. Создание расширения: CREATE EXTENSION pipelinedb;
  3. Теперь самое интересное — создание стрима. Именно в него необходимо добавлять данные для дальнейшей обработки:

    CREATE FOREIGN TABLE flow_stream (
        dtmsk timestamp without time zone,
        action text,
        duration smallint
    ) SERVER pipelinedb;

    По сути очень похоже на создание обыкновенной таблицы, только получить данные из этого стрима простым select нельзя — нужна вьюха
  4. собственно как её создать:

    CREATE VIEW viewflow WITH (ttl = '3 month', ttl_column = 'm') AS
    select minute(dtmsk) m,
           action, 
           count(*), 
           avg(duration)::smallint,
           min(duration),
           max(duration)
    from flow_stream
    group by 1, 2;

    Называются они Continuous Views и по дефолту materialize, т.е. с сохранением состояния.

    В выражении WITH передаются дополнительные параметры.

    В моём случае ttl = '3 month' говорит о том, что хранить нужно данные только за последние 3 месяца, а брать дату/время из колонки M. Фоновый процесс reaper ищет устаревшие данные и удаляет их.

    Для тех, кто не в курсе — функция minute возвращает дату/время без секунд. Таким образом все события, произошедшие в одну минуту будут иметь одно и то же время в результате агрегации.
  5. Такая вьюха — практически таблица, потому индекс по дате для выборки будет полезным, если данных будет храниться много

    create index on viewflow (m desc, action);

Использование PipelineDB


Помни: вставлять данные в стрим, а читать — из подписавшихся на него вьюх

insert into flow_stream VALUES (now(), 'act1', 21);
insert into flow_stream VALUES (now(), 'act2', 33);
select * from viewflow order by m desc, action limit 4;
select now()

Выполняю запрос вручную

Сначала наблюдаю, как меняются данные в 46-ю минуту
Как только наступает 47-я — предыдущая прекращает обновляться и начинает тикать текущая минута.

Если обратить внимание на план запроса, то можно увидеть оригинальную таблицу с данными



Рекомендую сходить в неё и узнать, как на самом деле хранятся ваши данные

Генератор событий на C#
using Npgsql;
using System;
using System.Threading;

namespace PipelineDbLogGenerator
{
    class Program
    {
        private static Random _rnd = new Random();
        private static string[] _actions = new string[] { "foo", "bar", "yep", "goal", "ano" };

        static void Main(string[] args)
        {
            var connString = "Host=localhost;port=5432;Username=postgres;Database=testpipe";

            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();

                while (true)
                {
                    var dt = DateTime.UtcNow;

                    using (var cmd = new NpgsqlCommand())
                    {
                        var act = GetAction();

                        cmd.Connection = conn;
                        cmd.CommandText = "INSERT INTO flow_stream VALUES (@dtmsk, @action, @duration)";
                        cmd.Parameters.AddWithValue("dtmsk", dt);
                        cmd.Parameters.AddWithValue("action", act);
                        cmd.Parameters.AddWithValue("duration", GetDuration(act));

                        var res = cmd.ExecuteNonQuery();
                        Console.WriteLine($"{res} {dt}");
                    }

                    Thread.Sleep(_rnd.Next(50, 230));
                }
            }
        }

        private static int GetDuration(string act)
        {
            var c = 0;

            for (int i = 0; i < act.Length; i++)
            {
                c += act[i];
            }

            return _rnd.Next(c);
        }

        private static string GetAction()
        {
            return _actions[_rnd.Next(_actions.Length)];
        }
    }
}


Вывод в Grafana


Для получения данных из postgres нужно добавить соответствующий источник данных:



Создать новую дашборду и добавить на неё панель типа Graph, а после нужно перейти в редактирование панели:



Далее — выбрать источник данных, переключиться в режим написания sql-запроса и ввести такое:

select 
  m as time, -- Grafana требует колонку time
  count, action
from viewflow
where $__timeFilter(m) -- макрос графаны, принимает на вход имя колонки, на выходе col between :startdate and :enddate
order by m desc, action;

И тут же получается нормальный график, конечно, если вы запустили генератор событий



FYI: наличие индекса может оказаться очень важным. Хотя его использование зависит от объёма получившейся таблицы. Если вы планируете хранить небольшое количество строк за небольшое количество времени, то очень легко может оказаться, что seq scan будет дешевле, а индекс лишь добавит доп. нагрузку при обновлении значений

На один стрим может быть подписано несколько вьюх.

Допустим я хочу видеть сколько выполняются методы апи в разрезе по процентилям

CREATE VIEW viewflow_per WITH (ttl = '3 d', ttl_column = 'm') AS
select minute(dtmsk) m,
      action, 
      percentile_cont(0.50) WITHIN GROUP (ORDER BY duration)::smallint p50,
      percentile_cont(0.95) WITHIN GROUP (ORDER BY duration)::smallint p95,
      percentile_cont(0.99) WITHIN GROUP (ORDER BY duration)::smallint p99
from flow_stream
group by 1, 2;

create index on viewflow_per (m desc);

Проделываю тот же трюк с графаной и получаю:


Итого


В целом штука рабочая, вела себя хорошо, без нареканий. Хотя под докером загрузка их демо базы в архиве (2.3 гб) оказалась малость долгим делом.

Хочу заметить — я не проводил нагрузочные тесты.

Официальная документация

Может быть интересным


Комментарии (24)


  1. DrAndyHunter
    10.12.2018 07:26

    Спасибо за статью, буквально в пятницу узнал об этом расширении для pg. Подскажите, а для 9.6 postgresql версии как быть? Расширения нет под 9.6?


    1. SanSYS Автор
      10.12.2018 07:51

      Кажется расширения под 9.6 нет, но я не сильно искал

      PipelineDB currently supports PostgreSQL versions 10.1, 10.2, 10.3, 10.4, 10.5, and 11.0
      Как быть — заюзать уже пг 10 )
      Даже 11-й зарелизился
      Ну или если решили, что нужно использовать PipelineDB, а хотите продолжать сидеть на 9.6, то всегда можно поднять ещё один свежий Postgres и там развлекаться


  1. SanSYS Автор
    10.12.2018 07:51

    del


  1. Tantrido
    10.12.2018 08:22

    Интересно: а это расширение может помочь, если данные будут в основном в JSONB поле?


    1. SanSYS Автор
      10.12.2018 08:59

      Типа того?

      select minute(eventtime) as time, 
             sum((data->>'value')::integer) as sum
      from table1
      

      Не вижу проблем с таким вариантом, хотя не пробовал

      Тут же суть в том, что расширение организует поточную обработку данных
      т.е.: приходят данные, возможно накапливаются в каком-то буфере, в фоне выполняется запрос из вьюхи над новыми данными, далее — результат вставляется в результирующую таблицу, либо — дополяет старые данные

      И чисто теоретически не сильно важно, какого типа данные гоняются от потока до вьюхи, потому что вы сами же с ними и работаете весьма прозрачным образом

      Например если у вас в jsonb хранится массив объектов и хотите именно его обрабатывать «в потоке», то можно сделать трансформацию разложив массив на строки и… работать как с обычной таблицей


      1. Tantrido
        10.12.2018 09:03

        А можно JSONB разложить в представление (view)?


        1. SanSYS Автор
          10.12.2018 10:19

          Определённо да, вечером скину пример


        1. Envek
          10.12.2018 17:10

          Один JSON-объект можно разложить в запись с помощью jsonb_to_record, а массив — с помощью jsonb_to_recordset. См. официальную документацию и мой пример.


        1. SanSYS Автор
          10.12.2018 18:03

          Как подсказали выше, вот так

          select * 
          from jsonb_to_recordset(
                  '[{"a":1,"b":"foo"},{"a":"2","c":"bar", "v":"2018-04-01"}]'::jsonb
          ) as x(a int, b text, v date, not_exists time);


          1. Tantrido
            11.12.2018 00:48

            Это простенький JSON, а если сложный иерархический, но структура более менее постоянна.


            1. Envek
              12.12.2018 19:12
              +1

              Можно вытаскивать нужный кусок JSONB и скармливать его это же функции, на ходу подправляя, если требуется.
              Вот, например, если у нас есть таблица listings, у которой есть jsonb-поле raw, где-то в недрах которого лежит нужный нам массив (а иногда не массив):


              SELECT id, v."SKU"
              FROM 
                listings, 
                jsonb_to_recordset(
                  CASE jsonb_typeof(raw->'Variations'->'Variation')
                  WHEN 'array' THEN raw->'Variations'->'Variation'
                  ELSE jsonb_build_array(raw->'Variations'->'Variation')
                  END
                ) AS v("SKU" varchar)
              WHERE raw IS NOT NULL 
                AND raw->'Variations'->'Variation' IS NOT NULL


      1. UncleAndy
        10.12.2018 11:10

        Т.е. эта система потокобезопасная и лишена дедлоков при наличии вставок через несколько подключений к БД?


        1. SanSYS Автор
          10.12.2018 18:21

          Не знаю
          Уверен, что при простом использовании стримов и вьюх — всё будет ок
          Но если использовать вызовы различных кастомных методов, то внутри можно написать любуую логику и да — монжо словить классический дедлок, всё как всегда
          У них в репе issues имеют метку deadlock, но они скорее не к экстеншену для postgres


  1. SergeyGershkovich
    10.12.2018 11:58

    А где в статье связь между запросом

    SELECT hour(datetime), somename, count(*), sum(somemetric)
    from table
    where datetime > :monthAgo
    group by 1, 2
    order by 1 desc, 2

    и стримом
    
    CREATE FOREIGN TABLE flow_stream (
        dtmsk timestamp without time zone,
        action text,
        duration smallint
    ) SERVER pipelinedb;

    Я так понимаю, в «table» нужно триггер или правило добавлять, чтобы заливать данные в «flow_stream»?


    1. VJean
      10.12.2018 12:17

      PipelineDB можно рассматривать как таблицы с интегрированными триггерами.


      1. SergeyGershkovich
        10.12.2018 13:00

        Мне очень понравился первый абзац:

        Вас когда-либо просили посчитать количество чего-то на основании данных в бд за последний месяц, сгруппировав результат по каким-то...

        Что если у меня уже есть Postgres DB со своими таблицами, к которым требуется писать множество агрегатных запросов. Я так понимаю, чтобы использовать PipelineDB, я должен переместить таблицы в PipelineDB или написать некие правила для отправки данных из своих таблиц в PipelineDB? — этих правил в статье я не увидел.


        1. VJean
          10.12.2018 15:48
          +1

          Достаточно детальный пример с примечаниями: Streaming Databases & PipelineDB: Likes Use Case

          В статье много чего не описано по сравнению с официальной документацией, в том числе и про докер, т.к. готовый образ уже собран.


          1. SergeyGershkovich
            10.12.2018 19:37

            Почитал документацию, для себя определил PipelineDB, как механизм агрегирующих счетчиков в режиме реального времени. Счетчик от потока отключил, агрегированные данные потерял.
            Идея интересная, но не для случая уже хранимых данных. Удобно для данных, которые только планируется сохранять, с учетом, что счетчики должны быть включены заранее.


  1. le1ic
    10.12.2018 14:06

    Вечный tradeoff — тормозить чтение, или тормозить запись. Аггрегация, как я понимаю, синхронная, классическая?


  1. Tatikoma
    10.12.2018 14:19

    curl -s http://download.pipelinedb.com/apt.sh | bash

    Очень сильно удивился увидев такую конструкцию. Открыл документацию pipelinedb и увидел там ровно такую же строчку. Выглядит очень грустно.


    1. Pinkerton42
      10.12.2018 16:27

      Угу. В нем безапеляционно мелькают строки «apt-get install -y ...» без всяких вопросов. За такое обычно по рукам бьют.


      1. Tatikoma
        10.12.2018 17:20

        Это проблемы разного уровня, если я ничего не путаю.
        apt-get install -y в худшем случае сломает вам систему, если у вас нет косячных репозиториев.
        curl -s http | bash — в худшем случае установит вирус, который спросит права на рута и вы ему их сами предоставите


        1. Pinkerton42
          10.12.2018 18:45

          Неважно какого уровня: это — проблема. И считаю, что да, за такое надо бить по рукам.


  1. neenik
    10.12.2018 14:58

    Поздно postgresql, поздно. OLAP+SQL=ClickHouse.
    У нас у самих до сир пор осталось пяток самописных агрегаций в постгресе. Характеристика одним словом — неудобно.