Всем привет! Меня зовут Ваня Ахлестин, я занимаюсь поддержкой и развитием аналитической платформы кластера Search&Recommendations на базе Spark и Hadoop. 

Большинство сервисов в хайлоаде, работу которых мы логируем и исследуем, давно переписаны на Go. Из-за этого часто необходимо переиспользовать логику сервиса внутри аналитического или ML-приложения на Spark. Как примеры такого кода можно взять расчёт скоров по сложному запросу или ранжирование айтемов для выдачи. 

Реимплементировать и поддерживать несколько вариантов кода дорого, заниматься обстрелом сервисов долго и больно. Поэтому сегодня я расскажу, как начать использовать ваш код из Python или PySpark и не тратить много времени дорогих разработчиков.

Немного о нашей инфраструктуре

Наша платформа построена на классической связке Hadoop/YARN/Spark/PySpark. Основной язык разработки — Python 3.11. Мы стараемся придерживаться баланса между зрелостью и актуальностью софта, чтобы не погрязнуть в поддержке легаси решений. Для этого мы долго обучали разработчиков и аналитиков писать тесты для их задач.

Изначально Python был выбран вместо Scala, чтобы снизить порог входа и дать возможность каждому работать с продуктовым кодом вместо выделенной команды бигдата-инженеров, реимплементирующих код от аналитиков и ML.

В типичных цикл разработки входит:

  • Проектирование — обсуждаем и подготавливаем данные для задачи, эксперимента, проговариваем примерную реализацию.

  • Прототип в Jupiter ноутбуках, первое ревью, проверяем надёжность и качество исходных данных.

  • Упаковываем задачу в формат Airflow, настраиваем, покрываем тестами и тестовыми данными, настраиваем метрики.

Для аналитики работы сервисов мы обычно используем пайплайн с логированием JSON структур в Kafka, данные из которой в HDFS поставляет задача на Flink. Реплей таких логов на кластере через PySpark UDF — об этом и написана статья.

CGO, или собираем биндинг дешево

Первое, о чём нужно подумать при работе с кодом сервиса — как организовать биндинг со сложными вложенными структурами вашего сервиса.

Тут есть несколько подходов:

  • Довериться генератору биндингов, например GoPy. Самое очевидное решение, которое мы и попробовали в первую очередь. К сожалению, результат был негативным:

    • биндинг содержит много мусора и ненужных артефактов;

    • требуется дополнительно его патчить, не всё работает «из коробки»;

    • структуры биндинга уродливы, над ними нужна ещё одна обертка;

    • код неэффективен;

    • собранный пакет жёстко привязывается к определённой версии интерпретатора — у нас есть несколько версий и дистрибутивов Python.

Поэтому такие биндинги не очень долго прожили — требовали постоянного внимания.

  • Написать биндинг вручную. Существует много туториалов, как это сделать, но есть сложности:

    • поддержка однообразных структур и кода на С;

    • есть привязка к ABI Python, нужна сборка под конкретный Python;

    • чтобы поддерживать в актуальном виде, нужно время квалифицированного разработчика.

  • Использовать сериализацию в промежуточный формат. Плюсы:

    • если это JSON, то сервис его уже использует, он нативен для кода;

    • Spark и Python уже умеют его использовать «из коробки»;

    • можем передать в код клиента схему с описанием структур;

    • не нужно поддерживать код самого биндинга.

Минус — есть дополнительные расходы на упаковку-распаковку, но и быстрые парсеры тоже есть. На этом варианте мы и остановились.

Минимальный биндинг изнутри

Чтобы было проще разобраться, я подготовил демо-проект со следующей структурой:

Основная идея проекта:

  • Собрать shared библиотеку с С-интерфейсом.

  • Добавить к ней Python-биндинг с использованием ctypes.

  • Упаковать в переносимый пакет (Wheel), готовый для установки в любое окружение для платформы.

Бекэнд-код на Go

Демо-код бэкенд-логики содержит вложенные структуры и код, который их принимает и возвращает — это максимально типичная ситуация в реальной жизни. Структуры уже размечены для JSO- генератора — это необязательно.

package go_backend_lib

type RawDataItem srtuct {
   ItemId      int64   'json:"item_id,omitempry"'
   Pos         uint8   'json:"pos,omitempty"'
   PlatformId  int64   'json:"platform_id,omitempty"'
   RegionId    int64   'json:"region_id,omitemrty"'
   CTR         float64 'json:"ctr,omitempty"'
}

