Привет! Меня зовут Александр Ледовский. Я тимлид команды аналитики и DS, строю рекламные аукционы в Авито. В работе мы активно используем Apache Spark. Одна из типовых задач аналитика — посчитать что-то на pySpark, а потом выгрузить это. Например:
маленькую табличку в экселе, чтобы сделать отчёт или презентацию;
большую таблицу в экселе или csv, чтобы отправить коллегам — до нескольких Гб;
большой датасет для обучения ML-модели — до 100 Гб.
Данная статья о том, как это правильно делать. В том числе, как правильно использовать функцию toPandas вместе с библиотекой pyArrow, когда toPandas использовать не стоит и какие ещё есть варианты.
Немного о toPandas
toPandas — основной инструмент получения семплов в PySpark и основной инструмент аналитика для просмотра данных. Вероятно, большинство из вас пользуется функцией df.limit(10).toPandas(). Она же подходит, чтобы сохранить таблицу в Excel. Например:
df = spark.read.orc('path_to_your_dataset')
pdf = df.toPandas()
pdf.to_excel('my_dataset.xlsx', index=False)
Всё нормально, если работать с небольшими датасетами. Когда они становятся большими, начинаются проблемы: toPandas тормозит, падает с ошибками.
Другая проблема: toPandas загружает данные в оперативную память Jupyter notebook. Если сделать toPandas большого датасета, это займёт много ресурсов и повредит работе целого сервера. А там ещё и могут работать коллеги.
Разберём, как правильно работать с toPandas.
Включите Arrow-оптимизацию
TL;DR Есть настройка, которая на порядок увеличит скорость работы toPandas. Она называется |
PySpark состоит из параллельно работающих JVM- (Java Virtual Machine) и Python-процессов. Подробнее об этом я писал в статье про выделение ресурсов →
Python и Java обмениваются друг с другом данными. Но из-за того, что каждый хранит их в своём формате, это происходит не быстро.
Arrow — формат хранения данных в памяти. Эта библиотека поддерживается одновременно и в Python, и в Java/Spark-процессах. Формат поддерживает быструю передачу данных по сети. Прирост скорости оказывается огромным по сравнению со стандартным механизмом обмена данных между Python и Java.
Чтобы включить Arrow-оптимизацию, нужно прописать параметры в конфиге:
app_name = "Your App"
spark_conf = {
# another params
# если у вас спарк >= 3.0
'spark.sql.execution.arrow.pyspark.enabled': 'true',
# если у вас спарк < 3.0
'spark.sql.execution.arrow.enabled': 'true'
}
builder = (
SparkSession
.builder
.appName(app_name)
)
for k, v in spark_conf.items():
builder.config(k, v)
spark = builder.getOrCreate()
spark.sql.execution.arrow.enabled
Подробная документация по использованию Arrow с PySpark →
Ограничения Arrow и как их обходить
У Arrow есть только один недостаток. Он не поддерживает некоторые типы данных. Вот выдержка из документации (на момент версии 3.3.2):
Currently, all Spark SQL data types are supported by Arrow-based conversion except ArrayType of TimestampType, and nested StructType. MapType is only supported when using PyArrow 2.0.0 and above. |
Если использовать toPandas без Arrow, то в колонке получатся довольно неприятные питоновские типы. Например, в случае с MapType — pyspark.sql.types.Row
В случае со свежим Arrow MapType даст обычный dict. А при вложенной структуре вы получите предупреждение и pySpark откатится к неоптимизированному варианту:
UserWarning: toPandas attempted Arrow optimization because ...
Если у вас большой датасет, а отказываться от Arrow не хочется, можно использовать строковый формат:
pdf = (
spark.read.orc()
.select('col1', 'col2', F.to_json('bad_col_3').alias('bad_col_3'))
.toPandas()
)
pdf['bad_col_3'] = pdf['bad_col_3'].apply(lambda x: json.loads(x))
Бенчмарки toPandas с Arrow-оптимизацией и без
Для сравнения я получил бенчмарки на одном из наших датасетов. Используемые ресурсы Spark: 60 экзекьюторов по 2 ядра.
Количество данных |
Репартиционирование |
Arrow |
no Arrow |
4m |
Нет |
9s |
10s |
40m |
Нет |
9s |
15s |
400m |
Нет |
9s |
1m 10s |
4g |
Да |
46s |
11m 48s |
16g |
Да |
2m 29s |
45m 23s |
40g |
Да |
6m 9s |
?? |
Бенчмарки показывают, что Arrow дает прирост скорости примерно в 10 раз.
Я, конечно, показал, что можно скачать себе 40 Гб, но делать так не рекомендую, это довольно много. Во-первых, вам стоит предварительно согласовать с вашими дата-инженерами, что вы будете работать с большими датасетами на одной ноде. Во-вторых, большие датасеты надёжнее и безопаснее скачивать в виде файлов.
Используйте sample вместо limit для семплов
При выгрузке данных приходится формировать семплы. Если бы данные можно было выкачать целиком, то Spark не был бы нужен.
Для семплов мы часто используем limit. Но его можно использовать только для небольших семплов, примерно до 100 строк. Нам нравится limit, потому что мы можем чётко указать количество строк, которые хотим получить. Это удобно, но такую операцию непросто сделать параллельной.
Возьмём пример запроса:
df = spark.read.orc('some_path').limit(10).toPandas()
Запустим запрос и посмотрим его план на вкладке SQL в SparkUI. Подробнее о том, как смотреть план запроса, я расскажу в одной из следующих статей.
+- == Initial Plan ==
Project (11)
+- GlobalLimit (10)
+- Exchange (9)
+- LocalLimit (8)
+- Scan orc (1)
Как работает limit:
LocalLimit —
limit(10)
выполняется на каждой партиции ваших данных;Exchange — результат сливается вместе в одну партицию (операция Exchange);
GlobalLimit — у результата опять выполняется
limit(10)
То есть, если у вас 200 партиций и вы сказали limit(1000)
, то Spark соберет 200 семплов по 1000 строк, объединит их, выделит из них 1000 строк и отдаст вам.
Sample работает по-другому. Вы указываете долю строк, которую хотите получить в семпле. Эта операция параллелится и работает эффективно.
df.sample(fraction=0.01, seed=42).toPandas()
Умейте исправлять ошибки нехватки памяти при toPandas
Ошибка нехватка памяти на драйвере
Все данные toPandas идут в Python через JVM-процесс драйвера. Нужно, чтобы он смог принять весь объём данных. Если вам не хватит памяти, то вы получите следующую ошибку
23/03/06 22:19:05 ERROR TaskSetManager: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
/var/jhub/venv/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
An error occurred while calling o75.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1536.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
...
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Решение. Увеличивать параметры spark.driver.memory и spark.driver.maxResultSize
Ошибка нехватки памяти на экзекьюторе
toPandas требует больше памяти, чем занимают ваши данные, поэтому может получиться так, что вам не хватит памяти на экзекьюторе. Например, вы можете увидеть ошибки в логах драйвера:
23/03/14 13:45:10 ERROR YarnScheduler: Lost executor 2 on avi-ix-spark41.msk.avito.ru: Container from a bad node: container_1676537640029_9205_01_000003 on host: avi-ix-spark41.msk.avito.ru. Exit status: 143. Diagnostics: [2023-03-14 13:45:08.771]Container killed on request. Exit code is 143
[2023-03-14 13:45:08.771]Container exited with a non-zero exit code 143.
[2023-03-14 13:45:08.772]Killed by external signal
В логах экзекьютора:
2023-03-14 13:44:50,112 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
2023-03-14 13:44:50,114 ERROR executor.Executor: Exception in task 51.0 in stage 5.0 (TID 266)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:244)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:53)
at org.apache.spark.scheduler.DirectTaskResult$$Lambda$801/1910985021.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Подробнее о том, как смотреть логи экзекьютора, я расскажу в одной из следующих статей.
Решение. Не стоит сразу увеличивать память экзекьюторов. Скорее всего, у вас большие партиции. Сделайте репартиционирование:
# Было
spark.read.orc().toPandas()
# Стало
spark.read.orc().repartition(200).toPandas()
Выгружайте большие датасеты консольной утилитой HDFS DFS
В моих бенчмарках на toPandas вы видели, что я смог быстро выгрузить 40 Гб в локальный процесс. Но выгрузка больших файлов ненадёжна. И самое главное: вы можете навредить серверу и всем людям и процессам на нём.
Я советую использовать для выгрузки больших датасетов консольную утилиту HDFS. Алгоритм состоит из следующих шагов:
Сохранить ваш датасет на HDFS в csv-формате, несколькими файлами по несколько Гб каждый.
Скачать их на локальную файловую систему через консоль: в папку с вашим ноутбуком или куда-нибудь рядом.
Прочитать датасет через pandas батчами.
df.repartition(10).write.mode('overwrite').csv('path_in_hdfs')
hdfs dfs -get path_in_hdfs .
path_list = os.listdir(local_path)
# pdfs = []
for path in path_list:
pdf = pd.read_csv(path)
# do something
. # pdfs.append(pdf)
Существует питоновские библиотеки для работы с HDFS (например, HdfsCLI), но я не рекомендую ими пользоваться, т.к. они используют WebHDFS, который работает менее эффективно, чем нативный драйвер HDFS, который используется в консольной утилите.
Рекомендации
Я рассказал про подходы к работе с toPandas и альтернативный способ выгрузки данных через консоль. Подведём итоги:
если вы выгружаете только маленькие таблички для презентаций, то материалы этой статьи вам не понадобятся;
если вам периодически нужно выгрузить семплы от 100 Мб до нескольких Гб, то вам стоит использовать toPandas с включенным Arrow;
если вам необходимо выгрузить большой датасет, то лучше не лениться и сохранить его через HDFS и консольную утилиту. Используйте toPandas, только если знаете, что делаете.
Немного обо мне
Я занимаюсь разработкой аукционных механик в компании Авито, в том числе автобиддингом. Этот функционал используется в алгоритмах продвижения. Для обработки сложных вложенных структур поисковых логов мы в основном используем Spark, хотя основной DWH в Авито построен на Vertica и ClickHouse. До этого я работал в Сбере, где создал дата-команду для трайба малого и микробизнеса с собственным промышленным Hadoop-кластером.
Периодически я делюсь своими инсайтами и впечатлениями в своём телеграм-канале https://t.me/big_ledovsky. Буду рад ответить на вопросы по статье и вообще обсудить Apache Spark и алгоритмы анализа данных.
Предыдущая статья: Как я очень захотел перейти из фронтенда в бэкенд — и перешёл
sshikov
Чего только не сделают, чтобы не ходить на овощную базу (с) старый советский анекдот.
А поделитесь, что вас держит при работе на спарке в связке с питоном? Скажем, скала дает легкий доступ к куче фич той же HDFS, к Hive Metastore, и многому другому. Я помню, как тут кто-то опубликовал кусок кода на питоне строк на 50, вся цель которого была в том, чтобы построить список схем или таблиц. И что в скале со спарком делается буквально в две строки (в реальности — в одну). Готов допустить, что питоновское решение там было не самое эффективное, но разница в 25 раз...
Понятно что всегда есть старый опыт, и если знаешь питон — то наверное проще на нем. Но моя практика показывает, что для эффективного пользования спарк шеллом достаточно знать скалу в пределах процентов от всех возможностей языка, то есть за неделю вполне можно научиться. И не иметь множества проблем связки питон — Java, который в хадупе таки мешают жить довольно сильно.
aledovskiy Автор
Привет! Спасибо за комментарий)
Да, я согласен, что скала для спарка, это несколько процентов от скалы. Я на самом деле тоже оч топлю за скалу. Но порог входа все-таки высокий. Я причем имею выборку из довольно большого количества людей, которые учились писать на спарке, в том числе переходили с питона на скалу. Когда я еще в Сбере работал, мы обучили порядка 50 человек, примерно половина из которых до этого не умела писать на питоне особо даже (sql+excel)
Скала к тому же не покрывает всех задач аналитиков и ds-ов. Обычный пайплайн: собрал витринку, сделал toPandas, строишь графики обычным питоновским стеком, модели обучаешь и тд. Дмитрий Бугайченко я помню активно топил за анализ данных прямо на скале (https://habr.com/ru/companies/vk/articles/442688/ например) и даже в ньюпролабах был такой курс. Но это все-таки не мейнстрим.
sshikov
Ну я 50 человек не учил, у нас всего примерно пятеро подопытных, большинство из которых аналитики, то есть не разработчики (то есть примерно с таким же опытом, как у вас, наверное). В принципе, необходимый уровень скалы — это умение писать функции (по сути — просто оформлять код как функцию), и их комбинировать, практически так же, как в спарке — то есть, map/flatMap/filter, где-то так. Изредка бывают нужны коллекции. Остальное что нужно — ну это вообще базовые вещи типа переменных, типов данных (числовые и строки), регулярки… то есть это может непривычно — но вообще не сложно.
aledovskiy Автор
Да, полностью согласен. Функциональное программирование изучать не нужно)
aledovskiy Автор
Во времена моей работы в Сбере, у нас все продовые витрины были на скале, и на скале писали ноутбуки аналитики из команды витрин. Но все остальные сидели на pySpark.
В Авито же мы сидим только на pySpark, ибо количество людей тут меньше и объем работы на человека больше. Дата инженеров мало. Аналитик сам должен докатить витрину до airflow и даже тесты написать. Нашими инженерами было принято решение дополнительную технологию пока не затаскивать. Хотя смотря на некоторые монструозные питоновские udf я кусаю локти))
sshikov
Понятно. Ну да, наличие людей и знание ими технологий — это очень важный факто, который определяет выбор этих самых технологий.
Про графики я почти ничего не знаю, как-то давно не приходилось сталкиваться (но как по мне, неужто для скалы нельзя найти аналоги?). А вот про модели — Spark ML же есть, он прямо совсем-совсем не тянет на замену питоновским инструментам? (Я далек от ML — поэтому вопрос может быть глупый).
aledovskiy Автор
Какие-то библиотеки для скалы конечно есть, но по уровню их развития и сообщества несравнимы. Можно сказать, что питоновские либы отлажены на сотнях тысяч если не миллионах пользователей.
sshikov
Ну меня скорее удивляет то, что многие "питоновские" либы — они же на самом деле написаны на C, к примеру. Или вообще может на фортране :) Ну то есть, сделать к ним еще одну обертку для скалы — в общем задача не сильно сложная. Должна бы быть.
aledovskiy Автор
Да, на C много low-level алгоритмов. Но это как правило специальный код, где используются питоновские типы данных. Вот, например, преобразование фурье в numpy. Там его ну никак не отделишь..
Есть библиотеки вроде xgboost или catboost, который написан на cpp и имеют биндинги к нескольким языкам. У них есть версии для JVM. Но это довольно ограниченный набор библиотек
Но также очень много кода (интерфейсного) написано на чистом питоне. И тут вообще никак не отделить
aledovskiy Автор
Ну и Spark ML - оч достойный, но узкоспециализированный фрейморк. Он содержит небольшой набор алгоритмов. Популярность не оч высокая. Алгоритмы только параллельные (а не все алгоритмы хорошо параллелятся, поэтому параллелизация дает трейдоффы)
Есть конечно, например, catboost для spark, который мы даже запускали. Но там не все фичи поддерживаются и мы отказались в итоге
illuminati16
Я думаю, основное непонимание возникает из-за различий в специфике работы. Для аналитики разница в уровне проработанности библиотек и развитости коммьюнити между Python и Scala колоссален. Если брать базовую работу аналитика - сделать несколько запросов в БД, определить алгоритм соединения таблиц, в процессе несколько раз проверить count-ы в разных разрезах, вывести сэмплы и подготовить финальную выборку для дальнейшего анализа, то автоматом приходим к тому, что писать надо в ноутбуке, а исторически это Python, хотя все вышеперечисленное можно с успехом написать и на Scala. Но найти на рынке умеющих в Python гораздо легче и можно работать сразу, не перекчивая.
Но вот что касается дальнейшей работы аналитика - построить несколько графиков с выводами, вызвать несколько статистических либ для проведения А/В- теста, или тем более обучить модель - нормальных либ на Scala на данный момент почему-то нет, очевидно из-за комьюнити. И делать это в разных ноутбуках - в одном готовить данные, а в другом обрабатывать - не слишком удобно.
При этом в некоторых кейсах Scala дает кратное преимущество. Например недавно решали DS задачу на графах, и на Scala получилось практически за линейное время обсчитать сотни джойнов с использованием внутреннего rdd-представления - но такие задачи все же возникают редко. Зато чаще возникает необходимость обсчитать например трендовые фичи и в Scala начинаются танцы с бубнами там, где в Python одна строчка)
aledovskiy Автор
Да, полностью согласен!
А что за задача если не секрет?)