Как и было обещано, в завершение серии ( 1 2 3 4 5 ) статей о разработке инструмента для ETL больших данных, я выкладываю выжимку ответов на вопросы.


А то статьи были в формате туториала для разработчиков таких инструментов (длинные и с высоким уровнем сложности), так что стоит рассказать более кратко и понятно для каждого.


Q. Что это такое?


A. Специализированный инструмент для а) быстрого создания ETL процессов и б) эффективного по стоимости их выполнения.


Промка: https://dcetl.ru
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl


Q. Что за маркетинговый булшит. Пруфы будут?


A. Да, у меня есть конкретный кейс. Сравнивать будем с реализацией скриптом на pyspark (SQL и датафреймы).


Дано: геолокационные данные от внешнего поставщика в файлах Parquet, выложенные в бакет на S3. Объём: 1–5 ТБ, загружаются раз в сутки по отдельному префиксу.
Поля: latitude, longitude, user_id, time_stamp, horizontal_accuracy, + ещё ~2 десятка полей, не имеющих значения для рассматриваемого процесса.


Требуется: произвести ingestion по нижеследующему алгоритму.


  1. Отфильтровать по полю horizontal_accuracy (исключить не-числовые значения и числовые > 100).
  2. Разбить поле time_stamp на компоненты year, month, day, dow, hour, minute со сменой часового пояса на GMT.
  3. Посчитать по (latitude; longitude) хеш Uber H3 заданного уровня.
  4. Разбить на 2 набора данных по полю dow (1–5 рабочие дни и 6–7 выходные).
  5. Результат должен оказаться в другом бакете на S3 в файлах формата TSV.

Написание скрипта для данного процесса на Python заняло 6 часов времени аналитика (вместе с отладкой). Прогон процесса на кластере EMR из 5 нод размера c6i.xlarge занимает в среднем 48 минут, стоимость — 14 $.


Написание SQL скрипта для Data Cooker ETL заняло 47 минут времени аналитика. Прогон на кластере того же размера — 8 минут, стоимость — 1.8 $.


Тестировался процесс на EMR версии 6.9 на 7 прогонах (по данным, поставленным за 1 неделю) в августе 2023 года.


С учётом того, что данный процесс выполняется у нас каждые сутки, и всего таких процессов полтора десятка, то экономия времени и денег на перспективе получается существенная. Причём, у нас они ещё и меняются раз в несколько недель, так что даже на времени разработки большая экономия.


Q. И что это за волшебный SQL такой?


A. Специализированный диалект, заточенный специально только на задачи ETL. У языка есть формальная спецификация — https://pastorgl.github.io/datacooker-etl/TDL4.html


Намеренно не содержит поддержки аналитических и агрегирующих языковых конструкций, подразумевает только итерацию по набору данных целиком, динамическую параметризацию и вызов подключаемых функций. Ещё он умеет в циклы и ветвления, чтобы развесистые процессы писать.


Q. Зачем мне учить какой-то специальный SQL? У меня уже есть Spark SQL и датафреймы.


A. Во-первых, подмножество языка простое и компактное, и ничего особенно учить не нужно. Даже редакторы кода подсвечивают его вполне приемлемо без дополнительной настройки.


Во-вторых, наборы данных в Data Cooker ETL в некоторых аспектах покруче будут, чем датафреймы.


Q. В каком это смысле?


A. В том смысле, что датафрейм — это Dataset[Row], и заточен он конкретно под табличные данные. А DS в датакукере — это коллекция объектов, которые не обязательно соответствуют таблице.


Конкретно, типов 6:


  • колоночные (то есть, аналог табличных),
  • структурированные (то есть, произвольные объекты, загруженные, к примеру, из JSON),
  • непрозрачный / простой текст (то есть, byte[]),
  • и ещё три геометрических: точки, полигоны, сегментированные треки.

Q. И что, в Spark SQL тоже есть поддержка JSON полей через функции.


A. А в датакукере поддержка реализована прямо на уровне языка. JSON объект в датакукеровском DS — это объект первого класса, к его свойствам можно напрямую обращаться из SELECT:


