Предыстория


Нежданно ни гадано, затеяли значит высшие "итишные" силы включить новые заморские очереди Кафка в уже выполненный на 4/3 проект и слава богу, что только для внешних взаимодействий и передачи всякой информации туды-сюды. Главный архитектор дал благословение и понеслось, да не туда, так как нести то некому это невиданное заморское чудо. Что делать, в обозримые сроки не впихнуть и перед боярами чин и обязательства не сдержать. Посидел РП, погоревал, да сдул пыль со знаний древних и ранее опробованных и тут понеслось.

Что есть

  1. Maria DB.

  2. Список таблиц с данными из внешней системы.

  3. Давление обязательств по срокам.

  4. Творческий "ит зуд" .

Требования

  1. По кнопке обогащать данные таблиц основной базы из зеркал

  2. Возможность на лету или почти на лету генерировать репликационные скрипты

  3. Операции вставка, обновление

  4. Иметь возможность через справочник настраивать: количество таблиц, поля для сравнения изменений, поля для вставки

Немного теории или что такое репликация?

Если знаете, то смело можете пропускать и двигаться к следующему заголовку.

Репликация — это процесс копирования данных из одной базы данных в другую. Репликация используется для обеспечения доступности данных, повышения производительности и снижения нагрузки на основную базу данных.

 Существует несколько типов репликации:

* Синхронная репликация — это процесс копирования данных из основной базы данных в реплики в режиме реального времени. Синхронная репликация обеспечивает высокую доступность данных, но может снижать производительность основной базы данных.

* Асинхронная репликация — это процесс копирования данных из основной базы данных в реплики с некоторой задержкой. Асинхронная репликация позволяет повысить производительность основной базы данных, но может снижать доступность данных.

 Репликация может быть выполнена различными способами:

* Репликация на уровне базы данных — это процесс копирования всей базы данных из основной в реплики. Репликация на уровне базы данных обеспечивает полную синхронизацию данных, но может занимать много времени и ресурсов.

* Репликация на уровне таблицы — это процесс копирования отдельных таблиц из основной базы данных в реплики. Репликация на уровне таблицы позволяет выборочно копировать данные, но может приводить к несогласованности данных.

* Репликация на уровне строки — это процесс копирования отдельных строк из основной базы данных в реплики. Репликация на уровне строки позволяет наиболее точно копировать данные, но может быть сложной в реализации.

 Для репликации данных используются различные технологии и инструменты, такие как MySQL, PostgreSQL, Oracle, Microsoft SQL Server, MongoDB и другие.

 

Репликация данных имеет ряд преимуществ:

* Повышение доступности данных — репликация позволяет обеспечить доступ к данным даже в случае сбоя основной базы данных.

* Повышение производительности — репликация позволяет распределить нагрузку на чтение данных между основной базой данных и репликами.

* Снижение нагрузки на основную базу данных — репликация позволяет снизить количество операций чтения и записи в основную базу данных.

* Обеспечение целостности данных — репликация позволяет обеспечить согласованность данных между основной базой данных и репликами.

 

Репликация также имеет некоторые недостатки:

* Увеличение объёма данных — репликация приводит к увеличению объёма данных, которые необходимо хранить и обрабатывать.

* Увеличение сложности системы — репликация усложняет систему, добавляя дополнительные компоненты и связи.

* Увеличение затрат — репликация требует дополнительных ресурсов, таких как оборудование, программное обеспечение и персонал.

 Выбор типа репликации и технологии репликации зависит от конкретных требований и условий проекта.

Немного кода

  1. Сформировать список полей на основании которых будут формировать скрипты

select TABLE_NAME, ordinal_position as column_id,
    column_name as column_name,
    data_type as data_type,
    case when numeric_precision is not null
              then numeric_precision
        else character_maximum_length end as max_length,
    case when datetime_precision is not null
              then datetime_precision
        when numeric_scale is not null
             then numeric_scale
        else 0 end as data_precision,
    is_nullable,
    column_default
from information_schema.columns
where table_name LIKE  'miror%' -- put table name here
--    and table_schema = 'schema name' -- put schema name here
order by TABLE_NAME, column_id;
  1. Сохранить списки полей в справочник(таблицу), чтобы было удобней настраивать .

