Необходимость переноса данных из одной среды в другую — задача, с которой разработчики сталкиваются достаточно часто. Например, для отправки таблиц из прода в среды для тестирования. Вместе с тем, такая «перезаливка» таблиц нередко превращается в настоящий квест, по ходу которого нужно не только гарантировать сохранность данных, но и исключить ошибки, связанные с человеческим фактором. Поэтому лучшей практикой является автоматизация переноса.
Меня зовут Евгений Грибков. Я ведущий программист в центре технологий VK. В этой статье мы рассмотрим одно из возможных решений создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL.
Типовой алгоритм перезаливки таблиц в базе данных
Обычно в ситуациях, когда нужно перезалить данные в таблицах БД из одной среды в другую, применяют шаблонный алгоритм восстановления базы:
делают резервную копию с прода;
восстанавливают бэкап на отдельной изолированной, промежуточной среде;
производят обезличивание или изменение персональных данных;
создают бэкап с полученной базы данных;
восстанавливают резервную копию в нужные среды (например, для разработки и тестирования).
Обычно весь описанный алгоритм автоматизирован и работает по расписанию (например, раз в сутки ночью или раз в неделю на выходных).
Но у такого подхода есть существенный недостаток: каждый раз восстанавливать всю базу данных во все среды — весьма длительный и очень дорогой в плане занимаемого места процесс. Поэтому в алгоритме нередко предусматривают чистки как исторических данных до определенного момента времени (например, всё, что старее одного календарного года), так и по определённым критериям. Таким образом достигается уменьшение объема БД в десятки, а то и в сотни раз.
Также надо учитывать, что в средах может быть несколько БД, в которые надо периодически догружать данные. Причем желательно, чтобы была возможность делать это в моменты, когда БД не используется активно — например, раз в неделю ночью или каждую ночь.
Соответственно, в таких кейсах важна автоматизация, которая дает возможность перезаливать таблицы БД из одной среды в другую без ручного контроля и глобального вмешательства разработчиков.
Вариант реализации скрипта
Теперь перейдем от теории к практике. Рассмотрим один из вариантов создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL. При этом сразу оговоримся и примем условие, что обе БД на одном экземпляре СУБД — то есть, БД‑источник уже восстановлен в той же СУБД, в которой находится целевая БД.
Алгоритм работы такого скрипта будет следующим:
Отключить все ограничения.
Отключить все триггеры.
Для заданных таблиц сохранить все внешние ключи, после чего удалить их.
Произвести полную очистку заданных таблиц через команду
TRUNCATE
с последующим их заполнением данными.Удалить битые данные.
Обновить статистики перезалитых выше таблиц.
Включить триггеры.
Восстановить внешние ключи.
Включить и перепроверить все ограничения.
Теперь приступим к реализации.
Для начала определим все необходимые переменные и временные таблицы
SET NOCOUNT OFF;
DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()),
@DBMaster VARCHAR(255) = 'БД-мастер',
@ERROR VARCHAR(MAX);
DECLARE @HistoryLimited bit = 1,
@table_name nvarchar(255),
@is_identity int = 0,
@stm nvarchar(max) = '',
@cols nvarchar(max) = '',
@IsNOTInsert bit,
@schema_name nvarchar(255),
@col_name_identity nvarchar(255),
@referencing_object nvarchar(255),
@referenced_object nvarchar(255),
@constraint_name nvarchar(255),
@referencing_columns nvarchar(max),
@referenced_columns nvarchar(max),
@rules nvarchar(max),
@key_cols nvarchar(max),
@StartMoment DATETIME2,
@FinishMoment DATETIME2,
@delete_referential_action INT,
@update_referential_action INT,
@max_row_insert INT = 100000,
@isClearTableFKs BIT = 1,
@RowCount BIGINT = 1,
@WhileDelCount INT = 0;
;
DECLARE @cnt TABLE (cnt BIGINT NOT NULL);
DROP TABLE IF EXISTS #tbl_res;
CREATE TABLE #tbl_res (
SchName NVARCHAR(255) NOT NULL,
TblName NVARCHAR(255) NOT NULL,
StartMoment DATETIME2 NOT NULL,
FinishMoment DATETIME2 NOT NULL,
Cnt BIGINT NOT NULL,
ErrorMsg NVARCHAR(MAX) NULL
);
Здесь определены переменные, которые будут использоваться далее в скрипте, а также табличная переменная, в которой будет записан итог работы перезаливки данных с таймингом.
Далее отключаем все ограничения в БД. Это можно сделать с помощью следующей команды: EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"
Отключаем все триггеры БД
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'
+ CHAR(13)
, SCHEMA_NAME(b.[schema_id])
, OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.object_id = t.parent_id
WHERE t.is_disabled = 0
AND t.type_desc = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,
OBJECT_NAME(t.parent_id) ASC;
OPEN r_cursor_trigg_off;
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
END
CLOSE r_cursor_trigg_off;
DEALLOCATE r_cursor_trigg_off;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
Далее собираем метаданные по таблицам, с которыми будем работать
DROP TABLE IF EXISTS #tbls;
CREATE TABLE #tbls (
[name] NVARCHAR(255) NOT NULL,
sch_name NVARCHAR(255) NOT NULL,
IsNOTInsert BIT NOT NULL
);
INSERT INTO #tbls (
[name],
sch_name,
IsNOTInsert
)
SELECT t.[name],
SCHEMA_NAME(t.[schema_id]) AS sch_name,
--задается правило, по которому определяем
--нужно ли после очистки наполнять данными таблицу или нет
--по умолчанию нужно (0-да, 1-нет)
0 AS IsNOTInsert
FROM sys.tables AS t
--в фильтре задаем какие таблицы брать в расчет
--(в нашем случае какие не брать в расчёт)
WHERE t.[name] NOT LIKE 'unused%'
AND t.[name] NOT LIKE 'removed%'
AND t.[name] NOT LIKE 'migrated%'
AND t.[name] NOT LIKE 'migration%'
AND t.[name] NOT LIKE 'sysdiag%'
AND t.[name] NOT LIKE 'test%'
AND t.[name] NOT LIKE 'tmp%'
AND t.[name] NOT LIKE '%_cache'
AND t.[name] NOT IN ('FKs');
Теперь соберем все внешние ключи полученных таблиц, сохраним их в таблице dbo.FKs, затем — удалим
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t
WHERE t.[name]= 'FKs'
AND t.[schema_id] = SCHEMA_ID('dbo'))
BEGIN
CREATE TABLE dbo.FKs (
referencing_object NVARCHAR(255) NOT NULL,
constraint_column_id INT NOT NULL,
referencing_column_name NVARCHAR(255) NOT NULL,
referenced_object NVARCHAR(255) NOT NULL,
referenced_column_name NVARCHAR(255) NOT NULL,
constraint_name NVARCHAR(255) NOT NULL,
delete_referential_action INT NOT NULL,
update_referential_action INT NOT NULL
);
END
ELSE IF (@isClearTableFKs = 1)
BEGIN
TRUNCATE TABLE dbo.FKs;
END
INSERT INTO dbo.FKs (
referencing_object,
constraint_column_id,
referencing_column_name,
referenced_object,
referenced_column_name,
constraint_name,
delete_referential_action,
update_referential_action
)
SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['
, OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,
FK.constraint_column_id,
CONCAT('['
, COL_NAME(FK.parent_object_id, FK.parent_column_id)
, ']') AS referencing_column_name,
CONCAT('['
, SCHEMA_NAME(R.[schema_id]), '].['
, OBJECT_NAME(FK.referenced_object_id)
, ']') AS referenced_object,
CONCAT('['
, COL_NAME(FK.referenced_object_id, FK.referenced_column_id)
, ']') AS referenced_column_name,
CONCAT('['
, OBJECT_NAME(FK.constraint_object_id)
, ']') AS constraint_name,
FKK.delete_referential_action,
FKK.update_referential_action
FROM sys.foreign_key_columns AS FK
INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]
= FK.constraint_object_id
INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id
INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id
WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0
WHERE t0.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'));
DELETE FROM trg
FROM dbo.FKs AS trg
WHERE NOT EXISTS (
SELECT 1
FROM #tbls AS src
WHERE trg.referencing_object = CONCAT('['
, src.sch_name, '].[', src.[name], ']')
OR trg.referenced_object = CONCAT('['
, src.sch_name, '].[', src.[name], ']')
)
DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name
FROM dbo.FKs AS t
WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id)
, ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_drop;
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
, ' DROP CONSTRAINT ', @constraint_name, ';');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name;
END
CLOSE r_cursor_fk_drop;
DEALLOCATE r_cursor_fk_drop;
Следом перейдем к фрагменту кода, который отвечает за очистку и наполнение данными выбранных ранее таблиц
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.[name],
t.sch_name,
t.IsNOTInsert
FROM #tbls AS t
ORDER BY t.[name] ASC;
OPEN r_cursor;
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @cols = '';
SET @is_identity = 0;
SET @col_name_identity = NULL;
SET @stm = CONCAT('TRUNCATE TABLE ', @DB
, '.[', @schema_name, '].[', @table_name, ']');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
IF (@IsNOTInsert = 0)
BEGIN
SELECT @cols = @cols + CASE WHEN @cols = ''
THEN c.[name] ELSE ',' + c.name END,
@is_identity = @is_identity + c.is_identity,
@col_name_identity = CASE WHEN (c.is_identity = 1)
THEN c.[name]
ELSE @col_name_identity END
FROM sys.tables t,
sys.columns c
WHERE t.[object_id] = c.[object_id]
AND t.[name] = @table_name
AND c.is_computed = 0;
SET @stm = '';
IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] ON');
SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB
, '.[', @schema_name, '].[', @table_name
, '](', @cols, ') SELECT ', @cols
, ' FROM [',@DBMaster,'].['
, @schema_name, '].[', @table_name, '] WITH(NOLOCK) ');
--здесь можно задать ограничение на наполнение данными
IF @HistoryLimited = 1
BEGIN
IF @table_name LIKE '%History'
SET @stm = CONCAT(@stm
, ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) ');
END
IF @is_identity > 0 SET @stm = CONCAT(@stm
, ' SET IDENTITY_INSERT ', @DB
, '.[', @schema_name, '].[', @table_name, '] OFF');
IF @is_identity > 0 SET @stm = CONCAT(@stm
, ' DBCC CHECKIDENT ("', @table_name, '")');
SET @StartMoment = SYSDATETIME();
SET @ERROR = NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
SET @FinishMoment = SYSDATETIME();
SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM '
, '[', @schema_name, '].[', @table_name, '] WITH (NOLOCK);');
DELETE FROM @cnt;
INSERT INTO @cnt (cnt)
EXEC sys.sp_executesql @stmt = @stm;
INSERT INTO #tbl_res (
SchName,
TblName,
StartMoment,
FinishMoment,
Cnt,
ErrorMsg
)
SELECT @schema_name,
@table_name,
@StartMoment,
@FinishMoment,
COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,
@ERROR;
END
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
END
CLOSE r_cursor;
DEALLOCATE r_cursor;
Обычно после этого фрагмента производятся какие‑то еще необходимые манипуляции с данными. Например, добавляются нужные пользователи и роли с правами в соответствующие таблицы БД.
Затем производится удаление битых данных, то есть тех, по которым нет для записи из одной таблицы соответствующей записи в другой таблице по внешнему ключу
WHILE (@RowCount > 0)
BEGIN
SET @RowCount = 0;
SET @WhileDelCount += 1;
DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, '=src.', t.referenced_column_name, ')'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, ' IS NOT NULL)'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols
FROM dbo.FKs AS t
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_corr;
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object
,' AS trg WHERE ', @key_cols
, ' AND NOT EXISTS (SELECT 1 FROM ', @referenced_object,
' AS src WITH (NOLOCK) WHERE ', @rules, ');');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
SET @RowCount += @@ROWCOUNT;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
END
CLOSE r_cursor_fk_corr;
DEALLOCATE r_cursor_fk_corr;
END
PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
Удаление неконсистентных данных происходит до тех пор, пока такие данные обнаруживаются. Это нужно, чтобы исключить ситуацию, когда была удалена запись из одной таблицы, но при этом была потеряна соответствующая связь в другой уже ранее обработанной таблице.
Далее обновляем статистики для рассматриваемых выше таблиц
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT CONCAT('UPDATE STATISTICS ', @DB, '.['
, t.sch_name, '].[', t.[name], '] WITH FULLSCAN;') AS stm
FROM #tbls AS t;
OPEN r_cursor_stat;
FETCH NEXT FROM r_cursor_stat
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_stat
INTO @stm
END
CLOSE r_cursor_stat;
DEALLOCATE r_cursor_stat;
Теперь необходимо включить триггеры в БД
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id])
, OBJECT_NAME(t.parent_id), t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 1
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
OPEN r_cursor_trigg_on;
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
END
CLOSE r_cursor_trigg_on;
DEALLOCATE r_cursor_trigg_on;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
После этого восстанавливаем все ранее удаленные внешние ключи
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (t.referencing_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referencing_columns,
STRING_AGG (t.referenced_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referenced_columns,
t.delete_referential_action,
t.update_referential_action
FROM dbo.FKs AS t
WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('[', OBJECT_NAME(FK.constraint_object_id)
, ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name,
t.delete_referential_action,
t.update_referential_action;
OPEN r_cursor_fk_recover;
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
,' WITH CHECK ADD CONSTRAINT ', @constraint_name,
' FOREIGN KEY(', @referencing_columns, ') REFERENCES '
, @referenced_object, ' (', @referenced_columns, ') '
, CASE
WHEN @delete_referential_action = 1
THEN 'ON DELETE CASCADE '
WHEN @delete_referential_action = 2
THEN 'ON DELETE SET NULL '
ELSE ''
END
, CASE
WHEN @update_referential_action = 1
THEN 'ON UPDATE CASCADE '
WHEN @update_referential_action = 2
THEN 'ON UPDATE SET NULL '
ELSE ''
END
, '; '
, 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '
, @constraint_name, '; ');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
END
CLOSE r_cursor_fk_recover;
DEALLOCATE r_cursor_fk_recover;
В конце запускаем проверку всех ограничений, используя следующую команду:
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'", @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
После делаем вывод статистики работы перезаливки данных в таблицы:
SELECT t.SchName,
t.TblName,
t.Cnt,
DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,
t.ErrorMsg
FROM #tbl_res AS t
ORDER BY t.SchName ASC, t.TblName ASC;
В итоге получаем полный скрипт
SET NOCOUNT OFF;
DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()),
@DBMaster VARCHAR(255) = 'БД-мастер',
@ERROR VARCHAR(MAX);
DECLARE @HistoryLimited bit = 1,
@table_name nvarchar(255),
@is_identity int = 0,
@stm nvarchar(max) = '',
@cols nvarchar(max) = '',
@IsNOTInsert bit,
@schema_name nvarchar(255),
@col_name_identity nvarchar(255),
@referencing_object nvarchar(255),
@referenced_object nvarchar(255),
@constraint_name nvarchar(255),
@referencing_columns nvarchar(max),
@referenced_columns nvarchar(max),
@rules nvarchar(max),
@key_cols nvarchar(max),
@StartMoment DATETIME2,
@FinishMoment DATETIME2,
@delete_referential_action INT,
@update_referential_action INT,
@max_row_insert INT = 100000,
@isClearTableFKs BIT = 1,
@RowCount BIGINT = 1,
@WhileDelCount INT = 0;
;
DECLARE @cnt TABLE (cnt BIGINT NOT NULL);
DROP TABLE IF EXISTS #tbl_res;
CREATE TABLE #tbl_res (
SchName NVARCHAR(255) NOT NULL,
TblName NVARCHAR(255) NOT NULL,
StartMoment DATETIME2 NOT NULL,
FinishMoment DATETIME2 NOT NULL,
Cnt BIGINT NOT NULL,
ErrorMsg NVARCHAR(MAX) NULL
);
EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL";
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.object_id = t.parent_id
WHERE t.is_disabled = 0
AND t.type_desc = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,
OBJECT_NAME(t.parent_id) ASC;
OPEN r_cursor_trigg_off;
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
END
CLOSE r_cursor_trigg_off;
DEALLOCATE r_cursor_trigg_off;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
DROP TABLE IF EXISTS #tbls;
CREATE TABLE #tbls (
[name] NVARCHAR(255) NOT NULL,
sch_name NVARCHAR(255) NOT NULL,
IsNOTInsert BIT NOT NULL
);
INSERT INTO #tbls (
[name],
sch_name,
IsNOTInsert
)
SELECT t.[name],
SCHEMA_NAME(t.[schema_id]) AS sch_name,
--задаётся правило, по которому определяем
--нужно ли после очистки наполнять данными таблицу или нет
--по умолчанию нужно (0-да, 1-нет)
0 AS IsNOTInsert
FROM sys.tables AS t
--в фильтре задаем какие таблицы брать в расчет
--(в нашем случае какие не брать в расчет)
WHERE t.[name] NOT LIKE 'unused%'
AND t.[name] NOT LIKE 'removed%'
AND t.[name] NOT LIKE 'migrated%'
AND t.[name] NOT LIKE 'migration%'
AND t.[name] NOT LIKE 'sysdiag%'
AND t.[name] NOT LIKE 'test%'
AND t.[name] NOT LIKE 'tmp%'
AND t.[name] NOT LIKE '%_cache'
AND t.[name] NOT IN ('FKs');
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t
WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo'))
BEGIN
CREATE TABLE dbo.FKs (
referencing_object NVARCHAR(255) NOT NULL,
constraint_column_id INT NOT NULL,
referencing_column_name NVARCHAR(255) NOT NULL,
referenced_object NVARCHAR(255) NOT NULL,
referenced_column_name NVARCHAR(255) NOT NULL,
constraint_name NVARCHAR(255) NOT NULL,
delete_referential_action INT NOT NULL,
update_referential_action INT NOT NULL
);
END
ELSE IF (@isClearTableFKs = 1)
BEGIN
TRUNCATE TABLE dbo.FKs;
END
INSERT INTO dbo.FKs (
referencing_object,
constraint_column_id,
referencing_column_name,
referenced_object,
referenced_column_name,
constraint_name,
delete_referential_action,
update_referential_action
)
SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['
, OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,
FK.constraint_column_id,
CONCAT('[', COL_NAME(FK.parent_object_id
, FK.parent_column_id), ']') AS referencing_column_name,
CONCAT('[', SCHEMA_NAME(R.[schema_id]), '].['
, OBJECT_NAME(FK.referenced_object_id), ']') AS referenced_object,
CONCAT('[', COL_NAME(FK.referenced_object_id
, FK.referenced_column_id), ']') AS referenced_column_name,
CONCAT('[', OBJECT_NAME(FK.constraint_object_id)
, ']') AS constraint_name,
FKK.delete_referential_action,
FKK.update_referential_action
FROM sys.foreign_key_columns AS FK
INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]
= FK.constraint_object_id
INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id
INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id
WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0
WHERE t0.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'));
DELETE FROM trg
FROM dbo.FKs AS trg
WHERE NOT EXISTS (
SELECT 1
FROM #tbls AS src
WHERE trg.referencing_object = CONCAT('[', src.sch_name
, '].[', src.[name], ']')
OR trg.referenced_object = CONCAT('[', src.sch_name
, '].[', src.[name], ']')
)
DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name
FROM dbo.FKs AS t
WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_drop;
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
, ' DROP CONSTRAINT ', @constraint_name, ';');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name;
END
CLOSE r_cursor_fk_drop;
DEALLOCATE r_cursor_fk_drop;
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.[name],
t.sch_name,
t.IsNOTInsert
FROM #tbls AS t
ORDER BY t.[name] ASC;
OPEN r_cursor;
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @cols = '';
SET @is_identity = 0;
SET @col_name_identity = NULL;
SET @stm = CONCAT('TRUNCATE TABLE ', @DB
, '.[', @schema_name, '].[', @table_name, ']');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
IF (@IsNOTInsert = 0)
BEGIN
SELECT @cols = @cols + CASE WHEN @cols = ''
THEN c.[name] ELSE ',' + c.name END,
@is_identity = @is_identity + c.is_identity,
@col_name_identity = CASE WHEN (c.is_identity = 1)
THEN c.[name] ELSE @col_name_identity END
FROM sys.tables t,
sys.columns c
WHERE t.[object_id] = c.[object_id]
AND t.[name] = @table_name
AND c.is_computed = 0;
SET @stm = '';
IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] ON');
SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB, '.['
, @schema_name, '].[', @table_name
, '](', @cols, ') SELECT ', @cols
, ' FROM [',@DBMaster,'].['
, @schema_name, '].['
, @table_name, '] WITH(NOLOCK) ');
--здесь можно задать ограничение на наполнение данными
IF @HistoryLimited = 1
BEGIN
IF @table_name LIKE '%History'
SET @stm = CONCAT(@stm
, ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) ');
END
IF @is_identity > 0 SET @stm = CONCAT(@stm, ' SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] OFF');
IF @is_identity > 0 SET @stm = CONCAT(@stm, ' DBCC CHECKIDENT ("'
, @table_name, '")');
SET @StartMoment = SYSDATETIME();
SET @ERROR = NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
SET @FinishMoment = SYSDATETIME();
SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ', '[', @schema_name
, '].[', @table_name, '] WITH (NOLOCK);');
DELETE FROM @cnt;
INSERT INTO @cnt (cnt)
EXEC sys.sp_executesql @stmt = @stm;
INSERT INTO #tbl_res (
SchName,
TblName,
StartMoment,
FinishMoment,
Cnt,
ErrorMsg
)
SELECT @schema_name,
@table_name,
@StartMoment,
@FinishMoment,
COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,
@ERROR;
END
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
END
CLOSE r_cursor;
DEALLOCATE r_cursor;
WHILE (@RowCount > 0)
BEGIN
SET @RowCount = 0;
SET @WhileDelCount += 1;
DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, '=src.', t.referenced_column_name, ')'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, ' IS NOT NULL)'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols
FROM dbo.FKs AS t
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_corr;
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object
,' AS trg WHERE ', @key_cols, ' AND NOT EXISTS (SELECT 1 FROM '
, @referenced_object,
' AS src WITH (NOLOCK) WHERE ', @rules, ');');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
SET @RowCount += @@ROWCOUNT;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
END
CLOSE r_cursor_fk_corr;
DEALLOCATE r_cursor_fk_corr;
END
PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[', t.sch_name, '].['
, t.[name], '] WITH FULLSCAN;') AS stm
FROM #tbls AS t;
OPEN r_cursor_stat;
FETCH NEXT FROM r_cursor_stat
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_stat
INTO @stm
END
CLOSE r_cursor_stat;
DEALLOCATE r_cursor_stat;
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 1
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
OPEN r_cursor_trigg_on;
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
END
CLOSE r_cursor_trigg_on;
DEALLOCATE r_cursor_trigg_on;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (t.referencing_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referencing_columns,
STRING_AGG (t.referenced_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referenced_columns,
t.delete_referential_action,
t.update_referential_action
FROM dbo.FKs AS t
WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name,
t.delete_referential_action,
t.update_referential_action;
OPEN r_cursor_fk_recover;
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
,' WITH CHECK ADD CONSTRAINT ', @constraint_name,
' FOREIGN KEY(', @referencing_columns, ') REFERENCES '
, @referenced_object, ' (', @referenced_columns, ') '
, CASE
WHEN @delete_referential_action = 1
THEN 'ON DELETE CASCADE '
WHEN @delete_referential_action = 2
THEN 'ON DELETE SET NULL '
ELSE ''
END
, CASE
WHEN @update_referential_action = 1
THEN 'ON UPDATE CASCADE '
WHEN @update_referential_action = 2
THEN 'ON UPDATE SET NULL '
ELSE ''
END
, '; '
, 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '
, @constraint_name, '; ');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
END
CLOSE r_cursor_fk_recover;
DEALLOCATE r_cursor_fk_recover;
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'"
, @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
SELECT t.SchName,
t.TblName,
t.Cnt,
DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,
t.ErrorMsg
FROM #tbl_res AS t
ORDER BY t.SchName ASC, t.TblName ASC;
Для проверки скрипта мы прогнали его на тестовой БД:
Скрипт выполнил перенос данных без ошибок.
Важно и то, что при объеме данных в БД‑источнике около 100 ГБ (размер таблиц в строках до 100 млн записей), время выполнения всего скрипта составило 18 — 19 минут, то есть достигается высокая скорость перезаливки таблиц.
Компоненты скрипта
Теперь немного о подробностях реализации. В приведённом скрипте используется целый стек системных объектов:
sys.sp_msforeachtable — недокументированная хранимая процедура в SQL Server, которая позволяет итеративно применять команду T‑SQL к каждой таблице в текущей базе данных;
sys.triggers — системный объект, который содержит информацию о триггерах в БД;
sys.tables — системный объект, который содержит информацию о таблицах в БД;
sys.sp_executesql — системная хранимая процедура для выполнения инструкции Transact‑SQL или пакета, в том числе, созданных динамически;
sys.foreign_key_columns — системный объект, который содержит информацию о составе внешних ключей;
sys.foreign_keys — системный объект, который содержит информацию о внешних ключах;
sys.columns — системный объект, содержащий информацию о колонках.
Вместо выводов
Реализация перезаливки таблиц в базе данных — довольно скрупулезная задача, в которой без автоматизации легко столкнуться с «подводными камнями» и нерациональным расходованием ресурсов. Поэтому автоматизация — must have для любой системы, где надо перегонять данные между средами. Предложенный скрипт — один из способов такой автоматизации.
Безусловно, описанное решение — не «серебряная пуля». Но наш опыт и проведенные тесты показали, что оно вполне эффективно и надежно справляется с переносом данных между средами.