Как многие интересующися знают, в PostgreSQL в версии 9.4 появилось (наконец-то) логическое декодирование (logical decoding). Теперь, чтобы сделать свою репликацию, необязательно разбираться с форматом бинарных wal файлов или писать триггеры (может были еще способы), а преобразовать данные в удобный для себя формат. Для этого достаточно написать плагин к PostgreSQL, который будет этим заниматься. В статье описывается плагин, который преобразует данные в JSON.


Код плагина находится на гитхабе — github.com/ildus/decoder_json. Приветсвуются pull-requests с улучшениями (особенно по части улучшения поддержки типов), багфиксами и просто косметическими улучшениями. JSON был выбран за простоту. Это не окончательный вариант, возможно после тестирования на реальных данных окажется что нужен более производительный формат, и придется переделать. В статье я не буду приводить весь код плагина, а только части про которые мне кажется нужно рассказать.

Необходимые требования для создания плагина: знание С, установленные средства сборки (gcc, cmake), установленные пакеты (в debian-системах) postgresql-9.4, postgresql-server-dev-9.4 и аналогичные в других системах. После установки postgresql, в postgresql.conf надо установить значение max_replication_slots = 1 (или больше) и wal_level = logical.

Сам плагин представляет собой подключаемую библиотеку на C, из которой вызываются callback функции на события postgresql. При инициализации вызывается функция _PG_output_plugin_init со структурой, полям которой полям нужно назначить свои функции:

  • startup_cb — функция, вызываемая при инициализации плагина
  • begin_cb — начало транзакции
  • change_cb — запись данных
  • commit_cb — потверждение транзакции
  • shutdown_cb — деинициализация плагина

Функция которая заполняет структуру:

void _PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    cb->startup_cb = decoder_json_startup;
    cb->begin_cb = decoder_json_begin_txn;
    cb->change_cb = decoder_json_change;
    cb->commit_cb = decoder_json_commit_txn;
    cb->shutdown_cb = decoder_json_shutdown;
}

Теперь осталось определить эти пять функций. decoder_json_startup вызывается в начале декодирования и используется для задания опций декодирования и создания своего контекста памяти:

Функция decoder_json_startup
/* initialize this plugin */
static void
decoder_json_startup(LogicalDecodingContext *ctx,
                     OutputPluginOptions *opt,
                     bool is_init)
{
    ListCell   *option;
    DecoderRawData *data;

    data = palloc(sizeof(DecoderRawData));
    data->context = AllocSetContextCreate(ctx->context,
                                          "Raw decoder context",
                                          ALLOCSET_DEFAULT_MINSIZE,
                                          ALLOCSET_DEFAULT_INITSIZE,
                                          ALLOCSET_DEFAULT_MAXSIZE);
    data->include_transaction = false;
    data->sort_keys = false;

    ctx->output_plugin_private = data;

    /* Default output format */
    opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;

    foreach(option, ctx->output_plugin_options)
    {
        DefElem    *elem = lfirst(option);

        Assert(elem->arg == NULL || IsA(elem->arg, String));

        if (strcmp(elem->defname, "include_transaction") == 0)
        {
            /* if option does not provide a value, it means its value is true */
            if (elem->arg == NULL)
                data->include_transaction = true;
            else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                strVal(elem->arg), elem->defname)));
        }
        else if (strcmp(elem->defname, "sort_keys") == 0) {
            /* if option does not provide a value, it means its value is true */
            if (elem->arg == NULL)
                data->sort_keys = true;
            else if (!parse_bool(strVal(elem->arg), &data->sort_keys))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                strVal(elem->arg), elem->defname)));
        }
        else
        {
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("option \"%s\" = \"%s\" is unknown",
                            elem->defname,
                            elem->arg ? strVal(elem->arg) : "(null)")));
        }
    }
}

Здесь парсятся параметры, переданные в плагин и сохраняются в структуру. Созданный контекст памяти используется потом в decoder_json_change чтобы корректно почистить используемые ресурсы. Важные моменты:

  • свои данные сохраняются в ctx->output_plugin_private
  • opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT — так задается что вывод плагина будет текстовый

decoder_json_shutdown вызывается в конце декодирования, и используется для чистки ресурсов.

Функция decoder_json_shutdown
/* cleanup this plugin's resources */
static void
decoder_json_shutdown(LogicalDecodingContext *ctx)
{
    DecoderRawData *data = ctx->output_plugin_private;

    /* cleanup our own resources via memory context reset */
    MemoryContextDelete(data->context);
}


Дальше самое интересное. Надо определить функции decoder_json_begin_txn, decoder_json_commit_txn и decoder_json_change которые собственно и генерируют строки, получаемые командами pg_logical_slot_peek_changes и pg_logical_slot_get_changes. Сгенеренную строку надо добавить в слот, это делается командами:

OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "some string");
OutputPluginWrite(ctx, true);

Функции decoder_json_begin_txn и decoder_json_commit_txn пишут (или просто пропускают, если есть такое условие) команды начала и конца транзакции в слот — строки 'begin' и 'commit' соответсвенно.

