Представьте: есть задача наладить аналитику данных, находящихся в интернете. Ежедневно менеджеры компании посещают сайты партнёров или сайты с публичной информацией, выгружают из них отчёты или смотрят дашборды. Цель – загрузить всю информацию в единственный приёмник и предоставить к ней доступ пользователям и BI-системам.

У каждого сервиса, разумеется, есть API, через которое можно выгружать данные. В основном это JSON или CSV файлы, доступные через HTTP API партнёров или сервисов, которыми пользуются коллеги. Но есть проблемы:

  • HTTP(S) в качестве источника поддерживается Informatica PowerCenter, на тот момент основного ETL-инструмента в «Магните», а вот JSON в нашей версии — нет. 

  • Источники лежат в интернете, что усложняет доступ из on-prem серверов.

Сегодня я хотел бы поделиться с вами опытом, который мы приобрели в компании «Магнит» при загрузке данных из внешних источников в корпоративное хранилище данных. Расскажу о проблемах, с которыми мы столкнулись и решениях, которые нам помогли облегчить процесс загрузки, повысить эффективность и ускорить получение доступа к данным. Давайте начнем!

На каком стеке работал Data WareHouse (DWH) раньше

Хранилище в «Магните» было создано в 2010 году на технологиях Informatica PowerCenter в качестве ETL инструмента, Oracle Exadata, Hadoop и Teradata для хранения данных. С их помощью мы создавали потоки данных из множества источников: баз данных компании, шин, файловых систем и так далее.

Первая попытка наладить получение данных из интернета

В 2020 году перед нами встала задача создать в нашем DWH модель данных электронной коммерции. В неё должна были войти информация о продажах через интернет-ресурсы: заказы, сборка, доставка, оценки. Источники этих данных можно разделить на:

  • внутренние — в основном это базы данных внутри корпоративной сети

  • внешние, расположенные в интернете.

Сначала мы решили сделать самое очевидное: подняли виртуальную машину в DMZ сети (сегменте сети, содержащий общедоступные сервисы) с доступом в интернет, на которой запускали Python-скрипты по расписанию. Скрипты обращались в нужное API и сохраняли ответы от них в CSV в файловую систему. После этого Informatica PowerCenter уже могла подбирать файлы через SSH. Таким образом мы быстро решили проблему с доступом в интернет и форматами файлов.

Шло время. Запросов от бизнеса на создание новых потоков становилось всё больше. Появлялись новые партнёрские системы, нужны были новые отчёты. В итоге список задач на выгрузку внутри виртуальной машины сильно разросся, и поддерживать в таком виде потоки было очень трудно: 

  • непонятно, какой джоб за что отвечает; 

  • тяжело найти, где происходит ошибка, если что-то упало; 

  • новым сотрудникам тяжело ориентироваться в существующей архитектуре и так далее.

Помимо этого, мы столкнулись с ограничениями сервисов, которые опрашивали. Иногда оказывалось, что в компании есть другая команда, которая тоже занимается выгрузкой из того же API, что и мы, и складывает данные куда-нибудь в своё хранилище. И если наши скрипты запускались одновременно, обе команды получали ошибку по лимиту запросов за отведенное время.

Версия 2.0 

Нужен был инструмент с возможностью работать с задачами: визуализировать, управлять и отслеживать. Желательно без необходимости кардинально переписывать существующую кодовую базу. 

Внутри команды мы сошлись на мнении, что Apache AirFlow нам отлично подходит: у сотрудников уже есть нужный опыт, в нём легко работать как разработчикам, так и аналитикам, инструмент находится в open source, есть сформированное комьюнити. На тот момент компания активно переносила облачные сервисы на Yandex Cloud, и мы присоединились к коллегам, развернув сервер AirFlow в корпоративном облаке. 

Для хранения файлов решили использовать Yandex Object Storage вместо локальной файловой системы. Это позволяет сэкономить средства на хранение файлов, делает возможным версионирование и охлаждение.

Концепцию хранения файлов сделали такой: в один из бакетов (S3 bucket — контейнер для хранения данных в облачном сервисе), назовем его raw, складываем файлы один в один как получили их от источника — в исходном формате и с версионированием. В бакет для подготовленных данных (ods) загружаем сконвертированные в формат parquet файлы с учетом дедуплицирования, строгого типизирования колонок, сжатия и без версионирования. 

После этого Informatica PowerCenter могла забирать все файлы, загруженные из интернета, единообразно из одного источника.

На этом этапе мы начали подключать к процессу другие заинтересованные в данных команды, которые, так же, как и наша Informatica Power Center, стали забирать файлы в свои хранилища. 

Помимо прочего, такой подход позволяет очень просто интегрироваться с сервисами, которые могут делиться данными только через экспорт. Достаточно создать бакет для такой системы и выдать ей данные для авторизации.

На какие грабли мы наступили 

Yandex Object Storage не может быть включен в корпоративную сеть. Все запросы к нему идут через интернет, соответственно, он видит внешние IP-адреса, что делает достаточно сложным организацию сетевого доступа. Чтобы разрешить доступ в определенный бакет, можно настроить пул IP-адресов – опять же, внешних. Но для этого IP-адрес системы, которая хочет обращаться к бакету, должен быть статическим (или пулом статических адресов).

