Очереди сообщений используются для выполнения: отложенных операций, взаимодействия сервисов между собой, «batch processing» и т.д. Для организации подобных очередей существуют специализированные решения, такие как: RabbitMQ, ActiveMQ, ZeroMQ и тд, но часто бывает, что в них нет большой необходимости, а их установка и поддержка причинит больше боли и страданий, чем принесет пользы. Допустим, у вас есть сервис, при регистрации в котором пользователю отправляется email для подтверждения, и, если вы используете Postgres, то вам повезло — в Postgres, почти из коробки, есть расширение PgQ, которое сделает всю грязную работу за вас.

В этой статье я расскажу об организации очередей сообщений (задач) в PostgreSQL с использованием расширения PgQ. Эта статья будет полезна, если вы еще не использовали PgQ или используете самописные очереди поверх Postgres.

Зачем вообще нужен PgQ, если можно просто создать табличку и записывать туда задачи? Казалось бы, можно, но вам придется учесть паралельный доступ к задачам, возможные ошибки (что будет, если процесс обрабатывающий задачу, упадет?), а также производительность (PgQ очень быстрый, а самописные решения, как правило, нет, особенно если транзакция в базе не закрывается во время всего выполнения задачи), но самое главное, почему на мой взгляд надо использовать PgQ, это то, что PgQ уже написан и работает, а самописное решение еще надо написать (UPD: про то, почему не стоит использовать самописные очереди, можно почитать, например, тут).
(UPD: т.к. PgQ работает поверх Postgres, все прелести транзакций можно использовать и в нем)

Но у PgQ есть один огромный минус — отсутствие документации, этот недостаток я и пытаюсь компенсировать этой статьей.

Устройство


PgQ состоит из частей (как минимум 2-х): 1 — расширение pgq для postgres, 2 — демон pgqd (об их установке чуть позже).

Все взаимодействие с очередью осуществляется с помощью функций внутри Postgres.

Например, чтобы создать очередь, надо выполнить

select * from pgq.create_queue({имя очереди} text);

После того как очередь создана, в нее можно добавлять сообщения

select * from pgq.insert_event({имя очереди} text, {тип события} text, {информация о событии} text);

Теперь надо научиться получать записанные сообщения. Для этого существует такая сущность, как «consumer» (я буду писать консьюмер), который получает не сообщения (события), а «бачи» (batch). Бач — это группа подряд идущих сообщений, бачи создаются с помощью pgqd. Периодически (параметр «ticker_period» в конфигурационном файле) pgqd берет все накопленные сообщения и записывает в новый бач. Важно, если pgqd не работает, то новые бачи не создаются, а значит, консьюмерам нечего читать, также, если pgqd долго не работал, а потом включился, то он создаст один большой бач из сообщений, накопленных за это время, поэтому pgqd не следует просто так отключать.

Регистрация консьюмера (Важно! консьюмер будет получать события, записанные только после его регистрации, поэтому следует вначале создать консьюмер, а только потом уже писать события):

select * from pgq.register_consumer({имя очереди} text, {имя консьюмера} text);

(аналогично pgq.unregister_consumer)
Каждый консьюмер получит абсолютно все события, произошедшие после его создания (даже уже обработанные другим консьюмером), это значит, что скорее всего вам нужен всего один консьюмер на одну очередь. Далее я расскажу, как при этом разделить нагрузку на несколько серверов.

Для получения бача нужно вначале узнать его ID:

select * from pgq.next_batch({имя очереди} text, {имя консьюмера} text);

Функция может вернуть NULL, если консьюмер обработал все бачи. В этом случае нужно просто подождать, пока pgqd создаст новый бач.

При этом, пока бач не обработан, эта функция будет возвращать одно и то же значение.
Получить все события в баче можно с помощью:

select * from pgq.get_batch_events({id бача} bigint);

(Бач может быть пустым.)