Создаём таблицу.

--Создаём таблицу
CREATE TABLE IF NOT EXISTS `miror_heads_sql` (
  `table_name` varchar(1000) DEFAULT NULL,
  `insert_miror` varchar(1000) DEFAULT NULL,
  `insert_s` varchar(1000) DEFAULT NULL,
  `update_miror` varchar(1000) DEFAULT NULL,
  `update_s` varchar(4000) DEFAULT NULL,
  `compare_miror` varchar(4000) DEFAULT NULL,
  `compare_s` varchar(4000) DEFAULT NULL,
  `orderby` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Таблица для сбора запросов merge';

Закидываю текущие поля зеркал с префиксом "miror".

-- Закидываю текущие поля
select TABLE_NAME,     column_name, column_name, column_name, column_name, column_name, column_name,
   ordinal_position as column_id 
from information_schema.columns
where table_name LIKE  'miror%' -- Зеркала содержать префикс mirror 
--    and table_schema = 'schema name' -- put schema name here
order by TABLE_NAME, column_id;
  1. Разрабатываю генератор репликационных скриптов Insert, Update. Удаления данных не будет.

В моём случае все таблицы содержат уникальное поле ISN, и есть только добавление и изменения данных.


SELECT U.table_name,  /* Наименование таблицы*/
(SELECT CONCAT( 'INSERT INTO ',  SUBSTR(U.table_name, 7)  ,' ( ',  GROUP_CONCAT( CONCAT(s.insert_s, ' '))  ,' ) SELECT ',  GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' '))  ,' FROM ',  s.table_name  ,' ma  LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ') 
  FROM miror_heads_sql s
 WHERE not s.insert_s IS NULL 
    AND NOT s.insert_miror IS NULL 
    AND not s.insert_s = '' 
    AND NOT s.insert_miror = '' 
     AND s.table_name=   U.table_name  
 ORDER BY orderby DESC) INS_SQL /*Скрипт для  вставки новых записей из зеркал*/,
( SELECT  
  CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a  LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.',  s.compare_s, '=ma.',  s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND  (', 
  replace(GROUP_CONCAT(CONCAT(' NOT ma.',  s.compare_miror, '=a.',  s.compare_s) ),',',' OR ') , ' )' )    
  FROM miror_heads_sql s
 WHERE NOT s.compare_miror IS null
   AND NOT s.compare_s IS NULL 
   AND NOT s.compare_s = ''
   AND NOT s.compare_miror = ''
    AND s.table_name=   U.table_name
 ORDER BY orderby DESC ) UPD_SQL /* Cкрипт для обновления данных из зеркал */
 FROM (select DISTINCT s.table_name from miror_heads_sql s)  U;

  1. Разрабатываю генератор репликационных скриптов Insert, Update. Удаления данных не будет.

Создал таблицу уже с данными при помощи CREATE_TABLE+ добавил поле ORDERBY для приоритета выполнения скрипта. 

Create table miror_sql as 
SELECT U.table_name, 
(SELECT CONCAT( 'INSERT INTO ',  SUBSTR(U.table_name, 7)  ,' ( ',  GROUP_CONCAT( CONCAT(s.insert_s, ' '))  ,' ) SELECT ',  GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' '))  ,' FROM ',  s.table_name  ,' ma  LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ') 
  FROM miror_heads_sql s
 WHERE not s.insert_s IS NULL 
    AND NOT s.insert_miror IS NULL 
    AND not s.insert_s = '' 
    AND NOT s.insert_miror = '' 
     AND s.table_name=   U.table_name  
 ORDER BY orderby DESC) INS_SQL,
