Создание пайплайнов для трансфера данных — рутинная задача Data-инженеров. Чтобы ее решить, многие копируют код коннекторов из одного проекта в другой. Из-за копипаста общая структура ломается, и в перспективе может возникнуть трудность с поддержкой проекта.

Источников данных много — Яндекс.Директ, Google Analytics и другие. По отдельности они не дают нужной картины, — данные всё равно приходится собирать в один Data Warehouse. Тут на помощь приходит Meltano: он позволяет стандартизировать написание коннекторов к различным источникам данных и быстро перенести все нужные данные.

Data-пайплайны

Большинство компаний данные централизуют: это помогает упростить управление данными и работать с общей картиной. Data-аналитики получают доступ к централизованным данным через специализированные аналитические базы данных — Data Lakes — и хранилища данных — Data Warehouse.

Чтобы получить данные, инженеры настраивают и поддерживают Data-пайплайны: берут необработанные данные исходной системы или приемника данных и передают их в аналитическую базу данных и в производственные системы — CRM и рекламные платформы. Как правило, Data-пайплайн включает в себя три элемента:

  1. Извлечение (E) данных из исходной системы или приемника данных.

  2. Преобразование (T) данных в определенную схему или модель данных.

  3. Загрузка (L) данных в пункт назначения.

Раньше, когда хранение данных стоило дорого или очень дорого, было важно понимать, какие данные поступают в конечную цель, и преобразовывать их до того, как они туда попадут. Использовали подход ETL — «извлечение, преобразование, загрузка». Сейчас, когда можно сделать трансфер всех данных из всех источников в одно место и уже потом думать, что с этим делать, перешли на ELT — «извлечение, загрузка, преобразование».

Один из самых распространенных Data-пайплайнов — получение данных из облачного хранилища в Data Warehouse. Эти технологии хранения часто используют в качестве приемников данных, куда сбрасываются «мгновенные фотоснимки» виртуального диска — снапшоты — из производственных баз данных или экстракты из SaaS-систем для дальнейшей обработки. Когда Data-инженеру нужно проанализировать данные, он копирует их в Data Warehouse и готовит к использованию. 

Представьте, что ваши аналитики хотят проанализировать эволюцию медленно меняющихся характеристик своих клиентов: где они живут, где работают, сколько у них детей и так далее. Один из способов провести такой анализ — делать ежедневный снапшот исходной базы данных клиентов, выгружая ее в Storage Bucket. Затем с помощью Data-пайплайна загрузить все данные снапшота из Storage Bucket в Data Warehouse. 


Для организаций с миллионами клиентов объем этих данных может быстро вырасти до наборов данных с миллиардами строк. Meltano поможет быстро обработать весь массив данных благодаря инкрементальной записи: когда в источнике появляется что-то новое, Meltano сразу это дописывает в Data Warehouse.

Одно из самых популярных облачных решений для хранения и обработки больших объемов данных — это Google BigQuery. Загрузить туда данные можно множеством способов — от прямой загрузки CSV или Apache Parquet через веб-интерфейс до Python-клиента, который позволяет писать свои скрипты экстракции и загрузки данных в Google BigQuery.

В следующем примере рассмотрим, как загрузить данные из Яндекс.Директа в Google BigQuery, используя Meltano и инструменты Singer, Prefect и DBT.

Как написать тап для REST API, GraphQL, базы данных или другого источника

Meltano предоставляет свой набор для разработки, — Meltano SDK — который позволяет быстро и легко создавать свои собственные тапы под любой источник. Для каждого источника нужно указать его тип, REST API, GraphQL, SQL, указать метод авторизации и описать стримы. В случае с SQL это будут таблицы, с REST API — отдельные методы. Каждый стрим требует описания схемы данных.

Руководство по написанию своего тапа

Для начала нужно установить cookiecutter:

pip3 install pipx
pipx ensurepath
pipx install cookiecutter
pipx install poetry

Далее скопировать шаблон для нового тапа:

cookiecutter https://github.com/meltano/sdk --directory="cookiecutter/tap-template"

Затем ответить на вопросы инсталлятора и завершить инициализацию тапа.


Теперь нам нужно переписать несколько классов:

YandexDirectStream в файле client.py

