Современные Data Pipelines – это как вода в кране

Если она есть – всё замечательно, можно мыть руки, приготовить еду и постирать вещи. Как только вода отключается, либо идёт слабый напор – проблема становится весьма ощутима. Ту же аналогию сегодня можно провести относительно потоков интеграции данных.

Data Integration / Data Pipelines сегодня стали commodity – они просто должны быть и функционировать, обеспечивая базовые потребности, при этом основной фокус работы Аналитиков и Инженеров приходится на моделирование данных, трансформацию, обогащение, агрегирование, а также визуальную подачу выводов.

Второй важной особенностью является смена парадигмы от ETL к EL(T). Я попробую изложить ключевые идеи в паре тезисов.

Исторический подход ETL предполагал последовательность Extract – Transform – Load, что выявило ряд проблем:

  • Нет прозрачности – в Хранилище попадают уже трансформированные данные, без возможности восстановить историю и исходные данные

  • Отсутствие гибкости – трансформации должны быть известны и разработаны заранее, любые изменения и дополнительные требования могли стоить дорого

  • Зависимость Аналитиков от Инженеров (иногда очень скилованных) – сложные варианты интеграции с рядом источников, высокий порог сложности решений 

Современный подход EL(T) предполагает независимые этапы Extract – Load и Transform:

  • Гибкость – из сырых исходных данных можно собрать что угодно, какие бы идеи у вас не возникали, даже если они часто меняются

  • Вычислительные ресурсы и хранение данных доступны как никогда – Облачные сервисы хранить все данные без необходимости экономить

  • Разделение этапов EL и T – вы больше не завязаны на один инструмент, но вправе использовать любые тулзы для трансформации данных, такие как dbt, Airflow.

EL(T) в архитектура аналитических приложений Wheely
EL(T) в архитектура аналитических приложений Wheely

Множество компаний сегодня предоставляют Data Pipelines / Integration как сервис. Перечислю те, с которыми мне доводилось сталкиваться: Fivetran, Hevo, Alooma, Stitch.

Их основные преимущества:

  • Надежность и поддержка от вендора

  • Полностью управляемый сервис – минимум забот на вашей стороне

  • Легкая конфигурация pipelines – все стремятся упростить настройку

Но есть и ряд недостатков:

  • Это закрытый код – вы ограничены возможностями которые поддерживает вендор

  • Могут найтись специфические коннекторы (или способы подключения), которые вендор не поддерживает

  • И конечно это стоимость – чек может быть очень большим

Схема работы сервиса Fivetran
Схема работы сервиса Fivetran

Альтернативно, существуют класс современных и удобных решений для управления потоками интеграции данных с открытым исходным кодом: Airbyte, Meltano, Singer. И вот одно из таких решений сегодня я и предлагаю рассмотреть.

И да, честь и хвала разработчикам и контрибьюторам таких решений.

Airbyte – простота и гибкость в интеграции данных

Airbyte – это проект с открытым исходным кодом, который стремительно набирает популярность. Проект доступен на ​​GitHub (3.800+ stars), а сообщество в Slack насчитывает 2.500+ человек. По сути это современный стандарт для выстраивания потоков интеграции данных из всевозможных приложений, баз данных и API в аналитические хранилища данных, озера данных. Ниже я коротко рассмотрю ключевые преимущества инструмента.

Обширный набор коннекторов, доступных для подключения в считанные минуты. В списке все самые популярные СУБД, а также огромное количество популярных сегодня приложений: Intercom, Zendesk, Stripe, Salesforce, Jira. Усилиями сообщества пользователей список коннекторов постоянно растет. Добавление новых коннекторов сведено к простому конфигурированию – оркестрацией и спосбоами репликации займется Airbyte.

Интерфейс Airbyte – инструмент для Data Integration 
Интерфейс Airbyte – инструмент для Data Integration 

