Правильное построение ETL-процессов (преобразования данных) — сложная задача, а при большом объёме обрабатываемых данных неизбежно возникают проблемы с ресурсами. Поэтому нам требуется выискивать новые архитектурные решения, способные обеспечить стабильность расчётов и доступность данных, а при необходимости и масштабируемость — с минимальными усилиями.

Когда я пришел в Ozon, мне пришлось столкнуться с огромным количеством ETL-джоб. Прежде чем применить модель машинного обучения, сырые данные проходят множество этапов обработки. А само применение модели (то, ради чего существует команда) занимает всего 5% времени.

Всем привет! Меня зовут Алексей, и в Ozon я занимаюсь матчингом. Что такое матчинг и зачем он нужен, мой коллега @alex_golubev13 объяснил в статье «Векторное представление товаров Prod2Vec». 

Ежедневно у нас добавляются сотни тысяч новых товаров, а также меняются те, которые уже есть на сайте. Это могут быть изменения картинок, описаний, названий или цен. Процесс ETL в данном случае заключается в извлечении признаков из товаров, которые появились или обновились в течение заданного промежутка времени (на данный момент за день). Данные мы забираем из HDFS и Hive, а для работы с ними используем PySpark.

Сразу скажу, что большую часть ресурсов и времени в ETL занимает обработка изображений и текстовых данных. Так, каждое изображение проходит через несколько свёрточных нейронных сетей, которые возвращают векторное представление для картинки (эмбеддинг). Для текста — та же схема.

Сначала ETL-процесс состоял из batch-джоб, которые брали партиции данных за конкретную дату и целиком её обрабатывали. Понятно, что с ростом числа товаров они будут работать всё дольше и дольше, а объём потребляемых ресурсов будет только расти. Особенно заметно это во время действия акций и сезонных распродаж — тогда часто меняется цена и добавляется много новых товаров. В такие моменты приходилось значительно поднимать память для приложения. К тому же процесс стал занимать слишком много времени — и весь остальной пайплайн был вынужден ждать завершения ETL. Всё закончилось тем, что на количество товаров, проходящих через ETL, выставлялся лимит, и максимально туда шла треть всех обновившихся товаров. Понятно, что при таком подходе очередь товаров, которые не проходят через пайплайн, будет стремительно расти.

Для того чтобы избежать большой очереди, мы решили никогда не останавливать наш пайплайн ETL — он теперь работает постоянно. Так мы пришли к Spark Structured Streaming.

Как у нас всё работает

Spark Structured Streaming позволяет работать с потоковыми данными, при этом можно использовать все преимущества Spark SQL. Теперь все обновления едут в Kafka-топик, а Streaming Session читает данные из него, обрабатывает и складывает в HDFS. Затем раз в день мы забираем эти данные и обновляем таблицы, которые являются результатом ETL. Таким образом, можем не выставлять лимит на количество обрабатываемых товаров в день, получая обновления равномерно в течение суток. Эмпирическим путём выяснили, что за день стриминг способен обрабатывать около 20 млн изображений и 50 млн текстовых объектов.

В качестве нейросетей для получения эмбеддингов используем BERT, ResNet50, fastText, NFNet, а с недавнего времени также считаем эмбеддинги для модели Prod2Vec.

Если вы работаете с PySpark (или просто со Spark), то наверняка знаете о возможности создания пользовательских функций (user-defined functions, UDF). На данный момент PySpark позволяет использовать три вида таких функций: Python UDF, Pandas UDF, Scala UDF. Об основных отличиях и бенчмарках можно прочитать в этой статье, а я лишь скажу, что для инференса моделей используем Pandas UDF.

Давайте на примере рассмотрим, как можно выполнять инференс ML-моделей с использованием PySpark Structured Streaming и Pandas UDF, а в качестве источника сообщения используем Kafka. Весь код ниже актуален для PySpark 3.X.

Для начала — небольшой ликбез по основной терминологии Kafka.

Kafka — это инструмент, который позволяет работать с потоками событий. Например, есть приложение, которое пишет много логов. Хочется иметь к ним быстрый доступ, чтобы анализировать и делать какие-то выводы или просто сохранять в базу данных. Kafka в этом случае — посредник между приложением, которое пишет логи, и приложением или человеком, читающим эти логи.