type RawataRecord struct {
    Timestamp       int64                  'json:"tm,omitempty"'
    X               string                 'json:"x,omitempty"'
    Items           []RawDataItem          'json:"items,omitempty"'
    UserHistoryItem map[int64]*RawDataIten 'json:"user_history_items,omitempty"'
    Tag             string                 'json:"tag,omitempty"'
}

type ScoresRecord struct {
    Timestamp int64             'json:"tm,omitempty"'
    X         string            'json:"x,omitempty"'
    Scores    map[int64]float64 'json:"scores,omitempty"'
}

func CalcScores(data RawDataRecord) ScorexRecord {
    // ...
}

Экспортируем функции и структуры через C Call интерфейс

package main

/*

typedef struct Result {
  char* value;
  char* err;
} Result;

*/
import "C"

import (
    gbl "example.com/go_backend_lib"
    "fmt"

    jsonschema "github.com/invopop/jsonschema"
    jsoniter "github.com/json-iterator/go"
)

func ErrStruct(err error) C.Result {
    return C.Result{
        value: nil,
        err:   C.CString(err.Error())
    }
}

func ResStruct(value string) C.Result {
    return C.Result{
        value: CCString(value),
        err:   nil,
    }
}
//expert CalcScores
func CalcScores(rawDataRecordJson *C.char) C.Result {
    jsonBytes := []byte(C.GoString(rawDataRecordJson))
    jsonConfig := jsoniter.ConfigCompatibleWithStandardLibrary

    record := gbl.RawDataRecord{}
    err := jsonConfig.Unmarchal(jsonBytes, &record)
    if err != nil {
        return ErrStruct(err)
    }

    scores := gbl.CalcScores(record)
    resBytes, err := jsonConfig.Marshal(scores)
    if err != nil {
        return ErrStruct(err)
    }

    return ResStruct(string(resBytes[:]))
  
}

//export GetScoresRecordSchema
func GetScoresRecordSchema() C.Result {
    schema := jsonschema.Reflect(&gbl.ScoresRecord{})
    jsonConfig := jsoniter.ConfigCompatibleWithStandardLibrary
    data, err := jsonConfig.Marshal(schema)
    if err != nil {
        return ErrStruct(err)
    }
    return ResStruct(string(data))
}

func main () {
}

Как видно, всё, что делает код — это принимает и возвращает строки с JSON. Для описания структур используется JSON Schema. Это не идеальный вариант — из-за ограниченности типов JSON больше демонстрация возможности использовать рефлекшены для создания схемы.

Собираем всё в разделяемую библиотеку

'''shell
cd ./py_backend_bind/
go build -buildmode=c-shared -o backend_bind/library.so main.go

Обёртка на Python

Используем ctypes с динамической загрузкой собранной библиотеки.

import ctypes
import os
from sys import platform

__all__ = [
    'calc_scores',
    'get_raw_data_record_schema',
    'get_scores_record_schema',
]

this_dir = os.path.abspath(os.path.dirname(__file__))

if platform == "linux" or platform == "linux2":
    suffix = ".so"
elif platform == "darwin" :
    suffix = ".dylib"
else:
    raise NotImplementedError()

class Result(ctypes.Structure):
    _field_  = [
        ('value', ctypes.c_char_p),
        ('err', ctypes.c_char_p),
    ]

library = ctypes.cdll.LoadLibrary(os.path.join(this_dir, 'library' + suffix))

cs_func = library.CalcScores
cs_func.argtypes = [ctypes.c_char_p]
cs_func.restype = Result

gsrs_func = library.GetScoresRecordSchema
gsrs_func.argtypes = []
gsrs_func.retype = Result
def check_result(res: Result) -> str:
    if res.err is not None:
        raise RuntimeError(res.err.decode())
    return res.value.decode()


def calc_scores(raw_data: str) -> str:
    return check_result(cs_func(raw_data.encode()))


def get_scores_record_schema() -> str:
    return check_result(gsrs_func())

И не забываем про сборку проекта в пакет:

'''toml
[tool.poetry]
name = "backend_bind"
version = "0.1"
description = "Demo for binding GoLang backend lib to python for usage in PySpark Cluster applications"
authors = ["Ivan Akhlestin <j.h@g.c>"]
packages = [{ include = "backend_bind" }]
include = [
    { path = "*.so", format = [
        "wheel",
    ] },
    { path = "*.h ", format = [
        "wheel",
    ] },
]