Понятная и масштабируемая архитектура. Хранилище метаданных, в качестве которого можно использовать внешнюю СУБД (Postgres), веб-интерфейс, набор рабочих лошадок (Workers), число которых можно гибко регулировать, а также полноценный scheduler с возможностью гибко регулировать частоту репликации данных.

Архитектурная схема компонент Airbyte
Архитектурная схема компонент Airbyte

Различные варианты установки приложения: AWS, Azure, GCP, K8s, Docker. Подходящий вариант для компаний, которым необходимо разместить приложение на своих мощностях в связи с требованиями к безопасности и compliance. При размещении в облаке – данные хранятся в вашем облаке и стоимость ресурсов остается прозрачной.

Опции развертывания Airbyte
Опции развертывания Airbyte

Различные стратегии синхронизации данных – Sync strategies:

  • Full Refresh Overwrite: полная выгрузка всего объема данных и перезапись на приемнике

  • Full Refresh Append: полная выгрузка всего объема данных и добавление на приемнике

  • Incremental Append: инкрементальное чтение записей и добавление на приемнике

  • Incremental Deduped History: инкрементальное чтение записей, добавление на приемнике, а также формирование дедуплицированной версии представления

  • Manual full refresh: в случае необходимости провести полную репликацию данных из источника

Нормализация данных и преобразование типов. Также Airbyte может быть полезен в переводе массивов и вложенных (nested) коллекций в плоские структуры.

Правила преобразования типов и нормализации данных Airbyte
Правила преобразования типов и нормализации данных Airbyte

И, пожалуй, одно из самых главных – вы платите только за используемые вычислительные мощности (в случае использования облака). Никакой платы за количество коннекторов и объем реплицируемых строк, как в сервисах типа Hevo, Fivetran.

Развертывание Airbyte в Yandex.Cloud

Предлагаю попробовать пощупать Airbyte своими руками, развернув приложение в Яндекс.Облаке. Для этого нам понадобится проделать ряд шагов:

  • Регистрация в Облаке

  • Создание виртуальной машины

  • Создание пары ключей и подключение по SSH

  • Установка Docker + Docker compose

  • Запуск Airbyte

  • Конфигурирование Yandex Object Storage (S3) – опционально

Развертывание Airbyte в Yandex.Cloud
# Начало работы с интерфейсом командной строки
# Install: https://cloud.yandex.ru/docs/cli/quickstart#install
# Init profile: https://cloud.yandex.ru/docs/cli/quickstart#initialize
 
yc config list # check
 
# Подключиться к виртуальной машине Linux по SSH
# Создание пары ключей SSH: https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#creating-ssh-keys
 
ls ~/.ssh/id_rsa.pub # check
 
# Создать виртуальную машину из публичного образа Linux
# https://cloud.yandex.ru/docs/compute/operations/vm-create/create-linux-vm
 
# проверьте путь до файла с публичным ключом
yc compute instance create \
   --name airbyte-vm \
   --ssh-key ~/.ssh/key.pub \
   --create-boot-disk image-folder-id=standard-images,image-family=ubuntu-1804-lts,size=10,auto-delete=true \
   --network-interface subnet-name=default-ru-central1-a,nat-ip-version=ipv4 \
   --memory 2G \
   --cores 2 \
   --hostname airbyte-vm
 
 
yc compute instance list # check
# Запишите публичный адрес ВМ – EXTERNAL IP
 
# Подключение к виртуальной машине
# https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#vm-connect
 
ssh -L 8000:localhost:8000 -i ~/.ssh/key yc-user@<external-ip> # укажите свой EXTERNAL IP
 
# Install docker
 
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg2 software-properties-common
curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add --
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian buster stable"
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io
sudo usermod -a -G docker $USER
 
# Install docker-compose
 
sudo apt-get -y install wget
sudo wget https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m) -O /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
 
# Install Airbyte
 
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
git checkout v0.29.2-alpha
sudo docker-compose up -d
 
# Access at http://localhost:8000/preferences

