В маркетинге и продажах крупных компаний есть несколько аналитических задач, которые требуют регулярной обработки сотен тысяч и миллионов записей из разных источников. Например, это прогнозирование продаж или планирование рекламных кампаний. Как правило, их решение не обходится без построения длинного пайплайна обработки данных. ML‑инженеру или аналитику данных нужен ансамбль из нескольких моделей и сервисов, чтобы собрать качественный датасет, провести эксперименты и выбрать наиболее подходящие алгоритмы.

Сбор, очистка и агрегация данных занимают большую часть времени и вычислительных ресурсов, а эти затраты хочется оптимизировать. В статье покажем, как мы ускорили построение пайплайнов обработки данных с помощью связки DataSphere Jobs и Apache Airflow™.

Типичные дата-пайплайны и их проблемы    

Есть несколько наиболее ярких примеров задач, где нужна длинная цепочка обработки данных по продажам и клиентам.

  • Анализ эффективности продаж и их прогнозирование, который позволяет на основе данных планировать отгрузки партнёрам, рекламные акции и другие способы стимулирования сбыта. Развитые модели прогноза могут учитывать множество факторов, например, активность конкурентов, сезонность и локальные погодные условия.

  • Клиентская аналитика 360°, когда признаки клиентов собираются из нескольких систем, затем подвергаются анализу и кластеризации под конкретную гипотезу.

  • Сценарии Upsell и Cross‑sell, когда есть необходимость не просто кластеризовать клиентов, но и предложить им наиболее подходящий смежный товар и сформировать канал для покупки.

  • Построение моделей оттока клиентов, например, в финансовых или телеком‑компаниях, для планирования стратегий удержания.

  • Построение скоринговых моделей, где множество признаков агрегируются и используются для оценки рисков банка.

Многие из этих задач решаются с помощью инструментов ML: рекомендательных систем, различных алгоритмов классификации и нейросетевых моделей. При этом всё это — регулярные сценарии, для которых нужно каждый раз проходить несколько этапов: сбор данных, их очистка и обогащение, дообучение модели, оценка качества и скоринг модели, прогон на реальных данных, построение витрин. Для быстрого запуска таких цепочек даже раз в месяц важно настроить ML‑пайплайн максимально эффективно.

Как это выглядит на примере ритейла. Задача из практики: один из крупных производителей товаров повседневного спроса ежемесячно запрашивает у своих дистрибьюторов данные о продажах и складских остатках для оценки финансовой эффективности.

Поскольку партнёры в ритейле — чаще всего независимые компании, все они ведут учёт в разных системах. Данные предоставляются в разных форматах: от дампов баз данных до excel‑ и текстовых файлов, запакованных непопулярными архиваторами наподобие ARJ. Внутри этих файлов не хватает унификации: могут отличаться форматы дат и времени, кодировка.

Сам объём данных может измеряться миллионами записей ежемесячно. Так что довольно много ресурсов уже на первом этапе тратится на их сбор и очистку, а также агрегацию — поскольку аналитические запросы к большим данным могут выполняться долго. На локальных мощностях такие вычисления порой занимают месяц, что делает ежемесячный отчёт автоматически неактуальным. При этом ритейлеру может потребоваться отдельный расчёт показателей для разных сбытовых сетей и групп товаров, и такую нагрузку хочется распараллелить. А на финальном этапе — собрать все результаты в единый наглядный отчёт в виде дашборда.

Увеличивать локальные мощности для таких пиковых нагрузок — неэффективно. А если переносить такие задачи в облако, хочется быть уверенными в безопасности ключевых данных.

Как мы помогли ритейлеру выстроить единый пайплайн с помощью облачных сервисов 

Подготовили облачный инструментарий. Чтобы помочь таким клиентам решить задачу эффективнее, мы планировали использовать DataSphere Jobs — это инструмент для удалённого исполнения кода, который предназначен для «тяжёлых» задач: обучения моделей или расчёта статистики. По задумке, этот сервис помогает запускать задание на обработку данных с помощью простого конфига в несколько строк. А уже по заданию автоматически собирается окружение, зависимости, входные файлы, необходимые библиотеки переносятся с локального компьютера в облако, подготовленные виртуальные машины запускаются — и потом результат возвращается на локальный компьютер пользователя, с записью всех логов в реалтайме. Что важно, данные в облаке не персистятся, а для крупного ритейла это значимо с точки зрения защиты данных

