Современные Data Pipelines – это как вода в кране
Если она есть – всё замечательно, можно мыть руки, приготовить еду и постирать вещи. Как только вода отключается, либо идёт слабый напор – проблема становится весьма ощутима. Ту же аналогию сегодня можно провести относительно потоков интеграции данных.
Data Integration / Data Pipelines сегодня стали commodity – они просто должны быть и функционировать, обеспечивая базовые потребности, при этом основной фокус работы Аналитиков и Инженеров приходится на моделирование данных, трансформацию, обогащение, агрегирование, а также визуальную подачу выводов.
Второй важной особенностью является смена парадигмы от ETL к EL(T). Я попробую изложить ключевые идеи в паре тезисов.
Исторический подход ETL предполагал последовательность Extract – Transform – Load, что выявило ряд проблем:
Нет прозрачности – в Хранилище попадают уже трансформированные данные, без возможности восстановить историю и исходные данные
Отсутствие гибкости – трансформации должны быть известны и разработаны заранее, любые изменения и дополнительные требования могли стоить дорого
Зависимость Аналитиков от Инженеров (иногда очень скилованных) – сложные варианты интеграции с рядом источников, высокий порог сложности решений
Современный подход EL(T) предполагает независимые этапы Extract – Load и Transform:
Гибкость – из сырых исходных данных можно собрать что угодно, какие бы идеи у вас не возникали, даже если они часто меняются
Вычислительные ресурсы и хранение данных доступны как никогда – Облачные сервисы хранить все данные без необходимости экономить
Разделение этапов EL и T – вы больше не завязаны на один инструмент, но вправе использовать любые тулзы для трансформации данных, такие как dbt, Airflow.
Множество компаний сегодня предоставляют Data Pipelines / Integration как сервис. Перечислю те, с которыми мне доводилось сталкиваться: Fivetran, Hevo, Alooma, Stitch.
Их основные преимущества:
Надежность и поддержка от вендора
Полностью управляемый сервис – минимум забот на вашей стороне
Легкая конфигурация pipelines – все стремятся упростить настройку
Но есть и ряд недостатков:
Это закрытый код – вы ограничены возможностями которые поддерживает вендор
Могут найтись специфические коннекторы (или способы подключения), которые вендор не поддерживает
И конечно это стоимость – чек может быть очень большим
Альтернативно, существуют класс современных и удобных решений для управления потоками интеграции данных с открытым исходным кодом: Airbyte, Meltano, Singer. И вот одно из таких решений сегодня я и предлагаю рассмотреть.
И да, честь и хвала разработчикам и контрибьюторам таких решений.
Airbyte – простота и гибкость в интеграции данных
Airbyte – это проект с открытым исходным кодом, который стремительно набирает популярность. Проект доступен на GitHub (3.800+ stars), а сообщество в Slack насчитывает 2.500+ человек. По сути это современный стандарт для выстраивания потоков интеграции данных из всевозможных приложений, баз данных и API в аналитические хранилища данных, озера данных. Ниже я коротко рассмотрю ключевые преимущества инструмента.
Обширный набор коннекторов, доступных для подключения в считанные минуты. В списке все самые популярные СУБД, а также огромное количество популярных сегодня приложений: Intercom, Zendesk, Stripe, Salesforce, Jira. Усилиями сообщества пользователей список коннекторов постоянно растет. Добавление новых коннекторов сведено к простому конфигурированию – оркестрацией и спосбоами репликации займется Airbyte.
Понятная и масштабируемая архитектура. Хранилище метаданных, в качестве которого можно использовать внешнюю СУБД (Postgres), веб-интерфейс, набор рабочих лошадок (Workers), число которых можно гибко регулировать, а также полноценный scheduler с возможностью гибко регулировать частоту репликации данных.
Различные варианты установки приложения: AWS, Azure, GCP, K8s, Docker. Подходящий вариант для компаний, которым необходимо разместить приложение на своих мощностях в связи с требованиями к безопасности и compliance. При размещении в облаке – данные хранятся в вашем облаке и стоимость ресурсов остается прозрачной.
Различные стратегии синхронизации данных – Sync strategies:
Full Refresh Overwrite: полная выгрузка всего объема данных и перезапись на приемнике
Full Refresh Append: полная выгрузка всего объема данных и добавление на приемнике
Incremental Append: инкрементальное чтение записей и добавление на приемнике
Incremental Deduped History: инкрементальное чтение записей, добавление на приемнике, а также формирование дедуплицированной версии представления
Manual full refresh: в случае необходимости провести полную репликацию данных из источника
Нормализация данных и преобразование типов. Также Airbyte может быть полезен в переводе массивов и вложенных (nested) коллекций в плоские структуры.
И, пожалуй, одно из самых главных – вы платите только за используемые вычислительные мощности (в случае использования облака). Никакой платы за количество коннекторов и объем реплицируемых строк, как в сервисах типа Hevo, Fivetran.
Развертывание Airbyte в Yandex.Cloud
Предлагаю попробовать пощупать Airbyte своими руками, развернув приложение в Яндекс.Облаке. Для этого нам понадобится проделать ряд шагов:
Регистрация в Облаке
Создание виртуальной машины
Создание пары ключей и подключение по SSH
Установка Docker + Docker compose
Запуск Airbyte
Конфигурирование Yandex Object Storage (S3) – опционально
Развертывание Airbyte в Yandex.Cloud
# Начало работы с интерфейсом командной строки
# Install: https://cloud.yandex.ru/docs/cli/quickstart#install
# Init profile: https://cloud.yandex.ru/docs/cli/quickstart#initialize
yc config list # check
# Подключиться к виртуальной машине Linux по SSH
# Создание пары ключей SSH: https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#creating-ssh-keys
ls ~/.ssh/id_rsa.pub # check
# Создать виртуальную машину из публичного образа Linux
# https://cloud.yandex.ru/docs/compute/operations/vm-create/create-linux-vm
# проверьте путь до файла с публичным ключом
yc compute instance create \
--name airbyte-vm \
--ssh-key ~/.ssh/key.pub \
--create-boot-disk image-folder-id=standard-images,image-family=ubuntu-1804-lts,size=10,auto-delete=true \
--network-interface subnet-name=default-ru-central1-a,nat-ip-version=ipv4 \
--memory 2G \
--cores 2 \
--hostname airbyte-vm
yc compute instance list # check
# Запишите публичный адрес ВМ – EXTERNAL IP
# Подключение к виртуальной машине
# https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#vm-connect
ssh -L 8000:localhost:8000 -i ~/.ssh/key yc-user@<external-ip> # укажите свой EXTERNAL IP
# Install docker
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg2 software-properties-common
curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add --
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian buster stable"
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io
sudo usermod -a -G docker $USER
# Install docker-compose
sudo apt-get -y install wget
sudo wget https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m) -O /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
# Install Airbyte
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
git checkout v0.29.2-alpha
sudo docker-compose up -d
# Access at http://localhost:8000/preferences
В случае необходимости, Airbyte можно масштабировать (Scaling) следующими способами:
Увеличение количества Workers – scaling out
Использование более мощного типа VM (CPU, Memory, Disk) – scaling up
Подключим источник данных – Yandex.Metrika
В качестве источника интересных данных для нашего пайплайна я предложу использовать API сервиса Яндекс.Метрика. Это веб-счетчик который позволяет собирать огромное количество поведенческой информации с вашего сайта. У Я.Метрики есть демо-счётчик, который не требует аутентификации при обращении; им мы и воспользуемся.
Интерфейс позволяет выбрать интересующие метрики, разрезы и сегменты (фильтры) и представить данные в наглядном виде в интерактивном режиме (под капотом Clickhouse, который не тормозит!). Ознакомиться с ним можно по ссылке Metrika Live Demo.
Но прелесть в том, что все исходные данные доступны к выгрузке в сыром виде через API сервиса. То есть у вас появляется возможность собрать из этих данных что-то своё, например Сквозную аналитику.
Итак, в базовом виде обращение к API конфигурируется следующим образом:
Тип выгружаемого отчета – таблица, drill-down, time-series
Набор интересующих измерений, метрик, сегментов
Формат ответа и семплирование
Параметризация (валюта, атрибуция, цели)
В качестве обращения предлагаю формировать отчет Источники, сводка: https://api-metrika.yandex.net/stat/v1/data?preset=sources_summary&id=44147844
Для этого в Airbyte необходимо создать новый источник (source):
Тип – HTTP Request
Url – API Endpoint
Метод – GET
Конфигурируем простой pipeline и устанавливаем расписание
В качестве приемника данных будем использовать объектное хранилище Yandex Object Storage (совместимое с S3).
Для конфигурации бакета проделаем следующие шаги:
Конфигурация S3 в Яндекс.Облаке
## Configure S3 Access
# Creating a service account: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create
# Assign a role to service account: https://cloud.yandex.ru/docs/iam/operations/sa/assign-role-for-sa
# Create access key: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create-access-key
# Access with AWS S3 CLI: https://cloud.yandex.ru/docs/storage/tools/aws-cli
yc iam service-account create --name airbyte-s3
yc iam service-account list
yc iam service-account get airbyte-s3
yc iam service-account add-access-binding airbyte-s3 \
--role storage.admin \
--subject serviceAccount:<id>
yc iam access-key create --service-account-name airbyte-s3
aws configure --profile yc
aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 mb s3://<bucket-name>
aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 ls
Настроим Destination в Airbyte:
Destination type – S3
Endpoint – https://storage.yandexcloud.net
Имя Bucket и путь
S3 Key Id & Access Key полученные при настройке технической учетной записи
Обратите внимание на то, что помимо JSON из коробки доступен ряд других файловых форматов:
CSV
AVRO (binary) – гибкий, для schema evolution
Parquet – бинарный, колоночный, оптимизированный под чтение
Для финализации пайплайна выберем расписание и тип репликации:
Раз в сутки
Full refresh – Append
После сохраним и запустим репликацию.
Изучим выгруженные данные
Выгрузка представляет собой JSON-документ с:
параметрами обращения
результатами запроса
метаданными о выгрузке
Параметры обращения:
{
"query": {
"ids": [
44147844
],
"preset": "sources_summary",
"dimensions": [
"ym:s:lastSignTrafficSource",
"ym:s:lastSignSourceEngine"
],
"metrics": [
"ym:s:visits",
"ym:s:users",
"ym:s:bounceRate",
"ym:s:pageDepth",
"ym:s:avgVisitDurationSeconds"
],
"sort": [
"-ym:s:visits"
],
"date1": "2021-08-19",
"date2": "2021-08-25",
"limit": 100,
"offset": 1,
"group": "Week",
"auto_group_size": "1",
"attr_name": "",
"quantile": "50",
"offline_window": "21",
"attribution": "LastSign",
"currency": "RUB",
"adfox_event_id": "0"
}
}
Метаданные о выгрузке:
{
"total_rows": 116,
"total_rows_rounded": false,
"sampled": false,
"contains_sensitive_data": false,
"sample_share": 1.0,
"sample_size": 2698,
"sample_space": 2698,
"data_lag": 184,
"totals": [
2694.0,
2363.0,
36.82256867,
1.63437268,
85.72345954
],
"min": [
1.0,
1.0,
0.0,
1.0,
0.0
],
"max": [
833.0,
736.0,
100.0,
9.0,
1201.0
]
}
Результаты запроса:
{
"data": [
{
"dimensions": [
{
"icon_id": "0",
"icon_type": "traffic-source",
"name": "Direct traffic",
"id": "direct"
},
{
"name": null,
"id": null,
"favicon": null,
"url": null
}
],
"metrics": [
833.0,
736.0,
28.81152461,
1.7334934,
105.03601441
]
},
...
]
}
Ради интереса можно попытаться найти соответствие чисел в веб-интерфейсе Я.Метрики и выгрузке через API.
Понравилось – хочу еще
Браво! Поздравляю с первым потоком интеграции на реальных данных и использованием Modern Data Stack.
Еще больше современных инструментов мы изучаем на занятиях курсов Data Engineer и Analytics Engineer в OTUS: dbt, Clickhouse, Dataproc, Airflow. Но главное – то, что я и мои коллеги стремимся дать полноценную картинку и практические навыки, так необходимые для работы.
Главные преимущества такого подхода:
Живое общение на регулярных вебинарах
Пошаговые практические инструкции и домашние задания
Обратная связь и возможность получить консультации к своим решениям
Data Engineer – один из самых успешных тиражных курсов, в запусках которого я участвую уже более 2-х лет. К новому старту готовы кардинальные обновления по содержанию, используемым инструментам, инфраструктуре, включая выделенные вебинары на разбор домашних заданий.
Analytics Engineer – попытка закрыть потребность на людей-мультиинструменталистов, которые сильны и в понимании специфики бизнеса, моделировании и в инженерной части. Львиная доля курса посвящена современным аналитическим СУБД, BI-инструментам, практикам продвинутой аналитики и моделирования в dbt.
Спасибо за внимание!
Комментарии (3)
mkozhin
14.09.2021 22:05А где вы взяли такой источник среди коннекторов?
Вот только что, свежий установленный airbyte локально в докере и там при подключении нового источника нет HTTP Request
kzzzr Автор
14.09.2021 22:07Действительно Airbyte решили выпилить коннектор
HTTP Request
.
Подробности здесь: https://github.com/airbytehq/airbyte/pull/5185.
API Метрики можно подключить так:
– Source type: File
– Provider: HTTPS
NuclearShmell
Вообще, в классическом ETL есть такое понятие как "аудиторский след" - по сути это как раз исторический лог, из которого можно вытащить информацию об источниках и как во что трансформировалось...