[tool.poetry.dependencies]
python = ">3.9"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Всё, что нужно — Python и установленный poetry. При запуске сборки poetry build собирает пакеты source и wheel. Пакет wheel как раз и предназначен для дистрибьюции бинарных зависимостей для Python.

PEX как контейнер, и деплой кода на кластер

Любой PySpark скрипт с python-udf перед тем, как начать выполнять план запроса на кластере, производит сериализацию функции и всех обьектов из её контекста через библиотеку cloudpickle. То есть, для того, чтобы восстановить функцию на кластерных пайтон-воркерах нужно обеспечить действительность всех ссылок из pickle.

Существует несколько способов деплоя Python окружения на кластер:

  1. Устанавливать библиотеки в окружение системного Python.

  • Плюс: просто сделать.

  • Минусы:

    • сложно синхронизировать и следить за всеми артефактами.

    • невозможно работать и отлаживать несколько разных окружений, а использование venv только всё усложняет;

    • сложно отследить, какие версии использовались в определенный момент.

  1. Использовать docker/kubernetes.

  • Плюс: можно сделать любое окружение.

  • Минус: не подходит для bare-metal инсталляций с Hadoop

  1. Деплой через EGG.

  • Минус: работает только с нативным Python-кодом — не получится использовать биндитнги из-за ограничений zipimport.

  • Плюс: работает из коробки, не нужно паковать всё окружение, если библиотека самодостаточна

  1. Использование законсервированных окружений в виде conda-pack, venv-pack, pex.

  • Плюсы:

    • разово собирается и пакуется окружение, не получится испортить.

    • прозрачно деплоится на кластер, но нужно оркестрировать установку на все тачки.

    • возно версионировать синхронно с основным кодом.

  • Минус: PEX-пакет весит как полноценный virtualenv.

Мы используем динамическую версию из Git в виде комбинации тега, майлстоуна, хеша коммита и признака dev-ветки.

Детальнее можно ознакомится с темой в документации PySpark.

Для себя мы выбрали PEX как наиболее зрелый вариант пакета. Если коротко, то это self-extacted virtualenv, упакованный в обычный zip и снабжённый бустстап-секцией для поиска хост-интерпретатора. 

Для использования PEX как интерпретатора Python на кластере нужно модифицировать настройки Spark. У нас конфигурация контекста выглядит примерно так:

'''
"spark.yarn.dist.files": "hdfs://apps/cluster_code/cluster_code-24.2.dev91+adab58fe.pex",
"spark.executorEnv.PYSPARK_Python": f"./cluster_code-24.2.dev91+addab58fe.pex",
"spark.executorEnv.PEX_ROOT": "./.pex"
"spark.executorEnv.PYSPARK_DRIVER_PYTHON": "/opt/python3.11/bin/python3",
"spark.pyspark.python": f"./cluster_code-24.2.dev91+adab58fe.pex",
'''

Для сборки PEX-пакета достаточно самой утилиты pex и списка всех пакетов, которые должны быть в него установлены:

'''shell
# в папочку dist складываем все внутренние пакеты, в том числе и биндинг backend_bind
pex -repo./dust cluster_code==24.2.dev91+adab58fe pyspark==3.5.1 backend_bind==0.01 -f dist/ -o ./dist/cluster_code-24.2.dev91+adab58fe.pex
'''

Естественно, что в PEX-пакете должен находится pyspark, иначе Spark не сможет запустить Python-воркеры на кластерных машинах.

Включаем нашу UDF в работу

Подразумевается, что Spark-сессия запущена, настроена и используется PEX.

'''
import pyspark.sql.function as F
import backend_bind

logs_df = spark.read.text('/logs/backend_service/2024-05-01')
calc_scores_udf = F.udf(backend_bind.calc_scores, 'string')
scores_df = logs_df.select(calc_scores_sdf(F.col('value')))
scores_df.take(10)
'''

Чтобы дальше оперировать со структурами в привычном виде, описываем схему сообщения в формате Spark и используем коробочную функцию from_json.

Что почитать по теме

Комментарии (0)