PostgreSQL предлагает метод логического декодирования и делает возможным сбор данных об изменениях на основе логирования. Вы сможете настроить и запустить CDC в несколько шагов.

Архитектура современных веб-приложений состоит из нескольких программных компонентов, таких как информационные панели (дашборды), аналитические системы, базы данных, озёра данных (Data Lakes), кэшевые хранилища, функции поиска и т.д.

База данных обычно является основной частью любого приложения. Обновление данных в режиме реального времени позволяет поддерживать разрозненные системы данных в непрерывной синхронизации и быстро реагировать на появление новой информации. Как же поддерживать экосистему приложений в синхронном состоянии? Как эти компоненты получают информацию об изменениях в базе данных? Термин отслеживание изменённых данных, или сокращённо CDC, — относится к любому решению, которое идентифицирует новые или изменённые данные.

Статья посвящена отслеживанию изменённых данных (CDC) в PostgreSQL и способам достижения этой цели.
Отслеживание изменённых данных (CDC) — это метод интеграции данных для обнаружения, захвата и передачи изменений, внесённых в источники данных базы данных.
Как правило, интеграция данных на основе CDC состоит из следующих шагов:

  1. Захват изменённых данных в исходной базе данных.
  2. Преобразование изменённых данных в формат, который могут принять ваши потребители (консьюмеры).
  3. Публикация данных для консьюмеров или целевой базы данных.

PostgreSQL предлагает два встроенных способа сделать CDC возможным:

  • Из журналов транзакций, PostgreSQL WALs (они же Write Ahead Logs).
  • С помощью триггеров базы данных.

Давайте кратко обсудим плюсы и минусы использования журналов транзакций (WALs) и триггеров для отслеживания изменения данных.

Триггеры


Методы на основе триггеров предполагают создание триггеров аудита в базе данных для регистрации всех событий, связанных с методами INSERT, UPDATE и DELETE.

Триггеры могут быть привязаны к таблицам (разделённым и нет) или представлениям (views).

Они также могут срабатывать для операторов TRUNCATE. При возникновении события триггера — функция вызывается в соответствующее время для обработки события.

  • ???? Главное преимущество этого метода заключается в том, что всё это можно сделать на уровне SQL, в отличие от журналов транзакций.
  • ???? Однако использование триггеров оказывает значительное влияние на производительность исходной базы данных, поскольку эти триггеры должны запускаться в базе данных приложения при внесении изменений в данные.

Журналы транзакций


В большинстве современных СУБД журналы транзакций (WAL для PostgreSQL) обычно используются для логирования и дублирования (репликации) транзакций.

В PostgreSQL все транзакции, такие как INSERT, UPDATE, DELETE, записываются в WAL до того, как клиент получает результат транзакции.

  • Преимущество этого подхода в том, что он никак не влияет на производительность базы данных.
  • Он также не требует модификации таблиц БД или приложения. Нет необходимости создавать дополнительные таблицы в исходной базе данных.
  • CDC на основе журнала обычно считается лучшим подходом к отслеживанию изменённых данных, применимым ко всем возможным сценариям, включая системы с чрезвычайно высокими объёмами транзакций.

Учтите, что в настоящее время большинство операторов DDL, таких как CREATE, DROP, ALTER, не отслеживаются. Однако команда TRUNCATE в потоке логической репликации присутствует.
Если вам нужна потоковая передача изменений данных Postgres по мере их возникновения, вам понадобится функция логического декодирования или логической репликации Postgres.

image

Применение логического декодирования Postgres


Логическое декодирование — это официальное название основанной на логировании PostgreSQL CDC логической репликации.

Логическое декодирование использует содержимое журнала PostgreSQL Write-Ahead Log для хранения всех действий, происходящих в базе данных. Журнал Write Ahead Log — это внутренний журнал, который описывает изменения базы данных на уровне хранилища.

1. Первым шагом в использовании логического декодирования является установка следующих параметров в конфигурации Postgres postgresql.conf:

wal_level = logical
max_replication_slots = 5
max_wal_senders = 10

  • Установка wal_level в значение logical позволяет WAL записывать информацию, необходимую для логического декодирования.
  • Убедитесь, что значение параметра max_replication_slots равно или больше количества коннекторов PostgreSQL, использующих WAL, и прибавьте к этому количество других слотов репликации, используемых вашей базой данных.
  • Убедитесь, что параметр max_wal_senders, определяющий максимальное количество одновременных соединений с WAL, как минимум вдвое превышает количество логических слотов репликации. Например, если ваша база данных использует в общей сложности 5 слотов репликации, значение параметра max_wal_senders должно быть 10 или больше.

Перезапустите ваш сервер Postgres, чтобы применить изменения.

2. Второй шаг заключается в настройке логической репликации с помощью подключаемого модуля test_decoding

Создайте слот логической репликации для базы данных, которую вы хотите синхронизировать, выполнив следующую команду:

SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');

Примечание: Каждый слот репликации имеет имя, которое может содержать строчные буквы, цифры и символ подчёркивания.

Чтобы убедиться в том, что слот был успешно создан, выполните следующую команду:

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

3. В следующем шаге создайте публикацию для всех ваших таблиц или только для тех, что вам нужны. Если вы зададите конкретные таблицы, вы сможете добавить или удалить их из публикации позже.

CREATE PUBLICATION pub FOR ALL TABLES;

или

CREATE PUBLICATION pub FOR TABLE table1, table2, table3;

По желанию вы можете выбрать, какие операции включить в публикацию. Например, следующая публикация включает для первой таблицы (table1) только операции INSERT и UPDATE.

CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');

4. Убедитесь, что выбранные вами таблицы есть в публикации.

psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub     | public     | table1
pub     | public     | table2
pub     | public     | table3
(3 rows)

С этого момента наша публикация pub будет отслеживать изменения всех таблиц в базе данных psql-stream.

5. Давайте создадим условную таблицу t и заполним её несколькими записями:

create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);

В результате мы получим 10 записей в таблице t.

psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)

6. Наконец, пришло время проверить, работает ли наша логическая репликация.

Выполните следующую команду в консоли PostgreSQL, чтобы увидеть записи Postgres WAL:

SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);

В результате вы получите что-то вроде:

    lsn    | xid  |                          data                          
-----------+------+--------------------------------------------------------
 0/19EA2C0 | 1045 | BEGIN 1045
 0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
 0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
 0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
 0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
 0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
 0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
 0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
 0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
 0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
 0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
 0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)

pg_logical_slot_peek_changes — это ещё одна команда PostgreSQL для просмотра изменений из записей WAL без их поглощения. Поэтому многократный вызов команды pg_logical_slot_peek_changes будет возвращать один и тот же результат.

В свою очередь, pg_logical_slot_get_changes возвращает результаты только при первом вызове. Последующие вызовы pg_logical_slot_get_changes возвращают пустые наборы результатов. Это означает, что при выполнении команды get результаты обрабатываются и удаляются, что значительно расширяет наши возможности по написанию логики использования этих событий для создания реплики таблицы.

7. Не забудьте избавиться от слота, который вам больше не нужен, чтобы остановить его поглощение.

SELECT pg_drop_replication_slot('replication_slot');

Подключаемые модули


Мы уже говорили о подключаемом модуле test_decoding, доступном на Postgres 9.4+. Хотя он и был создан как пример подключаемого модуля, он всё ещё полезен, если ваш консьюмер поддерживает его.

Наряду с плагином test_decoding, с PostgreSQL поставляется ещё один плагин — pgoutput. pgoutput доступен начиная с Postgres 10. Некоторые потребители поддерживают его для декодирования (например, Debezium).

Выполните следующую команду, чтобы создать плагин на основе pgoutput, как в шаге 2 выше.

SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');

Следующая команда поглощает измененённые данные, аналогичные описанным в шаге 6.

psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
    lsn    | xid  |                                           data                                           
-----------+------+------------------------------------------------------------------------------------------
 0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
 0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
 0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
 0/19A1890 | 1038 | \x49000080384e0002740000000234316e
 0/19A1910 | 1038 | \x49000080384e0002740000000234326e
 0/19A1990 | 1038 | \x49000080384e0002740000000234336e
 0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
 0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
 0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
 0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
 0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
 0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
 0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)