Функция decoder_json_change вызывается на событие изменения данных. В этой функции определяется какое именно событие произошло (INSERT, UPDATE, DELETE) и для каждого из них создается своя структура. Для UPDATE и DELETE важно наличие уникального (not null) или первичного ключа в таблице, иначе просто невозможно определить изменяемую (удаляемую) строку. Это зависит от значения параметра REPLICA IDENTITY для таблицы.

Эта функция принимает 4 параметра:

  • LogicalDecodingContext *ctx — контекст
  • ReorderBufferTXN *txn
  • Relation relation — сведения о изменяемой таблице
  • ReorderBufferChange *change — сведения о данных

Кратко про функцию можно сказать что тип операции определяется через change->action. Далее по данным в change (change->data.tp.newtuple и change->data.tp.oldtuple) создается JSON структура. JSON генерируется с помощью библиотеки libjansson.

Вот здесь то и начинаются сложности. Если REPLICA IDENTITY для таблицы установлен в NOTHING или DEFAULT при отсутствующем первичном ключе, то невозможно определить изменяемые строки и в лог попадут только записи добавления. При обновлении или удалении данных с таблицы с DEFAULT, FULL, INDEX и при наличии уникального ключа, то его значение берется из newtuple или из oldtuple (если значение ключа изменяется запросом). При отсутствии уникального ключа и если FULL, то для идентификации используются все значения из oldtuple.

В результате строится JSON структура, вида {"a": 0, "r": "public.some_table", "c": {"id": 1}, "d": {"a": 2}}, где a — это тип действия, r — название таблицы, c — значения для идентификации строки, d — собственно данные.

Проверим работу. Сборка плагина и запуск тестов:

git clone https://github.com/ildus/decoder_json.git
cd decoder_json
# разрешаем запись всем в папку с библиотеками постгреса - ни в коем случае нельзя делать на продакшене
sudo chmod a+rw `pg_config --pkglibdir`
chmod a+rwx ./
# скачиваем и собираем libjansson, для генерации JSON
make deps 
# переключаемся под юзера postgres
sudo su postgres
make
make test

Тестирование плагина вручную:

# переключаемся под юзера postgres, создаем тестовую бд и открываем консоль работы с бд
sudo su postgres
createdb test_db
psql test_db

# psql консоль
test_db=# create table test1 (id serial primary key, name varchar);
test_db=# SELECT * FROM pg_create_logical_replication_slot('custom_slot', 'decoder_json');
  slot_name  | xlog_position 
-------------+---------------
 custom_slot | 0/4D9F870
(1 row)

Здесь мы указываем название слота и подключаемый плагин. В ответе мы видим название слота, и место (xlog позиция) с которого собственно начинается запись данных в слот. То что мы указали наш плагин, не означает что он уже работает, само декодирование начинается только когда мы забираем данные. Для этого используются функции pg_logical_slot_peek_changes и pg_logical_slot_get_changes. Они отличаются тем что get функция после получения данных чистит очередь.

Добавление данных:

test_db=# insert into test1 values (1, 'bb');
INSERT 0 1
test_db=# insert into test1 values (2, 'bb');
INSERT 0 1
test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
 location  |  xid  |                        data                         
-----------+-------+-----------------------------------------------------
 0/BAB0968 | 48328 | begin
 0/BAB0968 | 48328 | {"a":0,"r":"public.test1","d":{"id":1,"name":"bb"}}
 0/BAB09F0 | 48328 | commit
 0/BAB09F0 | 48329 | begin
 0/BAB09F0 | 48329 | {"a":0,"r":"public.test1","d":{"id":2,"name":"bb"}}
 0/BAB0A78 | 48329 | commit
(6 rows)

Изменение данных

test_db=# update test1 set name = 'dd' where id=2;
UPDATE 1
test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
 location  |  xid  |                               data                               
-----------+-------+------------------------------------------------------------------
 0/BB4C700 | 48338 | begin
 0/BB4C700 | 48338 | {"c":{"id":2},"a":1,"r":"public.test1","d":{"id":2,"name":"dd"}}
 0/BB4C798 | 48338 | commit
(3 rows)

Удаление данных:

test_db=# delete from test1 where id=2;
DELETE 1
test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
 location  |  xid  |                  data                   
-----------+-------+-----------------------------------------
 0/BB4C8A8 | 48339 | begin
 0/BB4C8A8 | 48339 | {"c":{"id":2},"a":2,"r":"public.test1"}
 0/BB4C9C8 | 48339 | commit
(3 rows)

Использованные и полезные ресурсы:

  • документация PostgreSQL — www.postgresql.org/docs/9.4/static/logicaldecoding.html
  • пример плагина из PostgreSQL (https://github.com/postgres/postgres/tree/master/contrib/test_decoding)
  • michael.otacoo.com/ — очень полезный блог, плагин decoder_raw автора этого блога использовался как основа для моего плагина.
  • github.com/xstevens/decoderbufs — плагин который использует google protocol buffers как выходной формат.

Комментарии (6)