Предыстория
Нежданно ни гадано, затеяли значит высшие "итишные" силы включить новые заморские очереди Кафка в уже выполненный на 4/3 проект и слава богу, что только для внешних взаимодействий и передачи всякой информации туды-сюды. Главный архитектор дал благословение и понеслось, да не туда, так как нести то некому это невиданное заморское чудо. Что делать, в обозримые сроки не впихнуть и перед боярами чин и обязательства не сдержать. Посидел РП, погоревал, да сдул пыль со знаний древних и ранее опробованных и тут понеслось.
Что есть
Maria DB.
Список таблиц с данными из внешней системы.
Давление обязательств по срокам.
Творческий "ит зуд" .
Требования
По кнопке обогащать данные таблиц основной базы из зеркал
Возможность на лету или почти на лету генерировать репликационные скрипты
Операции вставка, обновление
Иметь возможность через справочник настраивать: количество таблиц, поля для сравнения изменений, поля для вставки
Немного теории или что такое репликация?
Если знаете, то смело можете пропускать и двигаться к следующему заголовку.
Репликация — это процесс копирования данных из одной базы данных в другую. Репликация используется для обеспечения доступности данных, повышения производительности и снижения нагрузки на основную базу данных.
Существует несколько типов репликации:
* Синхронная репликация — это процесс копирования данных из основной базы данных в реплики в режиме реального времени. Синхронная репликация обеспечивает высокую доступность данных, но может снижать производительность основной базы данных.
* Асинхронная репликация — это процесс копирования данных из основной базы данных в реплики с некоторой задержкой. Асинхронная репликация позволяет повысить производительность основной базы данных, но может снижать доступность данных.
Репликация может быть выполнена различными способами:
* Репликация на уровне базы данных — это процесс копирования всей базы данных из основной в реплики. Репликация на уровне базы данных обеспечивает полную синхронизацию данных, но может занимать много времени и ресурсов.
* Репликация на уровне таблицы — это процесс копирования отдельных таблиц из основной базы данных в реплики. Репликация на уровне таблицы позволяет выборочно копировать данные, но может приводить к несогласованности данных.
* Репликация на уровне строки — это процесс копирования отдельных строк из основной базы данных в реплики. Репликация на уровне строки позволяет наиболее точно копировать данные, но может быть сложной в реализации.
Для репликации данных используются различные технологии и инструменты, такие как MySQL, PostgreSQL, Oracle, Microsoft SQL Server, MongoDB и другие.
Репликация данных имеет ряд преимуществ:
* Повышение доступности данных — репликация позволяет обеспечить доступ к данным даже в случае сбоя основной базы данных.
* Повышение производительности — репликация позволяет распределить нагрузку на чтение данных между основной базой данных и репликами.
* Снижение нагрузки на основную базу данных — репликация позволяет снизить количество операций чтения и записи в основную базу данных.
* Обеспечение целостности данных — репликация позволяет обеспечить согласованность данных между основной базой данных и репликами.
Репликация также имеет некоторые недостатки:
* Увеличение объёма данных — репликация приводит к увеличению объёма данных, которые необходимо хранить и обрабатывать.
* Увеличение сложности системы — репликация усложняет систему, добавляя дополнительные компоненты и связи.
* Увеличение затрат — репликация требует дополнительных ресурсов, таких как оборудование, программное обеспечение и персонал.
Выбор типа репликации и технологии репликации зависит от конкретных требований и условий проекта.
Немного кода
Сформировать список полей на основании которых будут формировать скрипты
select TABLE_NAME, ordinal_position as column_id,
column_name as column_name,
data_type as data_type,
case when numeric_precision is not null
then numeric_precision
else character_maximum_length end as max_length,
case when datetime_precision is not null
then datetime_precision
when numeric_scale is not null
then numeric_scale
else 0 end as data_precision,
is_nullable,
column_default
from information_schema.columns
where table_name LIKE 'miror%' -- put table name here
-- and table_schema = 'schema name' -- put schema name here
order by TABLE_NAME, column_id;
Сохранить списки полей в справочник(таблицу), чтобы было удобней настраивать .
Создаём таблицу.
--Создаём таблицу
CREATE TABLE IF NOT EXISTS `miror_heads_sql` (
`table_name` varchar(1000) DEFAULT NULL,
`insert_miror` varchar(1000) DEFAULT NULL,
`insert_s` varchar(1000) DEFAULT NULL,
`update_miror` varchar(1000) DEFAULT NULL,
`update_s` varchar(4000) DEFAULT NULL,
`compare_miror` varchar(4000) DEFAULT NULL,
`compare_s` varchar(4000) DEFAULT NULL,
`orderby` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Таблица для сбора запросов merge';
Закидываю текущие поля зеркал с префиксом "miror".
-- Закидываю текущие поля
select TABLE_NAME, column_name, column_name, column_name, column_name, column_name, column_name,
ordinal_position as column_id
from information_schema.columns
where table_name LIKE 'miror%' -- Зеркала содержать префикс mirror
-- and table_schema = 'schema name' -- put schema name here
order by TABLE_NAME, column_id;
Разрабатываю генератор репликационных скриптов Insert, Update. Удаления данных не будет.
В моём случае все таблицы содержат уникальное поле ISN, и есть только добавление и изменения данных.
SELECT U.table_name, /* Наименование таблицы*/
(SELECT CONCAT( 'INSERT INTO ', SUBSTR(U.table_name, 7) ,' ( ', GROUP_CONCAT( CONCAT(s.insert_s, ' ')) ,' ) SELECT ', GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' ')) ,' FROM ', s.table_name ,' ma LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ')
FROM miror_heads_sql s
WHERE not s.insert_s IS NULL
AND NOT s.insert_miror IS NULL
AND not s.insert_s = ''
AND NOT s.insert_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC) INS_SQL /*Скрипт для вставки новых записей из зеркал*/,
( SELECT
CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.', s.compare_s, '=ma.', s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND (',
replace(GROUP_CONCAT(CONCAT(' NOT ma.', s.compare_miror, '=a.', s.compare_s) ),',',' OR ') , ' )' )
FROM miror_heads_sql s
WHERE NOT s.compare_miror IS null
AND NOT s.compare_s IS NULL
AND NOT s.compare_s = ''
AND NOT s.compare_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC ) UPD_SQL /* Cкрипт для обновления данных из зеркал */
FROM (select DISTINCT s.table_name from miror_heads_sql s) U;
Разрабатываю генератор репликационных скриптов Insert, Update. Удаления данных не будет.
Создал таблицу уже с данными при помощи CREATE_TABLE+ добавил поле ORDERBY для приоритета выполнения скрипта.
Create table miror_sql as
SELECT U.table_name,
(SELECT CONCAT( 'INSERT INTO ', SUBSTR(U.table_name, 7) ,' ( ', GROUP_CONCAT( CONCAT(s.insert_s, ' ')) ,' ) SELECT ', GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' ')) ,' FROM ', s.table_name ,' ma LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ')
FROM miror_heads_sql s
WHERE not s.insert_s IS NULL
AND NOT s.insert_miror IS NULL
AND not s.insert_s = ''
AND NOT s.insert_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC) INS_SQL,
( SELECT
CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.', s.compare_s, '=ma.', s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND (',
replace(GROUP_CONCAT(CONCAT(' NOT ma.', s.compare_miror, '=a.', s.compare_s) ),',',' OR ') , ' )' )
FROM miror_heads_sql s
WHERE NOT s.compare_miror IS null
AND NOT s.compare_s IS NULL
AND NOT s.compare_s = ''
AND NOT s.compare_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC ) UPD_SQL, ROWNUM() ORDERBY
FROM (select DISTINCT s.table_name from miror_heads_sql s) U;
-
Сохранить списки полей в справочник(таблицу), чтобы было удобней настраивать .
Создал таблицу уже с данными при помощи CREATE_TABLE и добавил поле ORDERBY для приоритета выполнения скрипта.
Create table miror_sql as
SELECT U.table_name,
(SELECT CONCAT( 'INSERT INTO ', SUBSTR(U.table_name, 7) ,' ( ', GROUP_CONCAT( CONCAT(s.insert_s, ' ')) ,' ) SELECT ', GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' ')) ,' FROM ', s.table_name ,' ma LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ')
FROM miror_heads_sql s
WHERE not s.insert_s IS NULL
AND NOT s.insert_miror IS NULL
AND not s.insert_s = ''
AND NOT s.insert_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC) INS_SQL,
( SELECT
CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.', s.compare_s, '=ma.', s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND (',
replace(GROUP_CONCAT(CONCAT(' NOT ma.', s.compare_miror, '=a.', s.compare_s) ),',',' OR ') , ' )' )
FROM miror_heads_sql s
WHERE NOT s.compare_miror IS null
AND NOT s.compare_s IS NULL
AND NOT s.compare_s = ''
AND NOT s.compare_miror = ''
AND s.table_name= U.table_name
ORDER BY orderby DESC ) UPD_SQL, ROWNUM() ORDERBY
FROM (select DISTINCT s.table_name from miror_heads_sql s) U;
-
Настройка и проверка на соответствие требованиям.
1. Каждый скрипт необходимо проверить на корректность выполнения и если есть ошибки, то внести исправления.
2. Выставить приоритеты и ещё раз всё проверить
-
Создать функцию, которая умеет выполнять в цикле сгенерированные скрипты с учётом приоритета.
DELIMITER // CREATE PROCEDURE `Repl_DO`( IN `Result` VARCHAR(8000) ) BEGIN DECLARE T_NAME VARCHAR(8000) DEFAULT FALSE; DECLARE i_SQL VARCHAR(8000) DEFAULT FALSE; DECLARE u_SQL VARCHAR(8000) DEFAULT FALSE; DECLARE done INT DEFAULT FALSE; DECLARE cur1 CURSOR FOR SELECT a.table_name, a.insert_sql, a.update_sql FROM absolut_test.miror_sql a ORDER BY a.orderby; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; OPEN cur1; read_loop: LOOP FETCH cur1 INTO T_NAME, i_SQL, u_SQL; INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _INS')); EXECUTE IMMEDIATE i_SQL; EXECUTE IMMEDIATE u_SQL; IF done THEN LEAVE read_loop; END IF; END LOOP; CLOSE cur1; END// DELIMITER ;
Логи наше всё. Просто добавь немного логов
Чтобы отслеживать на каких таблицах падает наша репликация добавим таблицу для логов.
CREATE TABLE IF NOT EXISTS `miror_log` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `dt` timestamp NULL DEFAULT current_timestamp(), `Msg` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Добавим в процедуру
Repl_DO
запись в логи.DELIMITER // CREATE PROCEDURE `Repl_DO`( IN `Result` VARCHAR(8000) ) BEGIN DECLARE T_NAME VARCHAR(8000) DEFAULT FALSE; DECLARE i_SQL VARCHAR(8000) DEFAULT FALSE; DECLARE u_SQL VARCHAR(8000) DEFAULT FALSE; DECLARE done INT DEFAULT FALSE; DECLARE cur1 CURSOR FOR SELECT a.table_name, a.insert_sql, a.update_sql FROM absolut_test.miror_sql a ORDER BY a.orderby; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; OPEN cur1; read_loop: LOOP FETCH cur1 INTO T_NAME, i_SQL, u_SQL; INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _INS')); --Запись в логи наименования таблицы и скрипта EXECUTE IMMEDIATE i_SQL; INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _UPD')); --Запись в логи наименования таблицы и скрипта EXECUTE IMMEDIATE u_SQL; IF done THEN LEAVE read_loop; END IF; END LOOP; CLOSE cur1; END// DELIMITER ;
Отладка и тестирование сгенерированных скриптов.
Временная репликация делается для боевой системы, которая будет работать со 100 тыс + реальными клиентами и поэтому необходимо ответственно подойти к тестированию и отладке сгенерированных скриптов. Детали описывать не буду т.к. это относится к методологии тестирования, но оставлю основные пункты.
Написание тест-кейсов. Разработка подробных сценариев тестирования, описывающих, как должен работать функционал, включая входные данные, ожидаемые результаты и критерии прохождения теста.
Подготовка тестовых данных. Создаём или используем существующие данные, которые будут использоваться в процессе тестирования.
Проведение тестирования. Выполнения тестов, следуя разработанным сценариям, и сравниваем полученные результаты с ожидаемыми.
Составление отчета о результатах. После завершения тестирования собираем информацию о найденных ошибках.
Если нет ошибок переходим к п.8.
Исправляем найденные в предыдущих пунктах ошибки.
Регрессионное тестирование. После внесения изменений в функционал проведите повторное тестирование, чтобы убедиться, что существующие функции не были нарушены.
Если есть ошибки переходим к п.6.
Отладка завершена, можно публиковать и переходить к опытной эксплуатации.
Эти этапы помогают обеспечить качество и надежность нового функционала, а также его соответствие требованиям.
-
Запуск скриптов
Загрузить в зеркала данные
Запустить процедуру CALL Repl_DO(@A) и дождаться окончания
-
Если возникала ошибка, то смотрим на какой таблице остановился механизм репликации через запрос:
SELECT * FROM miror_log o ORDER BY o.id desc
-
Развитие
В ходе любой разработки приходят идеи по улучшению и оптимизации разрабатываемого функционала и эта разработка не стала исключением.
Детальные логи(Как получить в Марии?)
Эмуляция транзакции за счёт дополнительных таблиц (Commit, Rolback)
Добавить возможность изменять первичные ключи для любой таблицы из справочника
Автоматическая приоритезация выполнения скриптов на основе связей между таблицами( в т.ч. динамическая если связь зависит от заполнения данных)
-
Автоматическая генерация зеркал и их первичное заполнение
Итог
И овцы живы и волки сыты.
Как итог всё требования удовлетворены и есть время по взрослому реализовать механизм передачи данных через брокер сообщений Kafka. Если вам понравилась статья ставьте лайк. В моих "архивах" за 20 летний стаж работы в ИТ лежит много разных подходов и забавных историй, которыми я по возможности буду делиться с тобой, мой дорого читатель.