Довольно типичная схема при разработке системы, когда основная логика обработки сосредоточена в приложении (в нашем случае Erlang), а данные для работы этого приложения (настройки, профили пользователей и т. д.) в базе данных (PostgreSQL). Приложение Erlang кэширует настройки в ETS для ускорения обработки и снижения нагрузки на БД путём отказа от постоянных запросов. При этом изменение этих данных происходит через отдельный (возможно, внешний) сервис.


В таких ситуациях встаёт задача поддержания закэшированных данных в актуальном состоянии. Есть разные подходы для решения этой задачи. Один из них — это логическая репликация PostgreSQL. О нем и пойдёт речь ниже.


Протокол потоковой логической репликации


Логическая репликация использует протокол потоковой репликации PostgreSQL для получения изменения данных в таблицах PostgreSQL путём чтения WAL логов, фильтрации нужный таблиц и отправки этих изменений подписчику. Этот механизм аналогичный тому, который используется для физической репликации для создания standby БД.


Логическая репликация предоставляет следующие преимущества:


  • получение изменения без задержек в реальном времени;
  • фильтрация получаемых изменений по таблицам и операциям (INSERT/DELETE/UPDATE);
  • полнота и целостность данных, получаемых подписчиком. Подписчик получает изменения в том же порядке, как они происходили в БД;
  • нет потери данных в случае временной остановки подписчика. PostgreSQL запоминает, где остановилась репликация;

Подготовка базы данных


Для работы с логической репликацией необходим плагин, который декодирует WAL записи от сервера в более удобный формат.
До версии PostgreSQL 10 можно использовать расширение/extension pglogical_output plugin.
Начиная с PostgreSQL 10 pgoutput plugin.
В этой статье будем рассматривать pgoutput plugin.


На стороне PostgreSQL необходимо выполнить следующие шаги:


  • Выставить параметры для поддержки логической репликации в
    postgresql.conf


    wal_level = 'logical'
    max_replication_slots = 5
    max_wal_senders = 5

  • Создать роль, которая будет использоваться для репликации. Роль должна иметь атрибут REPLICATION или SUPERUSER.


    CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test'; 

  • Разрешить доступ для этой роли в pg_hba.conf c database = replication


    host    replication     epgl_test    127.0.0.1/32    trust

  • Создать публикацию/publication. При создании публикации мы указываем таблицы, которые мы планируем получать в приложении Erlang


    CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; 
    ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; -- добавить таблицу в уже существующую публикацию


Erlang часть


Не так давно поддержка протокола потоковой репликации была добавлена в популярную Erlang библиотеку для работы с PostgreSQL EPGSQL. На основе этой библиотеки мы и будем строить логику получения изменений в Erlang.
Так как формат непосредственно данных в сообщении XlogData протокола зависит от того, какой плагин используется для слота репликации, библиотека EPGSQL не декодирует данные, а вызывает Callback-метод или посылает сообщение процессу асинхронно.


Подключение к БД