Мы уже показывали, как DataSphere Jobs работает в разовых сценариях, например, при генерации изображений с помощью нейросетей. Но в кейсе с ритейлером есть необходимость регулярно запускать одно и то же задание с переопределением некоторых параметров.

Задачу можно решить с помощью совместного использования DataSphere Jobs и Apache Airflow — это довольно популярный инструмент для создания масштабируемых пайплайнов, связанных с обработкой данных. В нём можно управлять автоматизированными процессами обработки данных — воркфлоу, которые представляют направленный ациклический граф (Directed Acyclic Graph, DAG), реализованный с помощью скрипта на Python.

Вот что мы сделали для создания этой связки.

  1. Для повторного запуска заданий с изменением параметров в DataSphere Jobs появилась команда fork. Она доступна и как команда CLI, и в составе DataSphere Jobs SDK. Пример вызова в SDK:

    sdk.fork_job(
      source_job_id: str,
      args: Optional[Dict[str, str]] = None,
      vars: Optional[Dict[str, str]] = None,
      env_vars: Optional[Dict[str, str]] = None,
      name: Optional[str] = None,
      desc: Optional[str] = None,
      docker: Optional[Union[str, dict]] = None,
      working_storage: Optional[dict] = None,
      cloud_instance_types: Optional[List[str]] = None,
    )

    Как видим, для запуска команды нужно знать ID исходного задания (параметр source_job_id). Используя fork, можно переопределить:

    • name, desc — имя и описание задания,

    • args — аргументы командной строки задания,

    • vars — файлы с входными и выходными данными,

    • env_vars — переменные окружения,

    • docker — докер-образ, используемый в задании,

    • working_storage — конфигурацию расширенной рабочей директории, 

    • cloud_instance_type — конфигурацию вычислительных ресурсов.

  2. Для встраивания запуска задания в общий процесс обработки данных используется Airflow DAG — направленный ациклический граф задач, описываемый кодом на Python. 

    Ниже простой пример DAG-файла, где осуществляется только запуск задания с ожиданием его завершения. DAG настроен на ежедневный запуск, начиная с текущего момента.

    from typing import Dict
    
    from airflow.decorators import dag, task
    import pendulum
    
    from datasphere import SDK
    
    now = pendulum.now()
    
    @dag(dag_id='fork_job_sync', start_date=now, schedule="@daily", catchup=False)
    def run():
        @task(task_id='fork_job')
        def fork_job(source_job_id: str, args: Dict[str, str]):
            sdk = SDK()
            job = sdk.fork_job(source_job_id, args=args)
            job.wait()
    
        fork_job('<source_job_id>', {'RANGE': '1'})
    
    run()
  3. Для того чтобы эта связка работала в облаке, необходимо cоздать кластер Managed Airflow и указать следующие параметры:

    • В разделе «Зависимости > Pip пакеты» пакет с DataSphere Jobs: datasphere.

    • В разделе «Настройки доступа > Сервисный аккаунт» указать сервисный аккаунт, имеющий роль Develop в DataSphere‑проекте, в котором запускаются задания.

    Теперь останется сохранить в S3-бакет DAG‑файл с нужным графом заданий и управлять им через UI веб‑сервера.

Упорядочили сбор данных и их загрузку. Как выглядел целевой пайплайн:

  • На первом этапе данные о продажах от коммерческих партнёров аккумулируются в облачном кластере ClickHouse.

  • В конце каждого месяца запускается перерасчёт показателей для разных групп товаров и сбытовых сетей. Для ускорения расчётов запускается несколько параллельных задач в DataSphere Jobs с использованием мощных серверов с GPU.

  • В конце показатели агрегируются в виде отчётов, доступных в Yandex Cloud DataLens.