( SELECT  
  CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a  LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.',  s.compare_s, '=ma.',  s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND  (', 
  replace(GROUP_CONCAT(CONCAT(' NOT ma.',  s.compare_miror, '=a.',  s.compare_s) ),',',' OR ') , ' )' )    
  FROM miror_heads_sql s
 WHERE NOT s.compare_miror IS null
   AND NOT s.compare_s IS NULL 
   AND NOT s.compare_s = ''
   AND NOT s.compare_miror = ''
    AND s.table_name=   U.table_name
 ORDER BY orderby DESC ) UPD_SQL, ROWNUM() ORDERBY
 FROM (select DISTINCT s.table_name from miror_heads_sql s)  U; 
  1. Сохранить списки полей в справочник(таблицу), чтобы было удобней настраивать .

    Создал таблицу уже с данными при помощи CREATE_TABLE и добавил поле ORDERBY для приоритета выполнения скрипта.

Create table miror_sql as 
SELECT U.table_name, 
(SELECT CONCAT( 'INSERT INTO ',  SUBSTR(U.table_name, 7)  ,' ( ',  GROUP_CONCAT( CONCAT(s.insert_s, ' '))  ,' ) SELECT ',  GROUP_CONCAT( CONCAT('ma.', s.insert_miror, ' '))  ,' FROM ',  s.table_name  ,' ma  LEFT JOIN ', SUBSTR(U.table_name, 7) ,' a ON ma.isn=a.isn WHERE a.isn IS NULL; ') 
  FROM miror_heads_sql s
 WHERE not s.insert_s IS NULL 
    AND NOT s.insert_miror IS NULL 
    AND not s.insert_s = '' 
    AND NOT s.insert_miror = '' 
     AND s.table_name=   U.table_name  
 ORDER BY orderby DESC) INS_SQL,