Если при обработке какого-то из них возникла ошибка, то можно попробовать обработать это событие позже:

select * from pgq.event_retry({id бача} bigint, {id события} bigint, {количество секунд до ретрая} integer);

Чтобы сообщить об окончании бача и получить возможность приступить к новому, используется

select * from pgq.finish_batch({id бача} bigint);

Разумеется, это не все функции в расширении, рекомендую почитать pgq.github.io/extension/pgq/files/external-sql.html и github.com/pgq/pgq/tree/master/functions (в каждом файле содержится дефиниция и описание соответствующей функции).

Распределение нагрузки


Для того чтобы обрабатывать события одновременно несколькими обработчиками, существует расширение pgq_coop, которое работает повех pgq и добавляет новую сущность «субконсьюмер» (sub consumer), который будет получать все события с момента регистрации родительского консьюмера, разумеется, кроме уже обработанных.

select * from pgq_coop.register_subconsumer({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);

select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text);

select * from pgq_coop.next_batch({имя очереди} text, {имя консьюмера} text, {имя субконсьюмера} text, {если другой субконсьюмер был неактивен в течение этого интервала, текущий может забрать у него бач} interval);

select * from pgq_coop.finish_batch({id бача} bigint);

Прочитать про все функции можно здесь.

Установка


Расширение pgq и демон pgqd входят в репозитории PGDG и в большинстве дитрибутивов устанавливаются очень просто, например, в Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX — номер версии).

pgqd — это небольшая программка, узнать про использование которой можно с помощью pgqd --help, не забудьте добавить ее в автозапуск (sudo systemctl enable pgqd.service, а конфиг по умолчанию — /etc/pgqd.ini).

Чтобы начать использовать PgQ, в базе надо просто подключить расширение:

create extension if not exists pgq;


C pgq_coop все немного сложнее, его нет в репозитории, но собрать его из исходников не составляет труда (пример для Debian):

sudo apt install postgresql-server-dev-XX
git clone https://github.com/pgq/pgq-coop.git
cd pgq-coop
sudo make install

и подключить расширение с помощью

create extension if not exists pgq_coop;

Полезные ссылки


Документация pgq
Функции pgq
Функции pgq_coop
Исходный код pgqd
github аккаунт со всеми связанными проектами
Wiki postgres-а

