Это мой вольный перевод статьи "Why You Should Start Writing Spark Custom Native Functions", которая вдохновила меня на некоторые собстенные изыскания по данной теме. Их результат я планирую опубликовать позже, а пока выношу на ваш суд этот перевод.
Один из первых способов, к которому люди обращаются, когда им нужно сделать что-то, что не поддерживается из коробки в Spark, это написание UDF (user-defined function), которая позволяет им получить нужный функционал. Но является ли это лучшим способом? Каковы последствия для производительности при написании UDF?
В этом посте мы рассмотрим пример по реализации функции, возвращающей UUID, с помощью двух разных подходов: с использованием UDF и написание собственного нативного кода для Spark, и сравним их производительность. Пост будет следовать следующей структуре:
Введение
Реализация UUID с использованием UDF
Реализация UUID с использованием Catalyst Expressions
Сравнение производительности
Заключение
Примечание 1: Мы будем, естестенно, писать код на Scala
Примечание 2: Функция UUID() доступна в Spark с версии 3.0.0, но ее реализация все равно полезна в качестве упражнения из-за ее простоты.
Примечание 3: весь код из данной статьи вы можете найти тут: https://github.com/imdany/spark_catalyst_udf
Введение
Если вы работали с Spark, то знаете, что бывают случаи, когда сам Spark не предоставляет необходимого функционала, и вам приходится его расширять.
Обычно вы можете сделать это, написав UDF, которая выполняет нужную работу. Но знали ли вы, что есть и другой альтернативный способ? Он называется Catalyst Expressions, и, должен признаться, их не так просто написать, но они могут повысить производительность вашего приложения до нового уровня. Итак, приступим к делу!
Реализация UUID с помощью UDF
Реализация генератора UUID с использованием UDF проста. С некоторыми вариациями или другим синтаксисом, один из способов написания этой функции выглядит так:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
def uuid: UserDefinedFunction = udf(() => java.util.UUID.randomUUID().toString)
spark.sqlContext.udf.register("uuid", uuid)
Таким образом, мы можем использовать функцию UUID как с помощью SQL-выражений, так и с помощью API DataFrame.
Основная идея здесь заключается в том, что мы используем существующую функцию Java, но не в Spark. Чтобы использовать ее из Spark, необходимо обернуть этот код в UDF, а если мы хотим использовать его и в SQL API, то еще и зарегистрировать.
Просто, верно? И вы можете использовать этот подход для многих, многих разных вещей. Это легко сделать, и это работает, но есть и другие способы расширения функциональности Spark.
Реализация UUID с использованием Catalyst Expressions
Написание Catalyst Expressions определенно сложнее, чем UDF, но как вы увидите в следующем разделе, это имеет свои преимущества в части производительности.
Для написания пользовательской функции в Spark нам нужно как минимум два файла:
первый будет реализовывать саму логику, расширяя функционал Catalyst
второй сделает эту функцию доступной
Catalyst Expression
Файл, содержащий реализацию кода, должен быть создан в определенном пакете, а именно:
org.apache.spark.sql.catalyst.expressions
Следовательно, нам нужно создать файл для нашей функции в этой папке в нашем проекте Spark.
И здесь дела могут стать довольно сложными. Чтобы заставить Spark использовать наши функции, нам нужно расширить доступный интерфейс. Существует множество различных интерфейсов, которые мы могли бы реализовать, и найти подходящий может быть сложным, так как документация на этот счет недостаточна. Для упрощения вещей давайте проанализируем, что мы хотим достичь - функция без входных параметров, возвращающая строку.
Я выяснил, что мне нужно реализовать Leaf Expression, так что мой класс будет начинаться так:
case class Uuid() extends LeafExpression with CodegenFallback {
override def nullable: Boolean = ???
override def eval(input: InternalRow): Any = ???
override def dataType: DataType = ???
}
Итак, давайте заполним эту определенную часть, начиная с простых методов.
Для dataType, так как мы хотим вернуть строку:
override def dataType: DataType = StringType
А для nullable, поскольку мы не хотим возвращать значения null из нашей функции:
override def nullable: Boolean = false
Последний кусочек, "eval", - это фактическое выполнение функции, которая будет генерировать UUID.
override def eval(input: InternalRow): Any =
UTF8String.fromString(java.util.UUID.randomUUID().toString)
Вот и все! Единственное необычное, что вы можете заметить, - это UTF8String.fromString(). Если вы попытаетесь запустить код без этого, вы увидите:
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
Причина вызова этого метода заключается в том, что Spark использует его для преобразования "внешней строки" в "строку Spark" (https://github.com/apache/spark/…/UTF8String.java#L49)
Окончательный код выглядит следующим образом:
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
case class Uuid() extends LeafExpression with CodegenFallback {
override def nullable: Boolean = false
override def dataType: DataType = StringType
override def eval(input: InternalRow): Any = UTF8String.fromString(java.util.UUID.randomUUID().toString)
}
Просто, верно? Ну, в этом случае, да, реализация была легкой, но обычно это не так просто.
Function wrapper
Теперь, когда у нас есть выражение Catalyst, мы должны сделать его
доступным для использования в API Dataframe. Для этого нам нужно создать второй
файл. Местоположение этого файла не так важно, как у предыдущего, но
для порядка я обычно помещаю его в org.apache.spark.sql
И на этот раз я назвал его CustomFunctions, и он должен содержать следующее содержание:
object CustomFunctions {
private def withExpr(expr: Expression): Column = Column(expr)
def UUID_CUSTOM(): Column = withExpr {
Uuid()
}
}
С помощью этого кода мы делаем функцию Uuid доступной через объект CustomFunctions.
Использование
И последний вопрос: как мы используем эту функцию? Ответ довольно прост! Мы должны импортировать ее, как и любую другую функцию и использовать ее в нашем DataFrame:
import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM
//....
val newDf = df.withColumn("uuid", UUID_CUSTOM())
Сравнение производительности
Вопрос, который вы могли бы себе задать, - стоит ли все это того? Ну, давайте взглянем на цифры. Я запустил один и тот же код с использованием UDF и выражения Catalyst на DataFrame разного размера, и результаты довольно интересны.
// UDF version
val data = spark.range(nRows).toDF("ID").withColumn("uuid_udf", expr("uuid_udf()"))
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/UDF/${runID}")
// --------
// Catalyst Version
val data = spark.range(nRows).toDF("ID").withColumn("uuid", UUID_CUSTOM())
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/catalyst/${runID}")
Я запустил каждую функцию с 4 разными значениями числа строк по 100 раз, затем получил среднее время для каждой, и вот результат:
Для небольших DataFrame разница не очень заметна... но когда вы увеличиваете размер DataFrame, можно начать замечать, насколько лучше Catalyst Expressions справляется с задачей по сравнению с UDF.
Заключение
Так что стоит ли мне перестать использовать UDF и начать писать Catalyst Expression?
Я не могу ответить на этот вопрос за вас, так как это будет зависеть от многих различных аспектов, таких как время, доступные ресурсы или знания. Но что ясно из результатов тестов, так это то, что если вам нужно быстродействие приложения или сократить время выполнения вашей задачи, вам стоит рассмотреть возможность изучения написания таких Catalyst Expression.
Комментарии (12)
sshikov
24.04.2024 09:44+1Реализация генератора UUID с использованием UDF проста.
Я бы хотел отметить, что на самом деле все не всегда так просто, даже для такой простой функции. Дело в том, что UDF сериализуются и передаются в executors, это во-первых (ну, те кто программирует на спарке, уже должны это обычно знать).
Но тут еще могут добавляться вопросы с класслоадерами. Скажем, мы как-то попытались создать экземпляр UDF и зарегистрировать его в groovy скрипте, который динамически выполнялся из кода на спарке. Так вот, ничего не получилось, потому что класслоадер оказался другой, и наша функция имела сигнатуру, отличную от нужной. А как устроены класслоадеры в спарке, описано примерно так же, как описано создание catalyst выражений из этой статьи - т.е. примерно никак.
sshikov
24.04.2024 09:44eval(input: InternalRow)
Ну вот хорошо что у автора 0-арная функция. А если мы заходим использовать аргументы? Я пытался как-то разобраться с expressions, но так и уперся в отсутствие документации, скажем, непонятно где вот эти
InternalRow взять, и что с ними можно делать.
Geckelberryfinn
Ну, допустим, а можно теперь эту кастомную функцию как-то заппакеджить и дать дата-сайнтистам, которые на PySpark в основном? Особенно, если речь идёт о облачных средах, таких как Databricks или Glue?
Ninil Автор
Интересный кейс. Спасибо! В рамках своих изысканий попробую.
Пока я хочу использовать подобноное для своих Spark-приложений.
mitgard
Вполне, вам нужно сделать джарник с таким кодом, а потом его добавить в зависимость spark-приложения, как условный драйвер для БД.
А в коде просто сделать импорт.
Но если мы говорим, про pySpark, скорее всего придется сделать питон враппер или вызывать через jvm
Ninil Автор
Еще интересный юзкейс для исследования - как сделать доступной эту функцию из Датабриск ноутбука, в котором дата-сайентисты исползуют PySpark
Geckelberryfinn
Посмотрел, да это может сработать через Jar и врапперы. Как посоветовал mitgard, нужно упаковать Jar, затем его указывать в spark-submit --jars. Враппер будет иметь вид вроде такого
В случае databricks тогда все даже чуть проще, созданный Jar можно загрузить в Libraries у кластера.