Привет! Меня зовут Александр Ледовский. Я тимлид команды аналитики и DS, строю рекламные аукционы в Авито. В работе мы активно используем Apache Spark. Одна из типовых задач аналитика — посчитать что-то на pySpark, а потом выгрузить это. Например:

  • маленькую табличку в экселе, чтобы сделать отчёт или презентацию;

  • большую таблицу в экселе или csv, чтобы отправить коллегам — до нескольких Гб;

  • большой датасет для обучения ML-модели — до 100 Гб.

Данная статья о том, как это правильно делать. В том числе, как правильно использовать функцию toPandas вместе с библиотекой pyArrow, когда toPandas использовать не стоит и какие ещё есть варианты.

Немного о toPandas

toPandas — основной инструмент получения семплов в PySpark и основной инструмент аналитика для просмотра данных. Вероятно, большинство из вас пользуется функцией df.limit(10).toPandas(). Она же подходит, чтобы сохранить таблицу в Excel. Например:

df = spark.read.orc('path_to_your_dataset')
pdf = df.toPandas()
pdf.to_excel('my_dataset.xlsx', index=False)

Всё нормально, если работать с небольшими датасетами. Когда они становятся большими, начинаются проблемы: toPandas тормозит, падает с ошибками.

Другая проблема: toPandas загружает данные в оперативную память Jupyter notebook. Если сделать toPandas большого датасета, это займёт много ресурсов и повредит работе целого сервера. А там ещё и могут работать коллеги.

Разберём, как правильно работать с toPandas.

Включите Arrow-оптимизацию

TL;DR Есть настройка, которая на порядок увеличит скорость работы toPandas. Она называется spark.sql.execution.arrow.pyspark.enabled

PySpark состоит из параллельно работающих JVM- (Java Virtual Machine) и Python-процессов. Подробнее об этом я писал в статье про выделение ресурсов →

Python и Java обмениваются друг с другом данными. Но из-за того, что каждый хранит их в своём формате, это происходит не быстро.

Arrow — формат хранения данных в памяти. Эта библиотека поддерживается одновременно и в Python, и в Java/Spark-процессах. Формат поддерживает быструю передачу данных по сети. Прирост скорости оказывается огромным по сравнению со стандартным механизмом обмена данных между Python и Java.

Чтобы включить Arrow-оптимизацию, нужно прописать параметры в конфиге:

app_name = "Your App"

spark_conf = {
    # another params

		# если у вас спарк >= 3.0
    'spark.sql.execution.arrow.pyspark.enabled': 'true',

	  # если у вас спарк < 3.0
		'spark.sql.execution.arrow.enabled': 'true'
}


builder = (
    SparkSession
    .builder
    .appName(app_name)
)

for k, v in spark_conf.items():
    builder.config(k, v)
    
spark = builder.getOrCreate()
spark.sql.execution.arrow.enabled

Подробная документация по использованию Arrow с PySpark →

Ограничения Arrow и как их обходить

У Arrow есть только один недостаток. Он не поддерживает некоторые типы данных. Вот выдержка из документации (на момент версии 3.3.2):

Currently, all Spark SQL data types are supported by Arrow-based conversion except ArrayType of TimestampType, and nested StructType. MapType is only supported when using PyArrow 2.0.0 and above.

Если использовать toPandas без Arrow, то в колонке получатся довольно неприятные питоновские типы. Например, в случае с MapType — pyspark.sql.types.Row

В случае со свежим Arrow MapType даст обычный dict. А при вложенной структуре вы получите предупреждение и pySpark откатится к неоптимизированному варианту:

UserWarning: toPandas attempted Arrow optimization because ...

Если у вас большой датасет, а отказываться от Arrow не хочется, можно использовать строковый формат:

pdf = (
    spark.read.orc()
    .select('col1', 'col2', F.to_json('bad_col_3').alias('bad_col_3'))
    .toPandas()
)
pdf['bad_col_3'] = pdf['bad_col_3'].apply(lambda x: json.loads(x))

Бенчмарки toPandas с Arrow-оптимизацией и без

Для сравнения я получил бенчмарки на одном из наших датасетов. Используемые ресурсы Spark: 60 экзекьюторов по 2 ядра.

Количество данных

Репартиционирование

Arrow

no Arrow

4m

Нет

9s

10s

40m

Нет

9s

15s

400m

Нет

9s

1m 10s

4g

Да

46s

11m 48s

16g

Да

2m 29s

45m 23s

40g

Да

6m 9s

??