Чтобы ориентироваться в Kafka-терминологии из данной статьи, необходимо знать про три вещи. На примере приложения с логами определим Producer, Consumer и Topic:

  1. Topic показывает, где будут храниться логи в Kafka. Можно представить, что Topic — это папка, а каждый лог в нём —файл из этой папки. У каждого объекта (лога, сообщения) есть свой индекс (offset). Kafka также партиционирует топик, разбивая его на несколько частей и раскидывая по Kafka-кластеру.

  2. Producer в данном случае будет записывать логи в Topic. Он создаётся в логируемом приложении и пишет всё, что ему скажет пользователь.

    1. log = get_log()

    2. producer.produce(log, topic)

  3. Consumer будет читать сообщения, которые находятся в топике. Он создаётся в приложении для чтения и обработки логов. При этом один топик могут читать сразу несколько консьюмеров.

Начинаем

Представим, что есть Kafka-топик, куда поступает событие изменения описания товара на сайте или добавления нового. Необходимо обрабатывать все такие события и извлекать необходимую информацию из текстов. Скажем, что известна схема сообщений (protobuf-схема), которые находятся в топике (ID и описание товара), а к текстам мы хотим применять какую-то ML-модель, которая возвращает эмбеддинг из текста.

syntax = "proto3";

message ItemText{
	int64 item_id = 1;
	string item_text = 2;
}

Чтобы десериализовывать proto-сообщения в Python, необходимо создать .py-файл из .proto-файла. Я это делаю командой protoc --python_out=. <filename>.proto.

Если вы не знакомы с Protobuf, то можно перейти по ссылке — и буквально за 30 секунд понять, что это :)

Объявим функцию, которая будет создавать и возвращать текстовую модель:

class TextModel:
		...
		def predict(self, x):
			# your code
			return model_prediction
		...

def get_text_model(**kwargs) -> TextModel:
		# your code
		return text_model

Определим функции process_text, которая будет добавлять колонку “embedding” к входным данным, и get_dataframe_from_messages, которая десериализует сообщения.

# types_pb2 как раз создается из .proto
from types_pb2 import ItemText

def get_dataframe_from_messages(messages: pd.Series) -> pd.DataFrame:
		proto_buffer = ItemText()
    schema = [
        "item_id",
        "item_text",
    ]
    columns = {col: [] for col in schema}
    for msg in data:
        data = proto_buffer.FromString(msg)
        for col in columns:
            columns[col].append(getattr(data, col))
		return pd.DataFrame(columns)

@F.pandas_udf("item_id int, item_text string, embedding array<float>")
def process_text(data: pd.Series) -> pd.DataFrame:
		model = get_text_model(**kwargs)
		data = get_dataframe_from_messages(data)
		data["embedding"] = data["item_text"].apply(lambda x: model.predict(x))
		return data

Функция обработки сообщения моделью готова. Теперь нужно научиться работать с топиком и поднять Spark Session для стриминга.

Создадим сессию и подпишемся на конкретный топик:

spark = SparkSession.builder.getOrCreate()
df = (
	  spark.readStream
		.format("kafka")
	  .option("kafka.bootstrap.servers", bootstrap_servers)
	  .option("subscribe", topic)
)

Для контроля сессии также рекомендуется выставлять дополнительные параметры. Список таких параметров доступен в документации. Наиболее важными, на мой взгляд, являются:

  • maxOffsetsPerTrigger — отвечает за количество сообщений, которые попадают в батч;

  • minPartitions — указывает число партиций, на которое разбивается этот самый батч. Неправильный выбор этого параметра может значительно замедлять стриминг в micro-batch режиме.

Рассмотрим пример, когда у топика есть три партиции. Тогда по умолчанию в параметрах контекста будет minPartitions = 3. Это значит, что на каждый экзекутор прилетит по одной партиции, и текущий батч будет состоять из трёх задач. 

Так, а что делать, если хочется повысить производительность стриминг-джобы? Какой параметр нужно изменить?

Окей, вы добавляете больше экзекуторов в Spark-приложение в надежде, что оно ускорится. К сожалению, оно не ускоряется ☹️ Что же происходит в действительности?

Добавляется один экзекутор, для которого просто нет данных, так как в параметрах контекста значение minPartitions выставлено по умолчанию. В итоге такой экзекутор простаивает. Можно ли тогда изменить число партиций в Kafka, чтобы оно совпадало с количеством экзекуторов? Да, так тоже можно сделать, но только если у вас есть доступ к настройкам топика ????  