class YandexDirectStream(RESTStream):
   """YandexDirect stream class."""

   url_base = "https://api.direct.yandex.com/json/v5"

   records_jsonpath = "$.result[*]"  # Or override `parse_response`.
   next_page_token_jsonpath = "$.next_page"  # Or override `get_next_page_token`.
   def validate_response(self, response):
       if response.status_code == 400:
           data = response.json()
           raise FatalAPIError(f"Error message found: {data['error']['error_string']} {data['error']['error_detail']}")
       super().validate_response(response)

       if response.status_code in [201, 202]:
           raise RetriableAPIError("The report is being generated in offline mode")
       try:
           data = response.json()
           if data.get("error"):
                   raise FatalAPIError(f"Error message found: {data['error']['error_detail']}")
       except JSONDecodeError:
           if 200 <= response.status_code < 300:
               pass
  
   def backoff_max_tries(self) -> int:
       return 8

Обратите внимание на атрибуты url_base, которые нужно заменить в соответствии с URL вашего API, records_jsonpath и next_page_token — в соответствии со структурой возвращаемого ответа, и в случае Яндекс.Директ — метод validate_response.

В файле streams.py нужно создать классы, унаследованные от YandexDirectStream для всех эндпоинтов, с которых вы хотите получать данные. Для примера будем использовать https://api.direct.yandex.com/json/v5/campaigns.

class CampaignsStream(YandexDirectStream):
   """Define custom stream."""

   name = "campaigns"
   path = "/campaigns"
   primary_keys = ["Id"]
   replication_key = None
   records_jsonpath = "$.result.Campaigns[*]"
   schema = th.PropertiesList(
       th.Property("Name", th.StringType),
       th.Property("Id", th.IntegerType, description="The user's system ID"),
   ).to_dict()

   def prepare_request_payload(
       self, context: Optional[dict], next_page_token: Optional[Any]
   ) -> Optional[dict]:
       data = {
           "method": "get",
           "params": {"SelectionCriteria": {}, "FieldNames": ["Id", "Name", "Status"]},
       }
       return data

name — название Stream, оно будет отражено в названии таблицы после работы тапа.

path — название эндпоинта.

primary_keys — набор полей, которые будут уникальными для всей таблицы.

replication_key — поле, по которому будет производиться инкрементальная репликация.

records_jsonpath — путь к массиву с записями.

schema — описание схемы таблицы.

метод prepare_request_payload — payload, который нужно передать в соответствии с документацией API.

И наконец файл tap.py:

from typing import List

from singer_sdk import Tap, Stream
from singer_sdk import typing as th  # JSON schema typing helpers
# TODO: Import your custom stream types here:
from tap_yandexdirect.streams import (
   YandexDirectStream,
   CampaignsStream,
)

STREAM_TYPES = [
   CampaignsStream,
]


class TapYandexDirect(Tap):
   """YandexDirect tap class."""
   name = "tap-yandexdirect"

   # TODO: Update this section with the actual config values you expect:
   config_jsonschema = th.PropertiesList(
       th.Property(
           "access_token",
           th.StringType,
           required=True,
           description="The token to authenticate against the API service"
       ),
       th.Property(
           "start_date",
           th.DateTimeType,
           description="The earliest record date to sync"
       ),
       th.Property(
           "end_date",
           th.DateTimeType,
           description="The earliest record date to sync"
       ),
   ).to_dict()

   def discover_streams(self) -> List[Stream]:
       """Return a list of discovered streams."""
       return [stream_class(tap=self) for stream_class in STREAM_TYPES]

Здесь нужно импортировать стримы из streams.py и указать, что нужно для конфигурации тапа.

Для запуска выполните команду:

poetry run tap-yandexdirect

Чтобы получить возможность использовать ваш тап в Meltano-проектах, добавьте его в репозиторий.

На выходе получаются необходимые данные из источника, отформатированные в соответствии со спецификацией Singer. Это позволяет загружать их в любой из предложенных источников — Google BigQuery, PostgreSQL, Microsoft Azure или любую другую платформу для хранения данных. Таким образом, единожды описав схему, мы автоматически получаем возможность загрузить эти данные куда угодно без дополнительного написания кода.

Создание и настройка проекта Meltano

Запустите проект Meltano с помощью команды init. Это основа вашей централизованной платформы обработки данных, которую мы будем развивать.

meltano init my-project

Затем добавьте необходимые плагины Meltano с помощью команды add. Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:

Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:

meltano add --custom extractor tap-yandexdirect 
# (namespace) [tap_yandexdirect]: <hit enter, no value needed>
# (pip_url) [tap-yandexdirect]: git+https://github.com/epoch8/tap-yandexdirect.git 
# (executable) [pipelinewise-tap-s3-csv]: tap-yandexdirect
# (capabilities) [[]]: properties,discover,state
Затем добавьте target-bigquery:
meltano add loader target-bigquery


Все плагины будут добавлены в файл meltano.yml. Для каждого плагина используйте команду invoke с флажком --help, которая вызовет плагин и напечатает его справочное сообщение. Если вы видите такое сообщение, значит, плагины установлены успешно:

meltano --log-level=debug invoke tap-yandexdirect --help 
meltano --log-level=debug invoke target-bigquery --help

Создание тапа Яндекс.Директ, Google BigQuery и их конфигурации

Для создания тапа Яндекс.Директа и его конфигураций нужно указать дату начала экстракции, дату конца экстракции и токен для доступа — его можно получить в личном кабинете в Яндекс.Директе.

REST API Яндекс.Директа предполагает использование отчетов с указанием необходимых полей. В нашем примере мы сделали сложные отчеты, но гибкость в описании тапов позволяет создавать конфигурируемые отчеты в случае необходимости.

Для минимальных конфигураций таргета Google BigQuery понадобится указать идентификатор проекта, идентификатор Data-сета, его местонахождение и путь к реквизитам для входа.

Для начала настройте yan tap-yandexdirect. С помощью Meltano CLI вы можете использовать следующие команды для настройки экстрактора и загрузчика:

  • start_date: Дата начала экстракции данных;

  • end_date: Дата окончания экстракции данных;

  • access_token: Ваш токен доступа к API yandex-direct.

meltano config tap-yandexdirect set start_date 2021-11-02 

meltano config  tap-yandexdirect set end_date 2022-01-01

meltano config  tap-yandexdirect set access_token your-yandexdirect-token

Если у вас возникли какие-либо проблемы при настройке экстрактора, попробуйте почитать «Устранение неполадок» или присоединяйтесь к сообществу Slack, — там можно задавать любые вопросы другим членам сообщества.

Затем выполните следующие команды и настройте загрузчик Bigquery:

meltano config target-bigquery set project_id {your_project_id}

meltano config target-bigquery set dataset_id {your_dataset_id}

meltano config target-bigquery set location {your_dataset_location}

meltano config target-bigquery set credentials_path {your_path_to_service_account_credentials.json}

Когда выполните шаги настройки с помощью CLI, meltano.yml обновится и будет выглядеть примерно так:

version: 1
default_environment: dev
project_id: e564ac62-b9ed-41c5-8941-083c535b50eb
plugins:
 extractors:
 - name: tap-yandexdirect
   namespace: tap_yandexdirect
   pip_url: git+https://github.com/epoch8/tap-yandexdirect.git
   executable: tap-yandexdirect
   capabilities:
   - state
   - discover
   - catalog
   config:
     access_token: your-access-token
     start_date: 2021-11-02
     end_date: 2022-01-01
 loaders:
 - name: target-bigquery
   variant: adswerve
   pip_url: git+https://github.com/adswerve/target-bigquery.git
   config:
     project_id: your_project_id
     dataset_id: your_dataset_id
     location: your_dataset_locations
     credentials_path: your_path_to_service_account_credentials

environments:
- name: dev
- name: staging
- name: prod

Запуск Data-пайплайна Meltano

Запуск пайплайна загрузки данных стартуется командой Meltano run, которая принимает два аргумента: название источника и название таргета. С помощью одной этой команды мы получаем загруженные данные.

Что дальше?

Если вы хотите изучить больше, попробуйте после загрузки данных в Bigquery выполнить преобразования с помощью плагина dbt, запланировать Data-пайплайн с помощью плагина Airflow orchestrator или объединить Data-пайплайн с другими экстракторами и загрузчиками с помощью команды run Meltano. Как только ваши данные преобразуются, включите в свой проект плагин Superset для анализа и создания информационных панелей.

Комментарии (1)


  1. barloc
    18.09.2022 19:41

    Репозиторий тапов очень неоднороден - для апсфлаера, например, он выглядит древнючим и нерабочим.

    Скажите, есть опыт использования инстаграм и фейсбук адс тапов? Можно использовать?