Комментарии (34)


  1. aol-nnov
    06.01.2020 19:48
    +2

    PgQ состоит из частей (как минимум 2-х): 1 — расширение pgq для postgres, 2 — демон pgqd

    О, ну, конечно же, это в разы меньше боли, чем установка


    RabbitMQ, ActiveMQ, ZeroMQ и тд

    // Режим сарказма: выкл


    1. Astolfo Автор
      06.01.2020 19:55

      Так установка pgq это ~7 строчек в терминале, а отдельный сервис надо не только поставить, но и обновлять, мониторить, тюнить и тд…


      1. aol-nnov
        06.01.2020 19:57
        +2

        Вы так говорите, будто "демон pgqd" не надо мониторить...


        Чем больше "кусочков", тем веселее


        1. Astolfo Автор
          06.01.2020 20:01
          -4

          pgqd это 1000 строчек кода, вещь очень тонкая, от нее надо только чтобы она была запущена github.com/pgq/pgqd/tree/master/src


      1. ertaquo
        06.01.2020 21:05
        +1

        Берем Docker и…


        1. Astolfo Автор
          06.01.2020 21:13

          … и что? Простите, но я не очень понимаю о чем вы…


          1. ertaquo
            06.01.2020 21:17

            Через Docker можно установить что угодно как отдельный сервис, и легко и быстро обновлять, мониторить и т. п. Разницы практически никакой.


            1. Astolfo Автор
              06.01.2020 21:42

              Docker, конечно, прекрасный инструмент, но не панацея, не очень понимаю как он решит проблему с мониторингом, поддержкой (если что-то сломалось), необходимостью еще одной библиотеки, с которой надо разбираться, также, я это не упомянул, но pgq можно использовать внутри транзакций и тп…
              PgQ по факту ничего в инфраструктуру не привносит (только pgqd, но это ~ ничего), а отдельные сервисы — отдельные сервисы


              1. Naglec
                06.01.2020 22:11
                +1

                Ехал докер через докер
                Докер докер докер докер


                1. Astolfo Автор
                  06.01.2020 22:14

                  Ну вот да… Жизненная ситуация…


  1. swelf
    06.01.2020 21:36
    +1

    Читаешь значит такой про REST API, а там
    получить данные — метод GET
    создать — PUT
    обновить — POST
    удалить — DELETE
    и думаешь — избыточно как-то, мне бы хватило только GET и POST, и тут приходит PgQ и говорит: «Не парься, чувак, у нас вобще все через select»


    1. Astolfo Автор
      06.01.2020 21:46
      +1

      (улыбнуло)
      Так вызываются функции в Postgres, рекомендую погуглить, или почитать что-нибудь подобное postgrespro.ru/docs/postgresql/12/xfunc-sql


    1. edo1h
      07.01.2020 01:25

      вы немного не поняли.


      вот уже есть какой-то контекст, где уже "всё через select" (активно используется postgres). и потребовалось добавить очереди. именно в этом случае и есть смысл глядеть на реализацию очередей на постгре.


      только, честно говоря, по этой статье я так и не понял почему стоит выбрать (или хотя бы серьёзно рассматривать) именно PgQ


      1. Astolfo Автор
        07.01.2020 10:01

        только, честно говоря, по этой статье я так и не понял почему стоит выбрать (или хотя бы серьёзно рассматривать) именно PgQ

        PgQ идет с Postgres, если вы уже используете Postgres, возможно, вам будет легче начать использовать и PgQ, а не отдельные сервисы (но это только мое мнение).
        Также PgQ транзакционен, это большой плюс.


        1. edo1h
          07.01.2020 10:09

          на всякий случай уточню, я имел в виду "PgQ — далеко не единственная реализация очередей на базе postgresql, статья не показывает чем он отличается от прочих"


          1. Astolfo Автор
            07.01.2020 10:35

            Тогда зависит от конкретной реализации. Скорее всего производительностью, за счет деления на бачи, например.


      1. Melkij
        07.01.2020 13:14
        +1

        почему стоит выбрать (или хотя бы серьёзно рассматривать) именно PgQ

        Потому что pgq учитывает как именно работает с данными postgresql. Очень пристально учитывает. И потому не умирает под нагрузкой от любых транзакций длительностью более 15 минут из-за помех работы вакууму — что характерно для самописных очередей (как например вот таких)


  1. dmitryb-dev
    06.01.2020 22:51
    +7

    Каждый раз когда натыкаюсь на какой нибудь плагин для постгреса, вспоминаю этот комментарий:

    Redis и MongoDB не нужны, потому что Постгрес проще, удобнее и быстрее.
    Если бы Ruby был написан на хранимках в Постгресе, он работал бы в три раза быстрее и не падал.
    Ноду пришлось писать на V8, потому что джаваскриптеры не поняли лицензию Постгреса, а Erlang появился, потому что кто-то просто не умеет пользоваться триггерами и хранимками.
    K&R после создания C пришлось писать UNIX, потому что они хотели, конечно, написать Постгрес, но ему было не на чем запускаться.
    C++0X появился только для того, чтобы умные люди перестали писать на C и перешли на Постгрес.
    Разработка файловых систем нового поколения часто тормозится (особенно ZFS), потому что люди понимают, что с какой стороны не подходи и что не делай, а из-за требований нормальной транзакционности все равно получается Постгрес!
    Сам Постгрес написан на хранимках Постгреса, мейкфайл выполнен в виде дампа Постгреса, и собирается Постгрес Постгресом.


    1. Astolfo Автор
      06.01.2020 23:09

      Я ни в коем случае не имею в виду, что сторонние сервисы для очередей не нужны, но часто без них будет проще обойтись (на мой взгляд).
      В любом случае, после прочтения, выбор брокера очередей все равно остается за вами, например, если вы и ваша команда эксперты в rabbitmq, он вам подходит (по техническим соображениям), и вы умеете его готовить, то скорее всего вам лучше всего подойдет именно он, но если это не так, теперь вы можете посмотреть и в сторону PgQ.


  1. Throwable
    07.01.2020 01:29
    +2

    Пардон, а как сие чудо использовать клиенту? Например как устроить листенер месседжей через стандартный jdbc, чтобы он не делал тупой полинг? Да и зачем так усложнять, если уже есть LISTEN/NOTIFY?


    1. Astolfo Автор
      07.01.2020 09:54

      LISTEN/NOTIFY о PubSub, а PgQ об очередях.
      А чем плох полинг? Eго просто реализовать — простой for, он никак не влияет на производительность. Если next_batch вернул null спите несколько секунд (размер тика в pgqd), если не null, то обрабатываете и сразу берете следующий.


      1. Throwable
        07.01.2020 12:07

        Плох тем, что большой latency, поэтому и не годится в сценариях, где месседжи приходят достаточно редко, но требуют моментальной обработки. Впринципе LISTEN/NOTIFY как раз мог бы использоваться для нотификации, что в такую-то очередь пришло сообщение и надо его прочитать. Но проблема в том, что стандартный PG jdbc не умеет нотификации (только через тот же поллинг).


        Поэтому прежде чем заявлять, что Postgresql умеет очереди, нужно позаботиться о клиентских интерфейсах и имплементировать стандартные протоколы типа AMQP, MQTT, STOMP, JMS, etc (например в Oracle идет поддержка JMS из коробки). А на коленках через полинг я очереди могу сделать и через обычную таблицу — для этого не нужен PgQ.


        1. Astolfo Автор
          07.01.2020 13:36

          Плох тем, что большой latency, поэтому и не годится в сценариях, где месседжи приходят достаточно редко, но требуют моментальной обработки.

          Не очень понимаю, что за сценарий такой, и не уверен, что для этого вообще стоит использовать очереди.
          (Также вам никто не запрещает использовать PgQ (для гарантий обработки) + LISTEN/NOTIFY (для отсутствия latency))

          А на коленках через полинг я очереди могу сделать и через обычную таблицу — для этого не нужен PgQ.

          Почему не надо использовать самописные очереди — отдельная тема, описанная много где, например тут.

          Поэтому прежде чем заявлять, что Postgresql умеет очереди, нужно позаботиться о клиентских интерфейсах и имплементировать стандартные протоколы типа AMQP, MQTT, STOMP, JMS, etc

          С обычными задачами (отправка уведомлений, ресайз картинок и тп (задачи, для которых latency ~секунда норма)) прекрасно справляется select + полинг.


  1. Anthrax_Beta
    07.01.2020 10:04

    1) ИМХО специализированный инструмент будет работать на порядок лучше, чем универсальный. Есть какие-нибудь тесты, показывающие, чем PgQ лучше/хуже, чем тот же RabbitMQ?
    2) Навешивая на PostgreSQL кучу всяких свистелок, вы превращаете его в одну большую точку отказа.


    1. Astolfo Автор
      07.01.2020 10:18

      Есть какие-нибудь тесты, показывающие, чем PgQ лучше/хуже, чем тот же RabbitMQ?

      Вы имеете в виду производительность? Это отдельная тема, в статье я ее почти не затрагивал, в основном я говорил об удобстве.
      Навешивая на PostgreSQL кучу всяких свистелок, вы превращаете его в одну большую точку отказа.

      Если вы используете Postgres, и вам нужна отказоустойчивость, то скорее всего вы о ней уже позаботились (patroni и тп), но да в каком-то смысле вы правы.


  1. aSimpleMan
    07.01.2020 10:08

    А что если мне нужно делать http запрос в сторонний сервис? Мне придеться переодически слать запросы на получение бачей?


    1. Astolfo Автор
      07.01.2020 10:10

      (если я правильно понял ваш вопрос)

      Мне придется периодически слать запросы на получение бачей?

      Я обычно в своем языке программирования один раз пишу абстракцию очереди и вызываю ее методы Put, Subscribe, Unsubscribe… там где нужна очередь, при этом реализую получение бачей (и другое взаимодействие с PgQ) только один раз в ней.

      А что если мне нужно делать http запрос в сторонний сервис?

      Зависит от http запроса, если вы хотите выполнить его асинхронно, то да, для этого PgQ подойдет.


  1. PavelVainerman
    08.01.2020 21:14

    А это тоже самое (вроде как от разработчиков skype) или что-то другое?

    www.pgcon.org/2008/schedule/attachments/55_pgq.pdf


    1. Astolfo Автор
      08.01.2020 21:29

      Да, PgQ был разработан в skype, только сейчас актуальная версия — 3, а этот доклад относится ко 2, но основные концепты не поменялись, поэтому подобные вещи полезно читать (на мой взгляд), несмотря на то что местами они могли устареть.


      1. PavelVainerman
        08.01.2020 21:32

        Понял. Спасибо за статью (и информацию).


  1. renaay
    08.01.2020 21:29
    +1

    Насколько я понимаю, pgq — это инструмент позволяющий перенести часть логики работы с очередями в базу данных. Но ведь, большинство маломальски средних проектов наоборот, стремятся вынести всю логику на уровне кода в пользу базы данных. Возможно, конечно, у меня просто небыло задач, где бы пригодился pgq.
    В любом случае спасибо за ваш труд, было познавательно почитать.


    1. Astolfo Автор
      08.01.2020 21:37

      Но ведь, большинство маломальски средних проектов наоборот, стремятся вынести всю логику на уровне кода в пользу базы данных.

      Обычно речь о бизнес-логике, тут ситуация другая. Очередь это тип базы данных, вы же используете БД, а не храните данные в собственном формате в файлах или вообще напрямую на блочном устройстве.
      В данном случае PgQ легко (по крайней мере я пишу на golang) оборачивается в абстракцию на уровне вашего приложения и легко подменяется другими реализациями в случае необходимости, а поверх этого уже реализуется ваша бизнес-логика.


  1. zhenya12
    08.01.2020 21:40

    1. хотел поинтересоваться есть примеры когда нужно использовать данное решение?
    2. как данное решение можно масштабировать?
    3. есть возможность подписаться и не писать свой скедулер а просто получить сообщение которое пришло?
    4. какая конфигурация пула бд должно быть скажем на 1к


    1. Astolfo Автор
      08.01.2020 22:05

      1. хотел поинтересоваться есть примеры когда нужно использовать данное решение?

      Вообще они есть в различных докладах на эту тему, но, если кратко, то тогда же, когда вы используете очереди (т.к. это и есть очередь), например в этой статье есть раздел Использование очереди сообщений, пункты которого относятся к +- любой очереди.

      2. как данное решение можно масштабировать?

      Вместе с Postgres, но у PgQ такая производительность, что проблем не должно быть (рекомендую вам самим это проверить).

      3. есть возможность подписаться и не писать свой скедулер а просто получить сообщение которое пришло?

      А что вы называете скедулером? Получение бачей — это простой цикл for + if + sleep, работающий в отдельном потоке.

      4. какая конфигурация пула бд должно быть скажем на 1к

      Не очень понимаю что вы имеете в виду под пулом бд и , но если я правильно понял, то PgQ спокойно обрабатывает тысячи запросов в секунду на t3.nano + 20 gb gp2 ebs, не верьте мне, проверьте сами.