Посмотрим, как автоматизировать загрузку файлов в решение так, чтобы выходные файлы одного задания можно было использовать для запуска последующего задания или иного способа их обработки.

  1. Начнём с примера, где исходное задание — это bash‑скрипт, запускающий поиск подстроки (grep) по входному файлу. Затем результат отправляется в выходной файл. Вот config.yaml задания:

    name: simple-bash-script
    desc: Find text pattern in input file with grep
    cmd: grep -C ${RANGE} ${OPTIONS} -f ${PATTERN} ${INPUT} > ${OUTPUT}
    args:
      RANGE: 0
      OPTIONS: "-h -r"
    inputs:
      - pattern_1.txt: PATTERN
      - input_1.txt: INPUT
    outputs:
      - output_1.txt: OUTPUT 

    Как видим, входной, выходной файлы, а также файл с паттерном подстроки указаны в переменных INPUT, OUTPUT и PATTERN, соответственно. Кроме того, есть аргументы командной строки — RANGE, где указан интервал вывода поиска, и OPTIONS с дополнительными флагами команды grep.

  2. После запуска этого задания получаем его ID из логов CLI-команды execute или на странице проекта во вкладке DataSphere Jobs в браузере. Далее, можно повторно запустить это задание с помощью SDK, указав его ID и переопределив необходимые параметры:

    from datasphere import SDK
    
    sdk = SDK()
    
    sdk.fork_job(
      '<source_job_id>',
      args={'RANGE': '1'},
      vars={'INPUT': '<new_input_file_path>'},
    )

    В этом примере указывается новый аргумент командной строки RANGE (значение OPTIONS остается неизменным и равным -h -r, как в исходном задании) и новый файл с входными данными INPUT.

  3. Для загрузки выходных файлов задания используем SDK-функцию download_job_files:

    download_job_files(job_id)

    Выходные файлы будут загружены в рабочую директорию. У функции также есть необязательные параметры:

    download_job_files(
      job_id,
      with_logs=True,  # также загрузить лог файлы, False по умолчанию
      with_diagnostics=True,  # также загрузить диагностические файлы, False по умолчанию
      output_dir='my/output/dir',  # директория для загрузки файлов, рабочая директория по умолчанию
    )

    В DAG‑операторе можно загрузить файлы и использовать их в другом задании. В этом примере предполагается, что у задания — источника выходных файлов есть выходной файл по пути result.txt. А у задания, которые мы повторно запускаем с помощью fork, есть входной файл с переменной INPUT_DATA.

    from typing import Dict
    
    from airflow.decorators import dag, task
    import pendulum
    
    from datasphere import SDK
    
    now = pendulum.now()
    
    @dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
    def run():
        @task(task_id='fork_job')
        def fork_job(files_job_id: str, fork_source_job_id: str):
            sdk = SDK()
            sdk.download_job_files(files_job_id)
            job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
            job.wait()
    
    
        fork_job('<files_job_id>', '<fork_source_job_id>')
    

Разберёмся с параллельным запуском заданий. Следующий пример DAG иллюстрирует неблокирующий вызов fork. Что даёт неблокирующий запуск:

  • После создания задания он позволяет сразу проводить некоторые полезные вычисления, не дожидаясь его завершения.

  • Для проверки статуса задания можно использовать Airflow Sensor, который позволяет экономить вычислительные ресурсы Airflow кластера для запуска других графов заданий.

from typing import Optional

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_async', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]) -> str:
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        return job.id

    @task(task_id='print_job_info')
    def print_job_info(job_id: str):
        print(f'Do something useful, may be with job {job_id}')

    @task.sensor(task_id='wait_for_job', poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_job(job_id: str) -> PokeReturnValue:
        sdk = SDK()
        job = sdk.get_job(job_id)
        return PokeReturnValue(is_done=job.done, xcom_value='xcom_value')

    @task(task_id='handle_job_result')
    def handle_job_result():
        print('Processing job results')

    job_id = fork_job('<source_job_id>', {'RANGE': '1'})
    print_job_info(job_id)
    wait_for_job(job_id) >> handle_job_result()

run()

Сенсор каждую минуту узнаёт статус задания с помощью другой функции SDK — get_job. Когда задание завершено (job.done = True), запускается последний оператор handle_job_result.

Также можно добавить дополнительные операторы до или после запуска задания, реализовав интеграции с другими системами или внешними хранилищами.

Что даёт использование этих инструментов

Размещение инфраструктуры в Yandex Cloud в этом конкретном кейсе ритейла позволило ускорить расчёт показателей в 30 раз. При этом важно отметить, что запуск заданий DataSphere Jobs тарифицируется по модели PAYG, и это делает связку эффективной ещё и с точки зрения финансовой выгоды.

Больше подробностей использования DataSphere Jobs с примерами можно найти в документации. В скором времени добавим ещё несколько кейсов.

Комментарии (0)