Здесь можно заметить, что результаты возвращаются в двоичном формате. Плагин pgoutput производит двоичный вывод.

wal2json — ещё один популярный плагин вывода для логического декодирования.

Вот пример вывода данных из плагина wal2json:

{
      "change":[
         {
            "kind":"insert",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               ""
            ]
         }
      ]
   }
   {
      "change":[
         {
            "kind":"update",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               "New Value"
            ],
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }
   {
      "change":[
         {
            "kind":"delete",
            "schema":"public",
            "table":"t",
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }


Важные рекомендации по работе со слотами


При работе со слотами помните о следующем:

  • Каждый слот может иметь только один подключаемый модуль (вы сами выбираете, какой именно).
  • Каждый слот предоставляет изменения только из одной базы данных.
  • Одна база данных может иметь несколько слотов.
  • Каждое изменение данных обычно выдаётся один раз для каждого слота.
  • Однако слот может повторно выдать изменения при перезапуске инстанса Postgres (консьюмер должен справиться с этой задачей).
  • Непоглощенный слот — это угроза доступности инстанса Postgres. Postgres будет сохранять все WAL-файлы для этих непоглощенных изменений. Это может привести к переполнению хранилища.

Консьюмеры PostgreSQL WAL


Консьюмер — это любое приложение, которое может получить поток логического декодирования Postgres. pg_recvlogical — это приложение PostgreSQL, которое может управлять слотами и поглощать из них поток. Оно включено в дистрибутив Postgres, поэтому, скорее всего, оно уже будет установлено у вас вместе с PostgreSQL.

image
Фото: Markus Spiske / Unsplash

Образец программы на Golang


Следующий образец программы на Golang показывает, как приступить к созданию собственного консьюмера Postgress WAL. Он использует логическую репликацию PostgreSQL-10.x для потоковой передачи изменений базы данных (декодированных сообщений WAL) из исходной базы данных.

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/jackc/pgconn"
    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgproto3/v2"
)

// Обратите внимание, что параметр выполнения "replication=database" в строке подключения является обязательным
// слот репликации не будет создан, если опустить значение replication=database

const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "create table t (id int, name text);"

var Event = struct {
    Relation string
    Columns  []string
}{}

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()
    conn, err := pgconn.Connect(ctx, CONN)
    if err != nil {
        panic(err)
    }
    defer conn.Close(ctx)

    // 1. Создайте таблицу
    if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
        fmt.Errorf("failed to create table: %v", err)
    }

    // 2. Убедитесь, что публикация существует
    if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
        fmt.Errorf("failed to drop publication: %v", err)
    }

    if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
        fmt.Errorf("failed to create publication: %v", err)
    }

    // 3. Создайте временный сервер репликации слотов
    if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
        fmt.Errorf("failed to create a replication slot: %v", err)
    }

    var msgPointer pglogrepl.LSN
    pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}

    // 4. Установите связь
    err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
    if err != nil {
        fmt.Errorf("failed to establish start replication: %v", err)
    }

    var pingTime time.Time
    for ctx.Err() != context.Canceled {
        if time.Now().After(pingTime) {
            if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
                fmt.Errorf("failed to send standby update: %v", err)
            }
            pingTime = time.Now().Add(10 * time.Second)
            //fmt.Println("client: please standby")
        }

        ctx, cancel := context.WithTimeout(ctx, time.Second*10)
        defer cancel()

        msg, err := conn.ReceiveMessage(ctx)
        if pgconn.Timeout(err) {
            continue
        }
        if err != nil {
            fmt.Errorf("something went wrong while listening for message: %v", err)
        }

        switch msg := msg.(type) {
        case *pgproto3.CopyData:
            switch msg.Data[0] {
            case pglogrepl.PrimaryKeepaliveMessageByteID:
            //    fmt.Println("server: confirmed standby")

            case pglogrepl.XLogDataByteID:
                walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
                if err != nil {
                    fmt.Errorf("failed to parse logical WAL log: %v", err)
                }

                var msg pglogrepl.Message
                if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
                    fmt.Errorf("failed to parse logical replication message: %v", err)
                }
                switch m := msg.(type) {
                case *pglogrepl.RelationMessage:
                    Event.Columns = []string{}
                    for _, col := range m.Columns {
                        Event.Columns = append(Event.Columns, col.Name)
                    }
                    Event.Relation = m.RelationName
                case *pglogrepl.InsertMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.UpdateMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.DeleteMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.TruncateMessage:
                    fmt.Println("ALL GONE (TRUNCATE)")
                }
            }
        default:
            fmt.Printf("received unexpected message: %T", msg)
        }
    }
}

