К нашей новой программе "Apache Spark для дата-инженеров" и вебинару о курсе, который пройдет 2 декабря, мы подготовили перевод обзорной статьи о Spark 3.0.
Spark 3.0 вышел с целым набором важных улучшений, среди которых: повышение производительности с помощью ADQ, чтение бинарных файлов, улучшенная поддержка SQL и Python, Python 3.0, интеграция с Hadoop 3, поддержка ACID.
В этой статье автор постарался привести примеры использования этих новых функций. Это первый первый материал о функциональности Spark 3.0 и у этой серии статей планируется продолжение.
В этой статье освещены следующие функции Spark 3.0:
Структура адаптивного выполнения запросов (AQE)
Поддержка новых языков
Новый интерфейс для структурированной потоковой передачи
Чтение бинарных файлов
Рекурсивный просмотр папок
Поддержка многозначного разделителя данных (||)
Новые встроенные функции Spark
Переключение на пролептический григорианский календарь
Data Frame Tail
Функция Repartition в SQL запросах
Улучшенная совместимость с ANSI SQL
Адаптивное выполнение запросов
Функция адаптивной регулировки запросов (AQE) – это, пожалуй, одно из самых существенных улучшений Spark 3.0. Она повторно оптимизирует и корректирует планы запросов, основываясь на статистике времени их выполнения, собранной во время обработки запросов.
До выхода версии 3.0 оптимизация в Spark происходила путем создания плана до начала выполнения запроса, а как только запросы начинали обрабатываться, Spark не проводил дальнейшей оптимизации с использованием метрик, собираемых в процессе выполнения запросов. AQE восполняет этот пробел, применяя второй уровень оптимизации, основанный на метриках, получаемых на каждом этапе.
По умолчанию, функция адаптивной регулировки запросов (AQE) отключена. Для включения необходимо переключить конфигурацию spark.sql.adaptive.enabled на true. С включенной функцией AQE, Spark демонстрирует двукратное ускорение по данным TPC-DS по сравнению с версией Spark 2.4
AQE в Spark 3.0 выполняет 3 важных функции:
Динамически объединяет разделы, что позволяет оптимизировать их количество
Преобразует join sort-merge в broadcast соединение
Оптимизирует джойны со скошенными данными
Поддержка новых языков
В Spark 3.0 расширилась библиотека языков, теперь поддерживаются:
Python3 (Python 2.x)
Scala 2.12
JDK 11
Помимо поддержки новых языков также добавилась поддержка Hadoop до 3 версии, Kafka до 2.4.1 и много других обновлений.
Новый интерфейс для Spark Structured Streaming
В web-интерфейсе Spark появилась новая вкладка Структурированной потоковой передачи для мониторинга стриминговых приложений. В этой вкладке отображается идентификатор запуска, статус, время запуска, продолжительность обработки каждого микро-пакета, а также текущая статистика. Это позволяет разработчику видеть, что происходит с потоковыми запросами и вовремя выявлять ошибки.
Вкладка содержит 2 раздела:
Активные потоковые запросы
Завершенные потоковые запросы
Источник: Databricks
Раздел «Active Streaming Queries» отображает все текущие запросы, а раздел «Completed Streaming Queries» – все неудавшиеся и завершенные.
По ссылке Run ID представлена подробная статистика потоковых запросов по следующим параметрам: скорость поступления данных, скорость обработки, строки ввода, продолжительность каждого пакета, время, затраченное на выполнение различных операций. Вы можете найти примеры, как устранить проблемы структурированной потоковой передачи в Databricks.
Источник данных для чтения бинарных файлов
Spark 3.0 поддерживает источник данных в формате “binaryFile”, который позволяет считывать бинарные файлы.
С помощью источника binaryFile, DataFrameReader считывает форматы image, pdf, zip, gzip, tar и многие другие бинарные файлы. Каждый файл будет преобразован в отдельную запись в датафрейме, содержащую метаданные файла.
val df = spark.read.format("binaryFile").load("/tmp/binary/spark.png")
df.printSchema()
df.show()
Выдает следующий результат
root
|-- path: string (nullable = true)
|-- modificationTime: timestamp (nullable = true)
|-- length: long (nullable = true)
|-- content: binary (nullable = true)
+--------------------+--------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+--------------------+------+--------------------+
|file:/C:/tmp/bina…|2020-07-25 10:11:…| 74675|[89 50 4E 47 0D 0...|
+--------------------+--------------------+------+--------------------+
Функция рекурсивного просмотра папок
В Spark 3.0 теперь есть функция recursiveFileLookup, которая позволяет читать и загружать файлы из рекурсивных подпапок. После установки значения true для этой опции, DataFrameReader рекурсивно загружает файлы, перебирая все папки и подпапки по указанному пути.
spark.read.option("recursiveFileLookup", "true").csv("/path/to/folder")
Поддержка многозначного разделителя данных
В Spark 3.0 поддерживается функция многосимвольного разделителя (||) при чтении и записи CSV файлов. К примеру, если у вас есть CSV файл с подобным содержанием:
col1||col2||col3||col4
val1||val2||val3||val4
val1||val2||val3||val4
Он может считываться следующим образом:
val df = spark.read
.option("delimiter","||")
.option("header","true")
.csv("/tmp/data/douplepipedata.csv")
В Spark 2.x многозначный разделитель не поддерживался, поэтому нам приходилось преобразовывать двойной разделитель в одинарный с помощью препроцессора. Попытки считывать файлы напрямую без препроцессора заканчивались следующей ошибкой:
throws java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ||
Новые встроенные функции Spark
К внушительному списку из сотен встроенных функций Spark SQL, третья версия Spark добавила новые.
sinh,cosh,tanh,asinh,acosh,atanh,any,bitand,bitor,bitcount,bitxor,
booland,boolor,countif,datepart,extract,forall,fromcsv,
makedate,makeinterval,maketimestamp,mapentries
mapfilter,mapzipwith,maxby,minby,schemaofcsv,tocsv
transformkeys,transform_values,typeof,version
xxhash64
Переключение на Пролептический Григорианский календарь
Предыдущая версия Spark поддерживает даты в формате Юлианского и Григорианского календаря: для дат до 1582 года используется Юлианский календарь, а для более поздних дат – Григорианский.
В JDK 7 и более ранних версиях используется тот же принцип посредством java.sql.Date API
. В JDK 8 представлен java.time.LocalDate API
с опцией пересчета дат по Пролептическому Григорианскому календарю.
Теперь в Spark 3.0 тоже есть опция переключения на Пролептический Григорианский календарь, который уже давно используется такими системами обработки данных как Pandas, R и Apache Arrow. Если вы оперируете датами в периоде после 15 октября 1582 г., то все функции Date&Timestamp, которые работали в Spark до версии 3.0, будут работать корректно. Ваши результаты могут отличаться, если вы используете даты в периоде до 15 октября 1582 г.
Вместе с опцией переключения на Пролептический Григорианский календарь в Spark 3.0 представлены новые Date & Timestamp функции:
makedate(), maketimestamp(), makeinterval().
makedate(year, month, day)
– эта функция возвращает дате из входных аргументов поля <год>, <месяц> и <число>.
makedate(2014, 8, 13)
//returns 2014-08-13.
maketimestamp(year, month, day, hour, min, sec[, timezone])
– эта функция возвращает Timestamp из полей <год>, <месяц>, <час>, <мин>, <сек>, <часовой пояс>.
maketimestamp(2014, 8, 13, 1,10,40.147)
//returns Timestamp 2014-08-13 1:10:40.147
maketimestamp(2014, 8, 13, 1,10,40.147,CET)
makeinterval(years, months, weeks, days, hours, mins, secs)
– создает интервал между значениями
Отправка недопустимых дат в makedate()
и make_timestam()
возвращает 0.
Новая опция DataFrame.tail()
В Spark есть действие head(),
которое возвращает элементы из начала дата фрейма, но нет действия tail()
, которое уже поддерживается Pandas в Python. В датафреймы Spark 3.0 действие tail()
добавили и теперь есть возможность возвращать определенные элементы из его хвоста. Действие tail()
возвращает scala.Array[T]
в Scala.
val data=spark.range(1,100).toDF("num").tail(5)
data.foreach(print)
//Returns
//[95][96][97][98][99]
Функция repartition в SQL запросах
В SQL запросах Spark отсутствовали некоторые actions, присутствующие в Dataset/DataFrame, например, в Spark SQL не поддерживалась функция repartition() для датасета. Репартишен теперь доступен внутри SQL-выражения. Функция позволяет уменьшить или увеличить количество партиций.
val df=spark.range(1,10000).toDF("num")
println("Before re-partition :"+df.rdd.getNumPartitions)
df.createOrReplaceTempView("RANGE?C17CTABLE")
println("After re-partition :"+df2.rdd.getNumPartitions)
//Returns
//Before re-partition :1
//After re-partition :20
Улучшенная совместимость с ANSI SQL
Поскольку Spark часто используется data-инженерами, которые уже знакомы с ANSI SQL, в Spark 3.0 поработали над улучшением совместимости с ним. Вы можете активировать эту функцию, установив true в spark.sql.parser.ansi.enabled
Spark конфигурации.
Програмы Newprolab по Apache Spark:
Apache Spark для дата-инженеров (Scala). Интенсивный практический курс из 11 занятий, 5 лабораторных.
Apache Spark (Python). Модуль в составе курса "Специалист по большим данным". 6 занятий, 5 лабораторных.
sshikov
Интересно, и как же мы это делали еще в Spark 1.6? Все не так просто, как тут описано. Это было можно давно, например, в путях для хадупа всегда можно было писать * на любом уровне, и это работает.