Должно быть создано специальное репликационное соединение с БД, для этого надо передать флаг replication.
В рамках репликационного соединение к БД можно выполнять только репликационные команды (например DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Выполнить обычный запрос через это соединение нельзя.


Создание репликационного слота


Репликационный слот используются для отслеживания текущей позиции переданного WAL-лога.
При создании репликационного слота задаётся плагин для декодирования.


С версии PostgreSQL 10 появилась возможность создавать временные репликационные слоты, которые автоматически удаляются при закрытии репликационного соединения.


Если приложение считывает начальное состояние таблиц каждый раз при старте, то я рекомендую использовать временные репликационные слоты, в этом случае не надо будет заботиться об удалении созданных репликационных слотов (DROP_REPLICATION_SLOT). Удалять старые/не используемые репликационные слоты крайне важно, потому что PostgreSQL не удаляет WAL логи пока подписчики всех репликационных слотов не получат изменения. Если остался не активный репликационный слот, то WAL логи начнут накапливаться и рано или поздно произойдёт переполнение файловой системы.


Получение начального состояния таблиц


При создании репликационного слота (см. предыдущий шаг), автоматически создаётся snapshot, который показывает состояние базы данных на момент создания слота. Этот snapshot может быть использован для загрузки начального состояния таблиц, которое было на начало репликации.


Snapshot доступен только пока репликационное соединение, в котором была выполнена команда CREATE_REPLICATION_SLOT не закрыто.


Для загрузки начальных данных должно быть создано новое обычное/не репликационное соединение к БД, так как выполнить SELECT в репликационном соединении нельзя. В этом соединении устанавливаем snapshot SET TRANSACTION SNAPSHOT SnapshotName и извлекаем нужные данные.


Запуск репликации


Запускаем репликацию для созданного репликационного слота. При запуске репликации передаём дополнительные параметры для плагина, для pgoutput это имя созданной публикации.


Все шаги вместе


start_replication() ->
    %% Создание репликационного соединения
    {ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]),

    %% Создание репликационного слота
    {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn,
        "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput").

    %% Получение начального состояния таблиц
    {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]),
    {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"),
    {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]),
    %% select/load data epgsql:equery(NormalConn,...
    epgsql:close(NormalConn),

    %% Запуск репликации
    ReplSlot = "epgl_repl_slot",
    Callback = ?MODULE,
    CbInitState = #{},
    WALPosition = "0/0", 
    PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'",
    ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts).

handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
    io:format("~p~n", [{StartLSN, EndLSN, Data}]),
    {ok, EndLSN, EndLSN, CbState}.

Есть два варианта взаимодействия с библиотекой EPGSQL:


  • Синхронный. В качестве Callback передаётся имя модуля. Библиотека для полученных данных будет вызывать функцию CallbackModule:handle_x_log_data. Функция должна возвращать LastFlushedLSN, LastAppliedLSN, который посылается в ответ PostgreSQL, чтобы отслеживать текущее положение репликационного слота. В своих проектах мы используем только этот вариант;


  • Асинхронный. В качестве Callback передаётся pid процесса, который будет получать сообщения вида {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}. После обработки процесс должен сообщить обработанный LSN через вызов epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN);



Вместо заключения


Дополнительно, чтобы использовать описанный подход, необходимо реализовать декодирование сообщений из формата плагина репликационного слота в более привычные для Erlang структуры. Или воспользоваться библиотекой с GitHub, которая реализует декодирование для двух плагинов и упрощает выполнение репликационных команд.

Комментарии (4)


  1. erlyvideo
    30.12.2019 17:51

    ощущение некоторой недосказанности.

    Что дальше стоит делать с этой инфой? Сюда будут приходить только евенты по таблице с конфигом или вообще всё? Если вообще всё, то по конфигурационному каналу мы получим поток информации о всём потоке данных?


    1. mish_gun Автор
      30.12.2019 21:03

      Да, будут приходить изменения только по тем нужным таблицам, которые включены в публикацию/publication.
      При создании публикации добавляем в нее только нужные таблицы (может быть добавлено несколько таблиц), запускаем репликацию в Erlang, как описано в этой статье.
      Далее при любом изменении (insert/update/delete) по заданным таблицам в БД в Erlang будет приходить информация об этих изменениях.
      Информацию по изменениям других таблиц, не включенных в публикацию, будет отфильтрована на стороне PostgreSQL.

      Этот подход можно использовать для поддержания кэша данных в Erlang в консистентном и актуальном состоянии. PostgreSQL будет посылать изменения сразу как они произошли, в том же порядке, как они происходили. Пример есть в библиотеке, ссылка на которую была в конце статьи


  1. chirik161
    30.12.2019 21:03

    существует подобная реализация на elixir?


    1. mish_gun Автор
      30.12.2019 21:04

      Я не встречал, но и не искал для Elixir