( SELECT  
  CONCAT('UPDATE ', SUBSTR(U.table_name, 7) ,' a  LEFT JOIN ', U.table_name ,' ma ON ma.isn=a.isn SET ', GROUP_CONCAT(CONCAT(' a.',  s.compare_s, '=ma.',  s.compare_miror)), ' WHERE ma.isn IS NOT NULL AND  (', 
  replace(GROUP_CONCAT(CONCAT(' NOT ma.',  s.compare_miror, '=a.',  s.compare_s) ),',',' OR ') , ' )' )    
  FROM miror_heads_sql s
 WHERE NOT s.compare_miror IS null
   AND NOT s.compare_s IS NULL 
   AND NOT s.compare_s = ''
   AND NOT s.compare_miror = ''
    AND s.table_name=   U.table_name
 ORDER BY orderby DESC ) UPD_SQL, ROWNUM() ORDERBY
 FROM (select DISTINCT s.table_name from miror_heads_sql s)  U; 

  1. Настройка и проверка на соответствие требованиям.

    1. Каждый скрипт необходимо проверить на корректность выполнения и если есть ошибки, то внести исправления.

    2. Выставить приоритеты и ещё раз всё проверить 

  2. Создать функцию, которая умеет выполнять в цикле сгенерированные скрипты с учётом приоритета.

    DELIMITER //
    CREATE PROCEDURE `Repl_DO`(
        IN `Result` VARCHAR(8000)
    )
    BEGIN
     DECLARE T_NAME VARCHAR(8000)  DEFAULT FALSE;
     DECLARE i_SQL VARCHAR(8000)  DEFAULT FALSE;
     DECLARE u_SQL VARCHAR(8000)  DEFAULT FALSE;
     
     DECLARE done INT DEFAULT FALSE;
      DECLARE cur1 CURSOR FOR SELECT a.table_name, a.insert_sql, a.update_sql FROM absolut_test.miror_sql a ORDER BY a.orderby;
      DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
      OPEN cur1;
      
      read_loop: LOOP
        FETCH cur1 INTO T_NAME, i_SQL, u_SQL;
        INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _INS')); 
        
        EXECUTE IMMEDIATE i_SQL;
        
         
        EXECUTE IMMEDIATE u_SQL;
        
        
        IF done THEN
          LEAVE read_loop;
        END IF;
        
        
      END LOOP;
      CLOSE cur1;
      
    END//
    DELIMITER ;
    

    1. Логи наше всё. Просто добавь немного логов

    Чтобы отслеживать на каких таблицах падает наша репликация добавим таблицу для логов.

    CREATE TABLE IF NOT EXISTS `miror_log` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `dt` timestamp NULL DEFAULT current_timestamp(),
      `Msg` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

    Добавим в процедуру Repl_DO запись в логи.

    
    
    DELIMITER //
    CREATE PROCEDURE `Repl_DO`(
        IN `Result` VARCHAR(8000)
    )
    BEGIN
     DECLARE T_NAME VARCHAR(8000)  DEFAULT FALSE;
     DECLARE i_SQL VARCHAR(8000)  DEFAULT FALSE;
     DECLARE u_SQL VARCHAR(8000)  DEFAULT FALSE;
     
     DECLARE done INT DEFAULT FALSE;
      DECLARE cur1 CURSOR FOR SELECT a.table_name, a.insert_sql, a.update_sql FROM absolut_test.miror_sql a ORDER BY a.orderby;
      DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
      OPEN cur1;
      
      read_loop: LOOP
        FETCH cur1 INTO T_NAME, i_SQL, u_SQL;
        INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _INS')); --Запись в логи наименования таблицы и скрипта 
        
        EXECUTE IMMEDIATE i_SQL;
        
        INSERT INTO miror_log (msg) VALUES(CONCAT(T_NAME, ' _UPD')); --Запись в логи наименования таблицы и скрипта 
         
        EXECUTE IMMEDIATE u_SQL;
        
        
        IF done THEN
          LEAVE read_loop;
        END IF;
        
        
      END LOOP;
      CLOSE cur1;
      
    END//
    DELIMITER ;
    

    1. Отладка и тестирование сгенерированных скриптов.

    Временная репликация делается для боевой системы, которая будет работать со 100 тыс + реальными клиентами и поэтому необходимо ответственно подойти к тестированию и отладке сгенерированных скриптов. Детали описывать не буду т.к. это относится к методологии тестирования, но оставлю основные пункты.

    1. Написание тест-кейсов. Разработка подробных сценариев тестирования, описывающих, как должен работать функционал, включая входные данные, ожидаемые результаты и критерии прохождения теста.

    2. Подготовка тестовых данных. Создаём или используем существующие данные, которые будут использоваться в процессе тестирования.

    3. Проведение тестирования. Выполнения тестов, следуя разработанным сценариям, и сравниваем полученные результаты с ожидаемыми.

    4. Составление отчета о результатах. После завершения тестирования собираем информацию о найденных ошибках.

    5. Если нет ошибок переходим к п.8.

    6. Исправляем найденные в предыдущих пунктах ошибки.

    7. Регрессионное тестирование. После внесения изменений в функционал проведите повторное тестирование, чтобы убедиться, что существующие функции не были нарушены.

    8. Если есть ошибки переходим к п.6.

    9. Отладка завершена, можно публиковать и переходить к опытной эксплуатации.

    Эти этапы помогают обеспечить качество и надежность нового функционала, а также его соответствие требованиям.

  1. Запуск скриптов

    1. Загрузить в зеркала данные

    2. Запустить процедуру  CALL Repl_DO(@A) и дождаться окончания

    3. Если возникала ошибка, то смотрим на какой таблице остановился механизм репликации через запрос:

       SELECT * FROM miror_log o  ORDER BY o.id desc

  2. Развитие

    В ходе любой разработки приходят идеи по улучшению и оптимизации разрабатываемого функционала и эта разработка не стала исключением.

    1. Детальные логи(Как получить в Марии?)

    2. Эмуляция транзакции за счёт дополнительных таблиц (Commit, Rolback) 

    3. Добавить возможность изменять первичные ключи для любой таблицы из справочника

    4. Автоматическая приоритезация выполнения скриптов на основе связей между таблицами( в т.ч. динамическая если   связь зависит от заполнения данных)

    5. Автоматическая генерация зеркал и их первичное заполнение

Итог

И овцы живы и волки сыты.

Как итог всё требования удовлетворены и есть время по взрослому реализовать механизм передачи данных через брокер сообщений Kafka. Если вам понравилась статья ставьте лайк. В моих "архивах" за 20 летний стаж работы в ИТ лежит много разных подходов и забавных историй, которыми я по возможности буду делиться с тобой, мой дорого читатель.

А как вы считает РП нужны скилы разработчика/аналитика для эффективной работы?

Комментарии (0)