> Часть 1
> Часть 3

В этой части изменим логику загрузки справочника Products:

  1. При помощи компонента «Union All» объединим два входящих потока в один;
  2. Для новых записей будем делать вставку, а для записей, которые уже были добавлены ранее будем делать обновление. Для разделения записей на добавляемые и обновляемые воспользуемся компонентом Lookup;
  3. Для обновления записей применим компонент «OLE DB Command».

В завершении этой части рассмотрим компонент Multicast для того чтобы распараллелить выходящий набор.

Итого в этой части мы познакомимся с четырьмя новыми компонентами: Union All, Lookup, OLE DB Command и Multicast.

Дальше так же будет очень много картинок.

Продолжим знакомство с SSIS


Создадим новый пакет:



И переименуем его в «LoadProducts_ver2.dtsx»:



В области «Control Flow» создадим «Data Flow Task»:



Двойным щелчком по элементу «Data Flow Task» зайдем в его область «Data Flow». Создадим два элемента «Source Assistant» для соединений SourceA и SourceB. Переименуем эти элементы в «Source A» и «Source B» соответственно:



«Source A» настроим следующим образом:



Текст запроса:

SELECT
  ID SourceProductID,
  Title,
  Price
FROM Products

В целях демонстрации больших возможностей за раз, здесь я намеренно отпустил SourceID.

«Source B» настроим следующим образом:


Текст запроса:

SELECT
  ID SourceProductID,
  'B' SourceID,
  Title,
  Price
FROM Products

В результате набор A у нас будет иметь 3 колонки [SourceProductID, Title, Price], а набор B будет иметь 4 колонки [SourceProductID, SourceID, Title, Price].

Воспользуемся элементом «Union All», чтобы объединить данные из 2-х наборов в один. Направим в него синие стрелки из «Source A» и «Source B»:



Каким образом делается сопоставление колонок двух входящих наборов, можно увидеть дважды щелкнув на элементе «Union All»:



Как мы видим, здесь сделалось автоматическое сопоставление колонок имена которых совпадают. При необходимости мы можем сделать свое сопоставление, для примера добавим колонку SourceID из второго набора:



В данном случае значения SourceID набора «Source A» будут равны NULL.

Объединение двух наборов в данном случае делается на стороне SSIS. Здесь стоит обратить внимание на то, что базы источники и принимающая база могут располагаться на разных серверах/экземплярах SQL Server, по этой причине мы не всегда сможем так просто написать SQL запрос используя в нем таблицы из разных баз с применением SQL-операции UNION или JOIN (который можно было использовать вместо Lookup описанного ниже).

Для того чтобы заменить NULL значения на «A» воспользуемся компонентом «Derived Column» в который направим поток из «Union All»:



Двойным щелчком зайдем в редактор «Derived Column» и настроим его следующим образом:


Проделаем следующее (мышь в помощь):

  1. Укажем в «Derived Column» значение «Replace 'SourceID'» — это будет означать что мы на выходе заменяем старую колонку SourceID на новую;
  2. Перетащим в область «Expression» функцию REPLACENULL;
  3. Перетащим на место первого аргумента функции REPLACENULL колонку SourceID;
  4. В качестве второго аргумента пропишем константу «A».

Для того чтобы понять, что произошло с данными после прохождения «Union All» сделайте «Enable Data Viewer» для стрелки, идущей от «Union All» к «Derived Column»:



Теперь при запуске пакета на выполнение вы сможете увидеть набор, который получился в результате:


Здесь видно, что на этом этапе (до Derived Column) в колонке SourceID для строк первого набора стоят значения NULL.

Для того чтобы определить была ли добавлена ранее запись в базу DemoSSIS_Target воспользуемся компонентом Lookup:



Дважды щелкнув по нему настроим данный элемент:



Здесь мы скажем, что те строки, для которых не найдено соответствие, мы будем перенаправлять в поток «no match output». В этом случае на выходе мы получим 2 набора «Lookup Match Output» и «Lookup No Match Output».
Например, если выставить значение «Ignore failure», то в строках, для которых не нашлось сопоставления в поле TargetID (см. ниже) будет записано значение NULL и все строки будут возвращены через один набор «Lookup Match Output».

«Full cache» говорит о том, что набор, который будет использоваться в качестве справочника одним SQL запросом (см.на следующей вкладке) будет полностью загружен в память и строки будут сопоставляться уже с кэша без повторных обращений к SQL Server.

Если же выбрать «Partial cache» или «No cache», то на вкладке Advanced можно будет прописать запрос с параметрами, который будет выполняться для сопоставления каждой строки входящего набора. Для интереса можно поиграться с этим свойством и через SQL Server Profiler посмотреть какие будут формироваться запросы при выполнении пакета.

На следующей вкладке нам нужно определить набор, который будет выступать в роли справочника:



Я прописал здесь запрос:

SELECT
  SourceID,
  SourceProductID,
  ID TargetID
FROM Products