Бенчмарки показывают, что Arrow дает прирост скорости примерно в 10 раз.

Я, конечно, показал, что можно скачать себе 40 Гб, но делать так не рекомендую, это довольно много. Во-первых, вам стоит предварительно согласовать с вашими дата-инженерами, что вы будете работать с большими датасетами на одной ноде. Во-вторых, большие датасеты надёжнее и безопаснее скачивать в виде файлов.

Используйте sample вместо limit для семплов

При выгрузке данных приходится формировать семплы. Если бы данные можно было выкачать целиком, то Spark не был бы нужен.

Для семплов мы часто используем limit. Но его можно использовать только для небольших семплов, примерно до 100 строк. Нам нравится limit, потому что мы можем чётко указать количество строк, которые хотим получить. Это удобно, но такую операцию непросто сделать параллельной.

Возьмём пример запроса:

df = spark.read.orc('some_path').limit(10).toPandas()

Запустим запрос и посмотрим его план на вкладке SQL в SparkUI. Подробнее о том, как смотреть план запроса, я расскажу в одной из следующих статей.

+- == Initial Plan ==
   Project (11)
   +- GlobalLimit (10)
      +- Exchange (9)
         +- LocalLimit (8)
            +- Scan orc  (1)

Как работает limit:

  • LocalLimit — limit(10) выполняется на каждой партиции ваших данных;

  • Exchange — результат сливается вместе в одну партицию (операция Exchange);

  • GlobalLimit — у результата опять выполняется limit(10)

То есть, если у вас 200 партиций и вы сказали limit(1000), то Spark соберет 200 семплов по 1000 строк, объединит их, выделит из них 1000 строк и отдаст вам.

Sample работает по-другому. Вы указываете долю строк, которую хотите получить в семпле. Эта операция параллелится и работает эффективно.

df.sample(fraction=0.01, seed=42).toPandas()

Умейте исправлять ошибки нехватки памяти при toPandas

Ошибка нехватка памяти на драйвере

Все данные toPandas идут в Python через JVM-процесс драйвера. Нужно, чтобы он смог принять весь объём данных. Если вам не хватит памяти, то вы получите следующую ошибку

23/03/06 22:19:05 ERROR TaskSetManager: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
/var/jhub/venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o75.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
...
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)

Решение. Увеличивать параметры spark.driver.memory и spark.driver.maxResultSize

Ошибка нехватки памяти на экзекьюторе

toPandas требует больше памяти, чем занимают ваши данные, поэтому может получиться так, что вам не хватит памяти на экзекьюторе. Например, вы можете увидеть ошибки в логах драйвера:

23/03/14 13:45:10 ERROR YarnScheduler: Lost executor 2 on avi-ix-spark41.msk.avito.ru: Container from a bad node: container_1676537640029_9205_01_000003 on host: avi-ix-spark41.msk.avito.ru. Exit status: 143. Diagnostics: [2023-03-14 13:45:08.771]Container killed on request. Exit code is 143
[2023-03-14 13:45:08.771]Container exited with a non-zero exit code 143. 
[2023-03-14 13:45:08.772]Killed by external signal

В логах экзекьютора:

2023-03-14 13:44:50,112 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
2023-03-14 13:44:50,114 ERROR executor.Executor: Exception in task 51.0 in stage 5.0 (TID 266)
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:244)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:53)
	at org.apache.spark.scheduler.DirectTaskResult$$Lambda$801/1910985021.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:657)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Подробнее о том, как смотреть логи экзекьютора, я расскажу в одной из следующих статей.

Решение. Не стоит сразу увеличивать память экзекьюторов. Скорее всего, у вас большие партиции. Сделайте репартиционирование:

# Было
spark.read.orc().toPandas()

# Стало
spark.read.orc().repartition(200).toPandas()

Выгружайте большие датасеты консольной утилитой HDFS DFS

В моих бенчмарках на toPandas вы видели, что я смог быстро выгрузить 40 Гб в локальный процесс. Но выгрузка больших файлов ненадёжна. И самое главное: вы можете навредить серверу и всем людям и процессам на нём.

Я советую использовать для выгрузки больших датасетов консольную утилиту HDFS. Алгоритм состоит из следующих шагов:

  1. Сохранить ваш датасет на HDFS в csv-формате, несколькими файлами по несколько Гб каждый.

  2. Скачать их на локальную файловую систему через консоль: в папку с вашим ноутбуком или куда-нибудь рядом.

  3. Прочитать датасет через pandas батчами.