Эта программа просто регистрирует входящие события, но в производственной среде вы можете легко отправить их в очередь сообщений или в целевую базу данных.

Заключение


Логическое декодирование в PostgreSQL обеспечивает эффективный способ для других компонентов приложения быть в курсе изменений данных в вашей базе данных Postgres.
Традиционно используется модель уведомлений pull, при которой каждый компонент приложения запрашивает Postgres через определённый интервал времени. Логическое кодирование использует модель push-уведомления, при которой Postgres уведомляет другие части приложения о каждом изменении, как только оно происходит.
В настоящее время события изменения данных могут быть отправлены консьюмерам за миллисекунды без запроса к базе данных. Благодаря логическому декодированию, база данных PostgreSQL становится центральной частью вашего современного динамического приложения реального времени (RTA).


НЛО прилетело и оставило здесь промокод для читателей нашего блога:

15% на все тарифы VDS (кроме тарифа Прогрев) — HABRFIRSTVDS.

Комментарии (4)


  1. Melkij
    10.06.2022 11:29
    +3

    Преимущество этого подхода в том, что он никак не влияет на производительность базы данных.

    Да ну? Само по себе повышение wal_level увеличивает объёмы записи в WAL. Отстающий слот логической репликации будет мешать работать автовакууму, да и logical decoding на хорошем потоке записи ресурсы занимает. Может быть сильно выгоднее именно использовать триггеры, чтобы не декодировать все изменения в СУБД ради отслеживания пары таблиц.
    Заострю внимание: если вы хотите сделать CREATE PUBLICATION с всего одной табличкой — logical decoding будет декодировать абсолютно весь поток WAL с этой СУБД и просто отбрасывать изменения не связанные с этой таблицей. Именно так, а не наоборот, когда какой-то магией декодируются только изменения в этой таблице.

    Ну а CREATE PUBLICATION и компания для test_decoding не нужны и не важны. Механизм публикаций и изменений работает через pgoutput плагин и именно для pub/sub этот плагин logical decoding и был создан. Предполагается что consumer так же postgresql, поэтому формат протокола бинарный, но, конечно, никто не мешает сделать совместимую реализацию consumer'а.

    В целом же штук хороший и вполне удобный. Пожалуй стоит только отдельно упомянуть, что не надо добавлять в подписку таблички без primary key. База будет отвергать update и delete в такой табличке пока руками через создание pk или простановку replica identity не объясните, что делать с данными. Запись данных будет отклоняться именно на стороне publication базы. Нередкая история «как положить сервис»


  1. PrinceKorwin
    10.06.2022 11:47

    Почитал оригинальную документацию и эту статью и не понял следующий момент.

    Можно ли сделать цепочку логических репликаций? Примерно такую схему:

    PG Master - logical replication -> PG Slave - logical replication -> Consumer

    Т.е. чтобы репликация шла не с мастера, но со вторичной ноды.


    1. zuborg
      10.06.2022 12:25

      Да, можно.

      Любой инстанс постгреса можно настроить публиковать изменения, слейв в том числе.


  1. Vitaly2606
    11.06.2022 22:21

    Логическое декодирование применимо при не больших нагрузках. Когда мы говорим о 2 и более WAL в секунду то уже могут быть проблемы с лагом репликации.

    https://gitlab.com/postgres-ai/postgresql-consulting/tests-and-benchmarks/-/issues/32