SELECT "categories[1:]", "address.city", "contact.email"
    FROM places INTO places_secondary_cats
    WHERE "categories[0]" IN $categories AND "categories[1]" IS NOT NULL;

(То же и с сегментированным треком, например. В SELECT можно обратиться к свойствам трека, сегментов, и точек напрямую. Проект, для которого изначально разрабатывался датакукер, геоинформационный, для него это важно.)


Q. Ну так в чём проблема хранить JSON в поле? Схему задал, и вперёд.


A. Так в датакукере схему вообще задавать не нужно, он изначально schema-less. Если нужно обратиться к атрибуту объекта по имени, просто пишем его имя, и всё.


С табличными данными, чтобы обращаться к полям, имена задавать нужно (но при создании DS ненужные поля можно просто пропускать), а вот указывать их тип — нет. Тип всегда автоматически выводится при обращении к полю в выражении. Можно просто передать список полей в процесс при запуске (например, через командную строку, или файл параметров), и этого будет достаточно.


И никакой обвязки для инференса типов вообще не требуется.


Q. Ну вроде ладно. А что там было про подключаемые функции?


A. Есть API для расширения на Java, ориентированное на уровень Spark RDD.


Подключаемые функции бывают для адаптеров хранилищ (например, чтобы использовать источник JDBC), для трансформаций (например, можно написать читалку из XML текста в структурированный объект), для операций — запакованных алгоритмов, расширяющих возможности языка.


Писать собственные функции несложно. В стандартной поставке примерно по 20 трансформаций и операций, и по паре штук адаптеров хранилищ на чтение и на запись, так что кода для использования в качестве примера более чем достаточно.


Q. Что там было про циклы?


A. Для упрощения реализации логики в процессах ETL, в языке есть поддержка операторов контроля процесса исполнения (LOOP, IF) и динамически вычисляемых переменных (например, LET $arr = SELECT DISTINCT property FROM data_set).


Кроме того, все строки интерполируемые, то есть, имена наборов данных, например, можно вычислять прямо в рантайме. Так что в большинстве случаев можно обходиться без каких-то дополнительных обвязочных скриптов, и весь процесс целиком писать на SQL, просто передавая параметры во время запуска.


Q. Ну, допустим, что-то написали. А отлаживаться как?


A. У датакукера есть интерактивный отладчик с REPL, в котором можно пошагово писать и отлаживать скрипты, вызывать любые подключаемые функции (с автодополнением параметров), и заглядывать в наборы данных. А также смотреть статистику, и всё такое прочее.


Более того, режим этот может работать как локально, так и удалённо, когда сам движок запущен на кластере, а консоль с локальной машины обращается к нему через REST.


Q. Это что, какой-то отдельный артефакт?


A. Нет, что вы. Вся функциональность инструмента запакована в один FatJAR, у которого нет никаких внешних зависимостей. Но есть много режимов запуска — для разных окружений.


И вообще, сборка простая (многомодульный Maven проект с профилями), но продвинутая. Например, документация по подключаемым функциям автоматически собирается в HTML и PDF прямо из исходника, включая примеры SQL скриптов — с раскраской синтаксиса.


Q. О как, и такие плюшки, значит. Ну что ж, удачи вам.


A. Спасибо! Однако, прошу заметить, что мы ведь далеко не первый год используем датакукер в проме, и вполне успешно ETL-им терабайты данных каждый день. Точно даже не скажу, сколько уже отъетлили, счёт идёт на десятки тысяч процессов (если считать в обработанных записях, то будут, вероятно, триллионы). Просто, чтобы открыть код, руки дошли только сейчас.


Более того, в нашем приватном форке куча аналитических алгоритмов в виде подключаемых функций реализована, так что не только ETL, но и аналитику мы тоже делаем им же самым.


Короче, не какое-нибудь экспериментальное фуфло, а достаточно зрелый продукт с практикой эксплуатации. Так что мы с большим удовольствием и поможем внедрить, и научим, как правильно готовить ETL процессы. Обращайтесь!

Комментарии (0)