В случае необходимости, Airbyte можно масштабировать (Scaling) следующими способами:

  • Увеличение количества Workers – scaling out

  • Использование более мощного типа VM (CPU, Memory, Disk) – scaling up

Подключим источник данных – Yandex.Metrika

В качестве источника интересных данных для нашего пайплайна я предложу использовать API сервиса Яндекс.Метрика. Это  веб-счетчик который позволяет собирать огромное количество поведенческой информации с вашего сайта. У Я.Метрики есть демо-счётчик, который не требует аутентификации при обращении; им мы и воспользуемся.

Интерфейс позволяет выбрать интересующие метрики, разрезы и сегменты (фильтры) и представить данные в наглядном виде в интерактивном режиме (под капотом Clickhouse, который не тормозит!). Ознакомиться с ним можно по ссылке Metrika Live Demo.

Веб-интерфейс Яндекс.Метрики
Веб-интерфейс Яндекс.Метрики

Но прелесть в том, что все исходные данные доступны к выгрузке в сыром виде через API сервиса. То есть у вас появляется возможность собрать из этих данных что-то своё, например Сквозную аналитику.

Итак, в базовом виде обращение к API конфигурируется следующим образом:

  • Тип выгружаемого отчета – таблица, drill-down, time-series

  • Набор интересующих измерений, метрик, сегментов

  • Формат ответа и семплирование

  • Параметризация (валюта, атрибуция, цели)

Синтаксис запроса к API Яндекс.Метрики
Синтаксис запроса к API Яндекс.Метрики

В качестве обращения предлагаю формировать отчет Источники, сводка: https://api-metrika.yandex.net/stat/v1/data?preset=sources_summary&id=44147844

Для этого в Airbyte необходимо создать новый источник (source):

  • Тип  – HTTP Request

  • Url – API Endpoint

  • Метод – GET

Конфигурирование источника Яндекс.Метрика
Конфигурирование источника Яндекс.Метрика

Конфигурируем простой pipeline и устанавливаем расписание

В качестве приемника данных будем использовать объектное хранилище Yandex Object Storage (совместимое с S3).

Для конфигурации бакета проделаем следующие шаги:

Конфигурация S3 в Яндекс.Облаке
## Configure S3 Access
# Creating a service account: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create
# Assign a role to service account: https://cloud.yandex.ru/docs/iam/operations/sa/assign-role-for-sa
# Create access key: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create-access-key
# Access with AWS S3 CLI: https://cloud.yandex.ru/docs/storage/tools/aws-cli
 
yc iam service-account create --name airbyte-s3
yc iam service-account list
yc iam service-account get airbyte-s3
 
yc iam service-account add-access-binding airbyte-s3 \
   --role storage.admin \
   --subject serviceAccount:<id>
 
yc iam access-key create --service-account-name airbyte-s3
 
aws configure --profile yc
 
aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 mb s3://<bucket-name>
aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 ls

Настроим Destination в Airbyte:

  • Destination type – S3

  • Endpoint – https://storage.yandexcloud.net

  • Имя Bucket и путь

  • S3 Key Id & Access Key полученные при настройке технической учетной записи

Конфигурирование приемника данных S3
Конфигурирование приемника данных S3

Обратите внимание на то, что помимо JSON из коробки доступен ряд других файловых форматов:

  • CSV

  • AVRO (binary) – гибкий, для schema evolution

  • Parquet – бинарный, колоночный, оптимизированный под чтение

Доступные форматы файлов для записи данных в S3
Доступные форматы файлов для записи данных в S3

Для финализации пайплайна выберем расписание и тип репликации:

  • Раз в сутки

  • Full refresh – Append

После сохраним и запустим репликацию.

Формирование пайплайна source + target + sync mode and frequency
Формирование пайплайна source + target + sync mode and frequency

Изучим выгруженные данные

Выгрузка представляет собой JSON-документ с:

  • параметрами обращения

  • результатами запроса

  • метаданными о выгрузке