Решение этой проблемы нашли инженеры облачных сервисов «Магнита»: они подняли балансировщик, который маршрутизировал запросы в Object Storage. Так любой сервис компании при попытке обратиться к storage.yandexcloud.net попадал не напрямую в него, а на балансировщик через корпоративный DNS. В таком случае S3 всегда получает запрос от конкретного IP-адреса балансировщика, и нам не приходится писать правила доступа для каждого сервиса: достаточно создавать им учётные записи.

Есть, конечно, нюансы. Существуют облачные сервисы, которые не используют наш DNS. К примеру, в том же Yandex Cloud это ClickHouse и Data Proc. Оказалось, что ClickHouse использует набор системных адресов формата IPv6, которые вы можете узнать только у поддержки Яндекса. В таком случае приходится прописывать эти адреса в пул разрешенных внутри настроек бакета. 

Что касается Data Proc, на его нодах можно настроить файл hosts и задать маршрутизацию в балансировщик вручную, однако этого будет недостаточно. Даже если вы при попытке подключиться к S3 бакету указываете ключи от учётной записи, которая имеет к нему доступ, он все равно будет пытаться подключиться через свою учётную запись. Чтобы предоставить доступ из Data Proc, нужно узнать, через какую учётку он запускается, и дать именно ей нужные права.

Нехватка ресурсов AirFlow. У нас в планах было добавлять новые потоки. Но проблема в том, что ресурсов текущего сервера им бы не хватало: выполнение задач занимало бы неадекватное время. 

Нужно было придумать, как эффективно масштабировать систему. Варианты пришли следующие:

  • Добавить ещё серверов в Celery-кластер. Тогда AirFlow бы смог распределять выполнение заданий на разные машины.

  • Развернуть AirFlow в Kubernetes. Тогда под каждое задание поднимался бы свой контейнер, а после выполнения удалялся.

  • Использовать AirFlow только для оркестрации заданий, а выполнение делегировать другому сервису.

Версия 2.1

Мы выбрали третий вариант, делегировав выполнение задач Yandex Data Proc — сервису для обработки многотерабайтных массивов данных. Этот вариант лишен главного минуса предыдущих – низкой производительности Python.

Начали с простого: постепенно стали переводить самые ресурсоемкие операции, конвертацию сырых файлов в .parquet, на spark джобы Data Proc. Результат радовал: на больших объемах данных скорость выполнения задачи возросла в 3–5 раз.

Data Proc при этом открыл возможность легко масштабировать кластер. Количество вычислительных нод в нем можно задать в любой момент, а можно и включить автомасштабирование кластера.

Но есть и минусы: время простоя обходится достаточно дорого. Держа включенным вычислительные ноды Data Proc, вы платите за работу виртуальных машин в облаке. Чтобы избежать лишних затрат, мы сделали DAG (Directed Acyclic Graph, группу связанных задач в Apache AirFlow), который по расписанию тушит кластер Data Proc, если в нем нет активных задач. А в DAG, которые используют кластер, добавили кастомные операторы для его включения и выключения.

Был также вариант сделать не включение и выключение, а создание и удаление кластера. Мы решили отложить его на будущее, потому что встретили ряд технических проблем в ходе разработки, которые сейчас пытаемся решить с поддержкой Yandex.

Пример DAG с конвертацией через Data Proc:

Получаем настройки -> Вычисляем даты, которые нужно загрузить -> Загружаем из API в S3 RAW бакет -> Поднимаем Data Proc -> Запускаем Spark джоб конвертации -> [Сохраняем в БД список загруженных файлов, останавливаем Data proc]
Получаем настройки -> Вычисляем даты, которые нужно загрузить -> Загружаем из API в S3 RAW бакет -> Поднимаем Data Proc -> Запускаем Spark джоб конвертации -> [Сохраняем в БД список загруженных файлов, останавливаем Data proc]

Вот как выглядит инструмент теперь:

  • Разработчики отправляют код для AirFlow DAG и PySpark джобов в Gitlab.

  • GitLab CI/CD запускает синхронизацию кода с сервером Airflow.

  • По заданному в DAG расписанию AirFlow запускает загрузку данных из API партнеров в S3, а затем Spark-задачу в Data Proc на конвертацию загруженных файлов в .parquet. При создании задачи AirFlow передает нужные параметры и ссылку на .py файл с кодом.

  • Иногда загрузка сырых данных выполняется Data Proc, если переливается очень большой объём.

  • Партнёры могут сами выкладывать файлы в созданные специально для них бакеты.

  • Затем все заинтересованные команды получают доступ к S3 и разбирают подготовленные файлы.

Благодарю вас за внимание! Если у вас есть вопросы или комментарии, буду рад на них ответить.

 

Комментарии (3)


  1. AlexeyDoinikov
    08.08.2023 08:16
    +2

    Отличная статья, все по делу, очень содержательно, автору огромное спасибо за труд


  1. Jajang
    08.08.2023 08:16
    +2

    Схема дополняет статью - тут отдельное спасибо. Итого - прекрасный пример автоматизации рутины обработки ручных файлов от их рождения до кхд

    Полный жизненный цикл


  1. atshaman
    08.08.2023 08:16

    А о каком объеме данных идет речь, если не секрет?