Поэтому лучше сделать по-другому: выставить значение minPartitions = 6 и количество экзекуторов тоже 6 — тогда каждая партиция в Kafka будет разбиваться Spark’ом на две подпартиции и спокойно скейлиться на шесть экзекуторов, которые смогут параллельно выполнять задачи. Здесь важно отметить, что сами сообщения из топика не шафлятся. Драйвер просто отдаёт офсеты каждому экзекутору, где создаётся консьюмер для их чтения.

Параметр maxOffsetPerTrigger позволяет контролировать, сколько сообщений (офсетов) из топика попадёт в текущий батч. Здесь тоже важно соблюдать баланс, так как слишком большой батч может долго обрабатываться (и в случае падения придётся его пересчитывать), а слишком маленький батч создаст много маленьких файлов, что не очень хорошо для HDFS и нагружает неймноду.

Что за неймнода?

Неймнода на Hadoop-кластере хранит информацию о дереве файлов и директорий, а также знает, где лежит тот или иной файл. В случае если неймнода перестаёт работать, падает весь кластер :)

Зададим и применим указанные параметры:

kafka_params = {
	 "maxOffsetPerTrigger": 100_000,
	 "minPartitions": 6
}
for k, v in kafka_params.items():
		df = df.option(k, v)
df = df.load()

После выполнения load() можно работать с df как с обычным DataFrame, применяя к нему привычные SQL-трансформации.

Посмотрим на схему df:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

В данный момент нас интересуют сами сообщения. Они лежат в поле value. К этому полю мы применяем Pandas UDF. UDF вернёт структуру, которую можно распаковать через звёздочку, — и получить готовый для записи DataFrame.

query = (
    df.withColumn("result", process_text(F.col("value").cast("binary")))
    .select("result.*")
    .writeStream
    .foreachBatch(do_smth)
)

Чтобы записывать данные в HDFS или просто увидеть результат в логах, можно создать функцию, которая будет вызываться для каждого кусочка данных, и передать её в foreachBatch.

def do_smth(df, *args):
		df.show(10)

В этой функции просто смотрим на десять строк данных, которые приезжают из Pandas UDF. Теперь метод start() начинает чтение из потокового источника информации (в данном случае — из Kafka-топика).

query = query.start()

Существует множество способов завершения Spark Structured Streaming — от перезагрузки кернела (в случае Jupyter Notebook) до graceful shutdown c реализацией на Scala (тык). В этой статье я не буду подробно разбирать каждый, а просто покажу тот, который используем мы.

is_stopped = False
while not is_stopped:
    is_stopped = query.awaitTermination(timeout=timeout)
    if not is_stopped and exists(indicator_path):
        logger.info("Received stop indicator, stopping query...")
        query.stop()

Что здесь происходит? Каждые timeout секунд идём в HDFS и смотрим, есть ли там файл-индикатор indicator_path. Как только он появляется, стриминг завершается.

Таким образом, получаем работающее Spark Structured Streaming приложение, где в качестве источника данных выступает Kafka-топик с обновлениями, а для обработки используется текстовая ML-модель.


Daemon Module

Данный кусок относится не только к Spark Structured Streaming, но и к работе с Pandas UDF в целом.

Внимательный читатель может заметить следующее: Pandas UDF работает батчами, размер которых контролируется через spark.sql.execution.arrow.maxRecordsPerBatch. Неужели на каждом батче приходится инициализировать модель заново? Хорошо, если загрузка модели занимает одну-две секунды. Но при работе с тяжёлыми моделями (например, из Hugging Face) или загрузке больших структур данных инициализация может занимать гораздо больше времени.

Да, в этом месте возникает большой оверхед, но его можно избежать :)

Было бы хорошо единожды создать объект на экзекуторе, а потом передавать ссылку на него в каждый дочерний процесс (на воркеры). Такая стратегия называется copy-on-write. Как понятно из названия, копирование объекта происходит, только когда дочерний процесс пытается изменить объект по ссылке. Также у Spark Session есть параметр spark.python.daemon.module. Он указывает на модуль, который стартует при появлении нового экзекутора. Данный модуль запустится один раз на экзекуторах и форкнет процесс для запуска воркеров.