df.repartition(10).write.mode('overwrite').csv('path_in_hdfs')
hdfs dfs -get path_in_hdfs .
path_list = os.listdir(local_path)
# pdfs = []
for path in path_list:
    pdf = pd.read_csv(path)
    # do something
.   # pdfs.append(pdf)

Существует питоновские библиотеки для работы с HDFS (например, HdfsCLI), но я не рекомендую ими пользоваться, т.к. они используют WebHDFS, который работает менее эффективно, чем нативный драйвер HDFS, который используется в консольной утилите.

Рекомендации

Я рассказал про подходы к работе с toPandas и альтернативный способ выгрузки данных через консоль. Подведём итоги:

  • если вы выгружаете только маленькие таблички для презентаций, то материалы этой статьи вам не понадобятся;

  • если вам периодически нужно выгрузить семплы от 100 Мб до нескольких Гб, то вам стоит использовать toPandas с включенным Arrow;

  • если вам необходимо выгрузить большой датасет, то лучше не лениться и сохранить его через HDFS и консольную утилиту. Используйте toPandas, только если знаете, что делаете.

Немного обо мне

Я занимаюсь разработкой аукционных механик в компании Авито, в том числе автобиддингом. Этот функционал используется в алгоритмах продвижения. Для обработки сложных вложенных структур поисковых логов мы в основном используем Spark, хотя основной DWH в Авито построен на Vertica и ClickHouse. До этого я работал в Сбере, где создал дата-команду для трайба малого и микробизнеса с собственным промышленным Hadoop-кластером.

Периодически я делюсь своими инсайтами и впечатлениями в своём телеграм-канале https://t.me/big_ledovsky. Буду рад ответить на вопросы по статье и вообще обсудить Apache Spark и алгоритмы анализа данных.

Предыдущая статья: Как я очень захотел перейти из фронтенда в бэкенд — и перешёл