На следующей вкладке нужно указать по каким полям делается поиск строки в справочнике и какие колонки из справочника нужно добавить в выходной набор (если это нужно):



Для определение связи нужно при помощи мыши перетащить поле SourceProductID на SourceProductID и поле SourceID на SourceID.

Добавим компонент «Destination Assistant» для вставки записей с потока «Lookup No Match Output»:



Перетащим синюю стрелку с «Lookup» на «OLE DB Destination» и в диалоговом окне выберем поток «Lookup No Match Output»:



В итоге мы получим следующее:



Дважды щелкнув по «OLE DB Destination» настроим его:





Обработку вставки новых записей мы сделали.

Теперь для обновления ранее вставленных записей воспользуемся компонентом «OLE DB Command» и перенесем на него синюю стрелку от Lookup:



В этот компонент автоматически будет направлен поток «Lookup Match Output», т.к. поток «Lookup No Match Output» мы уже выбрали ранее:



Дважды щелкнем на «OLE DB Command» и настроим его:





Пропишем следующий запрос на обновление:

UPDATE Products
SET
  Title=?,
  Price=?
WHERE ID=?

На следующей вкладке укажем каким образом будут задаваться параметры на основании данных строк входящего набора «Lookup Match Output»:



Через SSMS добавим новых продуктов в базу DemoSSIS_SourceB:

USE DemoSSIS_SourceB
GO

-- добавим новых товаров
SET IDENTITY_INSERT Products ON

INSERT Products(ID,Title,Price)VALUES
(6,N'Точилка',NULL),
(7,N'Ластик',NULL),
(8,N'Карандаш простой',NULL)

SET IDENTITY_INSERT Products OFF
GO

Для того чтобы отследить как менялись данные, вы можете, перед запуском пакета на выполнение, в необходимых местах сделать «Enable Data Viewer»:



Запустим пакет на выполнение:



В итоге мы должны увидеть, что 3 строки было вставлено при помощи компонента «OLE DB Destination» и 10 строк обновлено при помощи компонента «OLE DB Command».

Запрос прописанный в «OLE DB Command» выполнился для каждой строки входящего набора, т.е. в данном примере 10 раз.

В «OLE DB Command» можно прописать более сложную логику на TSQL, например, сделать проверку, были ли изменены Title или Price, и делать обновление соответствующей строки только если какое-то из значений отличается.

Для наглядности добавим новую колонку в таблицу Products в базе DemoSSIS_Target:

USE DemoSSIS_Target
GO

ALTER TABLE Products ADD UpdatedOn datetime
GO

Давайте теперь пропишем следующую команду:



Текст команды:

DECLARE @TargetID int=?
DECLARE @Title nvarchar(50)=?
DECLARE @Price money=?

IF(EXISTS(
      SELECT Title,Price
      FROM Products
      WHERE ID=@TargetID
      EXCEPT
      SELECT @Title,@Price
    )
  )
BEGIN
  UPDATE Products
  SET
    Title=@Title,
    Price=@Price,
    UpdatedOn=GETDATE()
  WHERE ID=@TargetID
END

Так же можно было бы все это оформить в виде хранимой процедуры, а здесь прописать ее через вызов «EXEC ProcName ?,?,?». Здесь, думаю, кому как удобнее, мне порой удобнее, чтобы все было прописано в одном месте, т.е. в SSIS-проекте. Но если использовать процедуру, то тоже получаем свои удобства, в этом случае можно, было бы просто изменить процедуру и избежать переделки и повторного развертывания SSIS-проекта.

После чего переопределим привязку параметров согласно их очередности в тексте команды:



Сделаем в базе DemoSSIS_SourceA обновление:

USE DemoSSIS_SourceA
GO

UPDATE Products
SET
  Price=30
WHERE ID=2 -- Корректор

И снова запустим проект на выполнение. В результате после очередного запуска пакета на выполнение, UPDATE должен будет выполниться только 1 раз, только для этой записи.



После выполнения пакета проверим это при помощи запроса:

USE DemoSSIS_Target
GO

SELECT *
FROM Products
ORDER BY UpdatedOn DESC



В рамках данной части рассмотрим еще компонент «Multicast». Данный компонент позволяет получить из одного потока несколько. Это может быть полезно, когда одни и те же данные необходимо записать в два или более разных мест – т.е. входит один набор, а выходит столько его копий сколько нам нужно, и с каждой копией этого набора мы можем делать что захотим.

Для примера создадим в базе DemoSSIS_Target еще одну таблицу LastAddedProducts:

USE DemoSSIS_Target
GO

CREATE TABLE LastAddedProducts(
  SourceID char(1) NOT NULL, -- используется для идентификации источника
  SourceProductID int NOT NULL, -- ID в источнике
  Title nvarchar(50) NOT NULL,
  Price money,
CONSTRAINT PK_LastAddedProducts PRIMARY KEY(SourceID,SourceProductID),
CONSTRAINT CK_LastAddedProducts_SourceID CHECK(SourceID IN('A','B'))
)
GO
?
Для очистки этой таблицы добавим в область «Control Flow» компонент «Execute SQL Task» и пропишем в нем команду «TRUNCATE TABLE LastAddedProducts»:





Перейдем в область «Data Flow» компонента «Data Flow Task» и добавим компонент следующим образом:



Обратите внимание на желтый восклицательный знак – это произошло из-за того, что мы добавили колонку UpdatedOn и не привязали ее. Зайдем в элемент «OLE DB Destination», перейдем на вкладку Mappings оставим для колонки UpdatedOn в качестве входящего поля Ignore и нажмем OK:



Создадим еще один элемент «OLE DB Destination» и перетащим на него вторую синюю стрелку от элемента Multicast:



Переименуем для наглядности:



Настроим «To LastAddedProducts»:





Удалим через SSMS три последние вставленные записи:

USE DemoSSIS_Target
GO

DELETE Products
WHERE SourceID='B'
  AND SourceProductID>=6

И запустим пакет на выполнение:



В итоге добавление произошло в 2 таблицы – Products и LastAddedProducts.

Заключение по второй части


В этой части мы рассмотрели каким образом можно делать синхронизацию небольших справочников. Здесь конечно не учитывается тот момент, что данные в источниках могут еще удаляться, но при необходимости вы сможете попробовать сделать это самостоятельно, т.к. при удалении иногда нужно учитывать дополнительные факторы, например, на удаляемую запись могут быть ссылки из других таблиц (в следующей части планируется это сделать).
Чтобы не нарушать ссылочную целостность, иногда запись в принимающей таблице удаляется логически, для этого, например, можно в эту таблицу добавить поле Deleted типа bit (флаг логического удаления) или DeletedOn типа datetime (дата/время логического удаления).

Порой на сервере, на котором располагается база Target делается вспомогательная промежуточная база (обычно ее называют Staging) и первым делом «сырые» данные из Source загружаются в нее. Так как теперь Target и Staging находятся на одном сервере, то вторым шагом мы можем легко написать SQL-запрос (например, используя SQL-конструкцию MERGE или запрос с применение конструкции JOIN), который оперирует с наборами обеих этих баз.

SSIS достаточно интересный инструмент, который на мой взгляд не помешает иметь в своем арсенале, так как в некоторых случаях он может сильно упростить процесс интеграции. Но конечно бывают ситуации, когда все взвесив, разумнее написать интеграцию прибегая к другим способам, например, использовать Linked Servers и писать процедуры на чистом TSQL или писать свою утилиту на каком-то другом языке программирования с применением всей мощи ООП и т.п.

Изучая материал проявляйте больше любопытства, например, щелкайте по вкладкам, которые я не показал, смотрите и анализируйте информацию на них, щелкайте по стрелкам, у них тоже есть свои свойства и настройки. Экспериментируйте, со всем что вам покажется интересным, не ленитесь делать свои небольшие тестовые примеры. Меняйте схему, так чтобы это приводило к исключениям, выбирайте более подходящие параметры у компонент пытаясь найти наиболее подходящий выход из сложившейся ситуации.

Спасибо за внимание! Удачи!

> Часть 3
Поделиться с друзьями
-->

Комментарии (9)


  1. jankovsky
    12.06.2017 23:16
    +1

    Никогда не устану наблюдать как люди пытаются автоматизировать инфраструктуру на устаревших технологиях )


    1. mindw00rk
      13.06.2017 06:39
      +1

      А когда SSIS устареть успел?


    1. Buzzzzer
      13.06.2017 08:26
      +1

      И какие есть альтернативы ?


      1. mayorovp
        13.06.2017 08:31
        +1

        Да, есть. Как показала практика, обычное консольное приложение на C#, запускаемое через все тот же SQL Server Agent, гораздо проще в написании и поддержке если вам нужно хоть что-нибудь выходящее за рамки "изкоробочной" функциональности SSIS.


        1. Buzzzzer
          13.06.2017 13:14

          SSIS меня подкупил только своей скоростью.
          Достаточно большие объемы информации (5-50 Гбайт), которые нужно еще всячески "покрутить" и записать в DWH обрабатываются очень быстро и не грузят основной сервер c OLTP-базами.
          Сам интерфейс разработки, да, мягко говоря, не очень.
          Разработка только началась, и если есть более удобные альтернативы, может сразу уйти ?


  1. mindw00rk
    13.06.2017 06:50
    +2

    Решал как-то раз задачу выгрузки данных из базы в зазипованый текстовый файл на ftp сервер сторонней организации. Сделал как раз через SSIS и SQL Server Агент. Вполне себе работало. Решение не требовало привлечения каких-либо сторонних средств, только SQL Server.


  1. zindur2
    13.06.2017 16:44

    BIML — пишите на c# и создайёте SSIS package



    плюс — merging будет работать :)


    1. zindur2
      13.06.2017 17:20

      а можно обосновать почему я шпалу получил?
      Чем BIML плох?


    1. mindw00rk
      14.06.2017 12:09

      Спасибо, надо будет глянуть.