Параметры обращения:
{
   "query": {
       "ids": [
           44147844
       ],
       "preset": "sources_summary",
       "dimensions": [
           "ym:s:lastSignTrafficSource",
           "ym:s:lastSignSourceEngine"
       ],
       "metrics": [
           "ym:s:visits",
           "ym:s:users",
           "ym:s:bounceRate",
           "ym:s:pageDepth",
           "ym:s:avgVisitDurationSeconds"
       ],
       "sort": [
           "-ym:s:visits"
       ],
       "date1": "2021-08-19",
       "date2": "2021-08-25",
       "limit": 100,
       "offset": 1,
       "group": "Week",
       "auto_group_size": "1",
       "attr_name": "",
       "quantile": "50",
       "offline_window": "21",
       "attribution": "LastSign",
       "currency": "RUB",
       "adfox_event_id": "0"
   }
}
Метаданные о выгрузке:
{
   "total_rows": 116,
   "total_rows_rounded": false,
   "sampled": false,
   "contains_sensitive_data": false,
   "sample_share": 1.0,
   "sample_size": 2698,
   "sample_space": 2698,
   "data_lag": 184,
   "totals": [
       2694.0,
       2363.0,
       36.82256867,
       1.63437268,
       85.72345954
   ],
   "min": [
       1.0,
       1.0,
       0.0,
       1.0,
       0.0
   ],
   "max": [
       833.0,
       736.0,
       100.0,
       9.0,
       1201.0
   ]
}
Результаты запроса:
{
   "data": [
       {
           "dimensions": [
               {
                   "icon_id": "0",
                   "icon_type": "traffic-source",
                   "name": "Direct traffic",
                   "id": "direct"
               },
               {
                   "name": null,
                   "id": null,
                   "favicon": null,
                   "url": null
               }
           ],
           "metrics": [
               833.0,
               736.0,
               28.81152461,
               1.7334934,
               105.03601441
           ]
       },
   ...
   ]
}

Ради интереса можно попытаться найти соответствие чисел в веб-интерфейсе Я.Метрики и выгрузке через API.

Понравилось – хочу еще

Браво! Поздравляю с первым потоком интеграции на реальных данных и использованием Modern Data Stack.

Еще больше современных инструментов мы изучаем на занятиях курсов Data Engineer и Analytics Engineer в OTUS: dbt, Clickhouse, Dataproc, Airflow. Но главное – то, что я и мои коллеги стремимся дать полноценную картинку и практические навыки, так необходимые для работы.

Главные преимущества такого подхода:

  • Живое общение на регулярных вебинарах

  • Пошаговые практические инструкции и домашние задания

  • Обратная связь и возможность получить консультации к своим решениям

Data Engineer – один из самых успешных тиражных курсов, в запусках которого я участвую уже более 2-х лет. К новому старту готовы кардинальные обновления по содержанию, используемым инструментам, инфраструктуре, включая выделенные вебинары на разбор домашних заданий.

Analytics Engineer – попытка закрыть потребность на людей-мультиинструменталистов, которые сильны и в понимании специфики бизнеса, моделировании и в инженерной части. Львиная доля курса посвящена современным аналитическим СУБД, BI-инструментам, практикам продвинутой аналитики и моделирования в dbt.

Спасибо за внимание!

Комментарии (3)


  1. NuclearShmell
    30.08.2021 08:58

    Вообще, в классическом ETL есть такое понятие как "аудиторский след" - по сути это как раз исторический лог, из которого можно вытащить информацию об источниках и как во что трансформировалось...


  1. mkozhin
    14.09.2021 22:05

    А где вы взяли такой источник среди коннекторов?

    Вот только что, свежий установленный airbyte локально в докере и там при подключении нового источника нет HTTP Request


    1. kzzzr Автор
      14.09.2021 22:07

      Действительно Airbyte решили выпилить коннектор HTTP Request.
      Подробности здесь: https://github.com/airbytehq/airbyte/pull/5185.
      API Метрики можно подключить так:
      – Source type: File
      – Provider: HTTPS