Привет, Хабр!
Так вышло, что по работе я столкнулся с необходимостью репликации некоторого (достаточно большого) количества таблиц из различных баз postgres
в одну базу postgres
для нужд Data Science. Казалось бы, что может быть проще, ведь в postgres
есть механизм логической репликации. А логическая репликация между двумя постгресами вообще не требует никаких дополнительных инструментов.
К сожалению, от этого варианта пришлось отказаться — большое количество таблиц и операций вставки/обновления/удаления строк в каждой из баз данных недопустимо увеличивало бы размер WAL-журналов.
Некоторые инструменты репликации, как, например, meltano, поддерживают, помимо log-based репликации, инкрементную репликацию на основе ключей и полную репликацию.
Инкрементная репликация обычно использует значение определенного столбца, например, метку времени или увеличивающееся целое число. Да и в принципе наличие поля lastmodified позволяет “в лоб” реплицировать данные за определенный промежуток времени.
Однако, я столкнулся с тем, что не во всех нужных мне таблицах было такое поле, а в некоторых данные могли обновляться “задним” числом.
Так я пришел к мысли использовать postgres_fdw для связи с данными на других серверах postgres
и dbt для переноса данных в локальные таблицы.
Подготовим данные
Предлагаю воспользоваться этим репозиторием. Он содержит csv-файл с небольшим набором данных транзакций. В принципе, для демонстрации этого достаточно.
Отредактируем файл docker-compose.yaml
и добавим туда еще один контейнер postgres
, куда мы будем реплицировать наши данные:
docker-compose.yaml
version: '3'
services:
postgresdb:
container_name: postgresdb
image: postgres:14.5
restart: always
environment:
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "postgres"
volumes:
- ./data/init.sql:/docker-entrypoint-initdb.d/init.sql
- ./data/transaction_data.csv:/var/lib/postgresql/files/transaction_data.csv
ports:
- "5432:5432"
networks:
- postgres
postgresdb_replica:
container_name: postgresdb_replica
image: postgres:14.5
restart: always
environment:
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "postgres"
ports:
- "5430:5432"
networks:
- postgres
networks:
postgres:
Запускаем наши контейнеры:
docker compose -f “docker-compose.yaml” up -d –build
Теперь у нас есть два работающих экземпляра postgres
.
Далее нам необходимо подключиться к базе данных контейнера postgresdb_replica
и подготовиться к обращению к удаленным данным через postgres_fdw
. Для начала узнаем IPAddress
контейнера postgresdb
:
docker inspect \
-f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' postgresdb
Мой хост — 172.25.0.3.
Настройка postgres_fdw
Далее подключаемся к postgresdb_replica
и выполняем команды:
-- создаем схему, куда будем складывать сторонние таблички
CREATE schema IF NOT EXISTS foreign_tables;
-- устанавливаем расширение postgres_fdw
CREATE EXTENSION postgres_fdw;
-- создаем сервер для подключения к сторонней базе данных
CREATE SERVER IF NOT EXISTS foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host ‘172.25.0.3’, port '5432', dbname 'postgres');
-- создаем сопоставление пользователей
-- (рекомендуется заводить отдельные учетные записи для postgres_fdw с ролью,
-- которая будет зависеть от того, хотите ли вы изменять исходные таблицы)
CREATE USER MAPPING IF NOT EXISTS FOR postgres
SERVER foreign_server
OPTIONS (user ‘postgres’, password ‘postgres’);
-- импортируем нашу табличку:
IMPORT FOREIGN SCHEMA public
LIMIT TO (transactions)
FROM SERVER foreign_server
INTO foreign_tables
Теперь для обращения к удаленным данным нам достаточно выполнить операцию:
SELECT * FROM foreign_tables.transactions
Настройка dbt
Для начала установим необходимые зависимости:
pip install dbt
DBT — это мощный инструмент для выполнения ELT процессов. Он позволяет разрабатывать, тестировать и запускать SQL скрипты, а также создавать модули и переиспользовать код, что помогает сократить время разработки и поддержки процессов ELT.
Детальное описание dbt
выходит за рамки этой статьи, но я бы рекомендовал почитать эту статью, ну и, разумеется, документацию)
Мы можем инициализировать проект dbt
, выполнив:
dbt init
Это создаст необходимую структуру папок, но для нашего игрушечного примера она немного избыточна, поэтому создадим директорию c такой структурой:
tutorial/
├── models
│ └── transactions.sql
├── dbt_project.yml
└── profiles.yml
В директории models/
мы разместим нашу sql модель. Модель в dbt
— это набор SQL запросов, которые описывают, как преобразовать или агрегировать входные данные, чтобы создать структурированные таблицы для представления данных в нашей базе данных. Модели являются основным компонентом dbt
проекта.
transactions.sql
{{
config(
schema='row',
materialized='table'
)
}}
select * from foreign_tables.transactions
Здесь мы сообщаем dbt
, что хотим создать таблицу transactions (таблицы или представления создаются по имени файла) и материализовать ее именно, как таблицу. Про материализации можно прочитать здесь.
И добавляем простой запрос SELECT для репликации наших исходных данных.
Каждый проект dbt
должен содержать файл dbt_project.yml
— так dbt
узнает, что каталог является проектом dbt
. Список доступных конфигураций можно посмотреть здесь.
dbt_project.yml
name: 'tutorial'
version: '2.0.0'
config-version: 2
profile: 'tutorial'
model-paths:
- models
target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
Также нам потребуется файл profiles.yml, который будет содержать сведения о подключении к нашей платформе данных.
profiles.yml
config:
send_anonymous_usage_stats: False
use_colors: True
partial_parse: True
tutorial:
target: prod
outputs:
prod:
type: postgres
thread: 2
host: localhost
port: 5430
user: postgres
pass: postgres
dbname: postgres
schema: dbt
Мы готовы запустить наш маленький dbt
проект:
dbt run –project-dir . – profiles-dir . -m transactions
Мы реплицировали таблицу из сторонней базы данных Postgres
. Основная причина использовать dbt
для репликации внешних таблиц в локальные — сложные запросы к стороннему серверу могут выполняться бесконечно, особенно для больших таблиц.
Но я столкнулся с еще одной проблемой — поскольку я реплицировал данные с реплик, у меня было ограничение подключения в 30 секунд и некоторые таблицы не успевали материализоваться. Поэтому я решил перейти на материализованные представления.
DBT
не поддерживает материализованные представления из коробки, поэтому в наш проект необходимо добавить пакет materialized-views
.
Идем в репозиторий и копируем папку materialized-views
в нашу директорию. Добавим в каталог models
новый transactions_view.sql:
transactions_view.sql
{{
config(
schema='row',
materialized='materialized_view',
)
}}
select * from foreign_tables.transactions
Добавляем файл tutorials/packages.yml:
packages.yml
packages:
- local: ./materialized-views
А также создадим каталог tutorial/macros
и добавим туда файл *.sql:
*.sql
{% macro drop_relation(relation) -%}
{{ return(dbt_labs_materialized_views.drop_relation(relation)) }}
{% endmacro %}
{% macro postgres__list_relations_without_caching(schema_relation) %}
{{ return(dbt_labs_materialized_views.postgres__list_relations_without_caching(schema_relation)) }}
{% endmacro %}
{% macro postgres_get_relations() %}
{{ return(dbt_labs_materialized_views.postgres_get_relations()) }}
{% endmacro %}
Проверим, что наша структура проекта соответствует следующей:
tutorial/
├── dbt_project.yml
├── macros
│ └── *.sql
├── materialized-views
│ ├── dbt_project.yml
│ ├── integration_tests
│ ├── macros
│ └── README.md
├── models
│ ├── transactions.sql
│ └── transactions_view.sql
├── packages.yml
└── profiles.yml
Запустим нашу модель, предварительно установив пакет materialized-views
:
dbt deps --project-dir . --profiles-dir .
dbt run --project-dir . --profiles-dir . -m transactions_view
Отлично, наше представление материализовалось!
Повторный запуск команды будет аналогичен выполнению команды REFRESH
:
Теперь мы можем интегрировать наш dbt
проект в какой-нибудь оркестратор ELT пайплайнов, например, Dagster
или AirFlow,
чтобы запускать обновление наших материализаций по расписанию и постороить поверх этих материализованных представлений другие таблицы или инкрементные материализации.
Решение не идеальное, но для меня пока работает) Буду рад советам)