Комментарии (12)


  1. sshikov
    09.06.2023 15:30
    +1

    Python и Java обмениваются друг с другом данными

    Чего только не сделают, чтобы не ходить на овощную базу (с) старый советский анекдот.


    А поделитесь, что вас держит при работе на спарке в связке с питоном? Скажем, скала дает легкий доступ к куче фич той же HDFS, к Hive Metastore, и многому другому. Я помню, как тут кто-то опубликовал кусок кода на питоне строк на 50, вся цель которого была в том, чтобы построить список схем или таблиц. И что в скале со спарком делается буквально в две строки (в реальности — в одну). Готов допустить, что питоновское решение там было не самое эффективное, но разница в 25 раз...


    Понятно что всегда есть старый опыт, и если знаешь питон — то наверное проще на нем. Но моя практика показывает, что для эффективного пользования спарк шеллом достаточно знать скалу в пределах процентов от всех возможностей языка, то есть за неделю вполне можно научиться. И не иметь множества проблем связки питон — Java, который в хадупе таки мешают жить довольно сильно.


    1. aledovskiy Автор
      09.06.2023 15:30
      +1

      Привет! Спасибо за комментарий)

      Да, я согласен, что скала для спарка, это несколько процентов от скалы. Я на самом деле тоже оч топлю за скалу. Но порог входа все-таки высокий. Я причем имею выборку из довольно большого количества людей, которые учились писать на спарке, в том числе переходили с питона на скалу. Когда я еще в Сбере работал, мы обучили порядка 50 человек, примерно половина из которых до этого не умела писать на питоне особо даже (sql+excel)

      Скала к тому же не покрывает всех задач аналитиков и ds-ов. Обычный пайплайн: собрал витринку, сделал toPandas, строишь графики обычным питоновским стеком, модели обучаешь и тд. Дмитрий Бугайченко я помню активно топил за анализ данных прямо на скале (https://habr.com/ru/companies/vk/articles/442688/ например) и даже в ньюпролабах был такой курс. Но это все-таки не мейнстрим.


      1. sshikov
        09.06.2023 15:30
        +1

        Но порог входа все-таки высокий.

        Ну я 50 человек не учил, у нас всего примерно пятеро подопытных, большинство из которых аналитики, то есть не разработчики (то есть примерно с таким же опытом, как у вас, наверное). В принципе, необходимый уровень скалы — это умение писать функции (по сути — просто оформлять код как функцию), и их комбинировать, практически так же, как в спарке — то есть, map/flatMap/filter, где-то так. Изредка бывают нужны коллекции. Остальное что нужно — ну это вообще базовые вещи типа переменных, типов данных (числовые и строки), регулярки… то есть это может непривычно — но вообще не сложно.


        1. aledovskiy Автор
          09.06.2023 15:30

          Да, полностью согласен. Функциональное программирование изучать не нужно)


    1. aledovskiy Автор
      09.06.2023 15:30

      Во времена моей работы в Сбере, у нас все продовые витрины были на скале, и на скале писали ноутбуки аналитики из команды витрин. Но все остальные сидели на pySpark.

      В Авито же мы сидим только на pySpark, ибо количество людей тут меньше и объем работы на человека больше. Дата инженеров мало. Аналитик сам должен докатить витрину до airflow и даже тесты написать. Нашими инженерами было принято решение дополнительную технологию пока не затаскивать. Хотя смотря на некоторые монструозные питоновские udf я кусаю локти))


      1. sshikov
        09.06.2023 15:30
        +1

        Понятно. Ну да, наличие людей и знание ими технологий — это очень важный факто, который определяет выбор этих самых технологий.


        строишь графики обычным питоновским стеком, модели обучаешь и тд

        Про графики я почти ничего не знаю, как-то давно не приходилось сталкиваться (но как по мне, неужто для скалы нельзя найти аналоги?). А вот про модели — Spark ML же есть, он прямо совсем-совсем не тянет на замену питоновским инструментам? (Я далек от ML — поэтому вопрос может быть глупый).


        1. aledovskiy Автор
          09.06.2023 15:30

          Какие-то библиотеки для скалы конечно есть, но по уровню их развития и сообщества несравнимы. Можно сказать, что питоновские либы отлажены на сотнях тысяч если не миллионах пользователей.


          1. sshikov
            09.06.2023 15:30

            Ну меня скорее удивляет то, что многие "питоновские" либы — они же на самом деле написаны на C, к примеру. Или вообще может на фортране :) Ну то есть, сделать к ним еще одну обертку для скалы — в общем задача не сильно сложная. Должна бы быть.


            1. aledovskiy Автор
              09.06.2023 15:30

              Да, на C много low-level алгоритмов. Но это как правило специальный код, где используются питоновские типы данных. Вот, например, преобразование фурье в numpy. Там его ну никак не отделишь..

              Есть библиотеки вроде xgboost или catboost, который написан на cpp и имеют биндинги к нескольким языкам. У них есть версии для JVM. Но это довольно ограниченный набор библиотек

              Но также очень много кода (интерфейсного) написано на чистом питоне. И тут вообще никак не отделить


        1. aledovskiy Автор
          09.06.2023 15:30

          Ну и Spark ML - оч достойный, но узкоспециализированный фрейморк. Он содержит небольшой набор алгоритмов. Популярность не оч высокая. Алгоритмы только параллельные (а не все алгоритмы хорошо параллелятся, поэтому параллелизация дает трейдоффы)
          Есть конечно, например, catboost для spark, который мы даже запускали. Но там не все фичи поддерживаются и мы отказались в итоге


    1. illuminati16
      09.06.2023 15:30
      +1

      Я думаю, основное непонимание возникает из-за различий в специфике работы. Для аналитики разница в уровне проработанности библиотек и развитости коммьюнити между Python и Scala колоссален. Если брать базовую работу аналитика - сделать несколько запросов в БД, определить алгоритм соединения таблиц, в процессе несколько раз проверить count-ы в разных разрезах, вывести сэмплы и подготовить финальную выборку для дальнейшего анализа, то автоматом приходим к тому, что писать надо в ноутбуке, а исторически это Python, хотя все вышеперечисленное можно с успехом написать и на Scala. Но найти на рынке умеющих в Python гораздо легче и можно работать сразу, не перекчивая.

      Но вот что касается дальнейшей работы аналитика - построить несколько графиков с выводами, вызвать несколько статистических либ для проведения А/В- теста, или тем более обучить модель - нормальных либ на Scala на данный момент почему-то нет, очевидно из-за комьюнити. И делать это в разных ноутбуках - в одном готовить данные, а в другом обрабатывать - не слишком удобно.

      При этом в некоторых кейсах Scala дает кратное преимущество. Например недавно решали DS задачу на графах, и на Scala получилось практически за линейное время обсчитать сотни джойнов с использованием внутреннего rdd-представления - но такие задачи все же возникают редко. Зато чаще возникает необходимость обсчитать например трендовые фичи и в Scala начинаются танцы с бубнами там, где в Python одна строчка)


      1. aledovskiy Автор
        09.06.2023 15:30

        Да, полностью согласен!

        А что за задача если не секрет?)