Для загрузки ML-модели как раз используем свой daemon-модуль. Чтобы организовать copy-on-write, необходимо ограничить запуск дочернего процесса форком, а также заимпортить модуль, в котором лежит загрузка модели.

Например, так:

multiprocessing.set_start_method("fork")
module = importlib.import_module(module_path)

Важно, чтобы при импорте модуля сама загрузка модели триггернулась.

Затем кладём в кэш импортированный модуль через sys.modules:

sys.modules[module_name] = module

Теперь можно запустить стандартный daemon manager, который будет форкать родительский процесс. При этом пространство sys.modules у дочерних процессов будет идентично родительскому.

После этого можно импортировать нужную модель прямо из модуля, избегая процесса её инициализации (благодаря системе импортов в Python, когда первым делом проверяется sys.modules).

@F.pandas_udf("item_id int, item_text string, embedding array<float>")
def process_text(data: pd.Series) -> pd.DataFrame:
		from daemon_module import model
		data = get_dataframe_from_messages(data)
		data["embedding"] = data["item_text"].apply(lambda x: model.predict(x))
		return data

Таким образом можно реализовать потоковую обработку данных в PySpark с использованием Pandas UDF. Такой подход позволяет не только ускорить инициализацию модели, но и уменьшить потребление памяти (!), поскольку все дочерние процессы используют объект из родительского процесса без копирования.


Что в итоге

Так, обработка данных происходит постоянно, а затраты памяти не зависят от количества новых и обновившихся товаров, поскольку есть ограничение на число объектов сверху в одном батче (maxOffsetsPerTrigger). При возникновении большого лага (в топик идёт слишком много товаров, стриминг не успевает их обработать) можно увеличить количество экзекуторов, тем самым ускорив обработку батча. 

Но как определить, что лаг увеличивается или уменьшается? :/ Можно воспользоваться стандартными средствами Spark UI. Идём в Application Master и видим такую картину:

Окей, вроде понятно, че-то считаем :)

Но как быть, если хотим сделать мониторинг стриминга в Grafana или, например, алерты, которые скажут о критическом значении лага для консьюмера в топике? Об этом поговорим во второй части ????

Полезные материалы

  1. Дока по Spark Structured Streaming

  2. Интеграция с Kafka

  3. Про память в Spark приложениях (немного оффтоп, но если часто работаете со Спарком — тут много полезной инфы)

А пока приглашаю вас на открытый Data Science Meetup в апреле, где обсудим мироустройство DS в крупной IT-компании.

Следите за анонсами здесь, в Телеграме или на Таймпеде и приходите в гости!

Комментарии (4)


  1. borisd_ru
    24.03.2022 23:27

    А как конкретно spark разбивает партиции на "подпартиции"? Кафка в рамках консьюмер группы позволяет читать партицию только одному консьюмеру: https://kafka.apache.org/documentation/#design_consumerposition

    Мониторите ли вы системные метрики кафки? презентация по метрикам от confulent

    Если да, и если не жалко рассказать, то:

    • Какая нагрузка на кафку у вас в bytes/sec

    • Какой при этом LocalTimeMs

    Я столкнулся с тем, что не могу найти в интернетах, чтоб кто-то этими метриками делился, поэтому, пока не могу понять, норм ли это, иметь значения >1s.


    1. XHuviX Автор
      26.03.2022 15:07

      На каждом триггере получаем range оффсетов для конкретной партиции. Смотрим на параметр minPartition и пытаемся побить этот range на несколько маленьких частей, чтобы каждый воркер получил свою таску.

      Условно есть экзекутор получивший 500 оффсетов, но у него есть два воркера (в терминах данной статьи), здесь просто бьем на два range размера 250 и отдаем их каждому воркеру.

      https://github.com/apache/spark/blob/b11252806d49ed7915744147216db41a747e7d4d/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala#L58

      По поводу второго вопроса: такие метрики я не мониторил. Посмотрю, если на забуду :)


  1. netcitizen
    25.03.2022 10:21
    +1

    Отличная статья, спасибо!


  1. yurkao
    25.03.2022 19:29

    про грейсфул шатдаун интересно (особенно в Python), только что за exists(indicator_path)? проверка есть ли "специальный" файл, типа "передачу закончил - можешь выключаться"?

    если так, то почему не лучше ли поллить на наличие входных данных черезquery.lastProgress['numInputRows']/query.recentProgress?