Это мой вольный перевод статьи "Why You Should Start Writing Spark Custom Native Functions", которая вдохновила меня на некоторые собстенные изыскания по данной теме. Их результат я планирую опубликовать позже, а пока выношу на ваш суд этот перевод.

Один из первых способов, к которому люди обращаются, когда им нужно сделать что-то, что не поддерживается из коробки в Spark, это написание UDF (user-defined function), которая позволяет им получить нужный функционал. Но является ли это лучшим способом? Каковы последствия для производительности при написании UDF?

В этом посте мы рассмотрим пример по реализации функции, возвращающей UUID, с помощью двух разных подходов: с использованием UDF и написание собственного нативного кода для Spark, и сравним их производительность. Пост будет следовать следующей структуре:

  1. Введение

  2. Реализация UUID с использованием UDF

  3. Реализация UUID с использованием Catalyst Expressions

  4. Сравнение производительности

  5. Заключение

Примечание 1: Мы будем, естестенно, писать код на Scala

Примечание 2: Функция UUID() доступна в Spark с версии 3.0.0, но ее реализация все равно полезна в качестве упражнения из-за ее простоты.

Примечание 3: весь код из данной статьи вы можете найти тут: https://github.com/imdany/spark_catalyst_udf

Введение

Если вы работали с Spark, то знаете, что бывают случаи, когда сам Spark не предоставляет необходимого функционала, и вам приходится его расширять.

Обычно вы можете сделать это, написав UDF, которая выполняет нужную работу. Но знали ли вы, что есть и другой альтернативный способ? Он называется Catalyst Expressions, и, должен признаться, их не так просто написать, но они могут повысить производительность вашего приложения до нового уровня. Итак, приступим к делу!

Реализация UUID с помощью UDF

Реализация генератора UUID с использованием UDF проста. С некоторыми вариациями или другим синтаксисом, один из способов написания этой функции выглядит так:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def uuid: UserDefinedFunction = udf(() => java.util.UUID.randomUUID().toString)

spark.sqlContext.udf.register("uuid", uuid)

Таким образом, мы можем использовать функцию UUID как с помощью SQL-выражений, так и с помощью API DataFrame.

Основная идея здесь заключается в том, что мы используем существующую функцию Java, но не в Spark. Чтобы использовать ее из Spark, необходимо обернуть этот код в UDF, а если мы хотим использовать его и в SQL API, то еще и зарегистрировать.

Просто, верно? И вы можете использовать этот подход для многих, многих разных вещей. Это легко сделать, и это работает, но есть и другие способы расширения функциональности Spark.

Реализация UUID с использованием Catalyst Expressions

Написание Catalyst Expressions определенно сложнее, чем UDF, но как вы увидите в следующем разделе, это имеет свои преимущества в части производительности.

Для написания пользовательской функции в Spark нам нужно как минимум два файла:

  • первый будет реализовывать саму логику, расширяя функционал Catalyst

  • второй сделает эту функцию доступной

Catalyst Expression

Файл, содержащий реализацию кода, должен быть создан в определенном пакете, а именно:

org.apache.spark.sql.catalyst.expressions

Следовательно, нам нужно создать файл для нашей функции в этой папке в нашем проекте Spark.

И здесь дела могут стать довольно сложными. Чтобы заставить Spark использовать наши функции, нам нужно расширить доступный интерфейс. Существует множество различных интерфейсов, которые мы могли бы реализовать, и найти подходящий может быть сложным, так как документация на этот счет недостаточна. Для упрощения вещей давайте проанализируем, что мы хотим достичь - функция без входных параметров, возвращающая строку.

Я выяснил, что мне нужно реализовать Leaf Expression, так что мой класс будет начинаться так:

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = ???
  override def eval(input: InternalRow): Any = ???
  override def dataType: DataType = ???
}

Итак, давайте заполним эту определенную часть, начиная с простых методов.

Для dataType, так как мы хотим вернуть строку:

override def dataType: DataType = StringType

А для nullable, поскольку мы не хотим возвращать значения null из нашей функции:

override def nullable: Boolean = false

Последний кусочек, "eval", - это фактическое выполнение функции, которая будет генерировать UUID.

override def eval(input: InternalRow): Any = 
  UTF8String.fromString(java.util.UUID.randomUUID().toString)

Вот и все! Единственное необычное, что вы можете заметить, - это UTF8String.fromString(). Если вы попытаетесь запустить код без этого, вы увидите:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

Причина вызова этого метода заключается в том, что Spark использует его для преобразования "внешней строки" в "строку Spark" (https://github.com/apache/spark/…/UTF8String.java#L49)

Окончательный код выглядит следующим образом:

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = false
  override def dataType: DataType = StringType
  override def eval(input: InternalRow): Any = UTF8String.fromString(java.util.UUID.randomUUID().toString)
}

Просто, верно? Ну, в этом случае, да, реализация была легкой, но обычно это не так просто.

Function wrapper

Теперь, когда у нас есть выражение Catalyst, мы должны сделать его
доступным для использования в API Dataframe. Для этого нам нужно создать второй
файл. Местоположение этого файла не так важно, как у предыдущего, но
для порядка я обычно помещаю его в org.apache.spark.sql

И на этот раз я назвал его CustomFunctions, и он должен содержать следующее содержание:

object CustomFunctions { 
  private def withExpr(expr: Expression): Column = Column(expr) 
  def UUID_CUSTOM(): Column = withExpr { 
    Uuid() 
  } 
}

С помощью этого кода мы делаем функцию Uuid доступной через объект CustomFunctions.

Использование

И последний вопрос: как мы используем эту функцию? Ответ довольно прост! Мы должны импортировать ее, как и любую другую функцию и использовать ее в нашем DataFrame:

import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM
//....
val newDf = df.withColumn("uuid", UUID_CUSTOM())

Сравнение производительности

Вопрос, который вы могли бы себе задать, - стоит ли все это того? Ну, давайте взглянем на цифры. Я запустил один и тот же код с использованием UDF и выражения Catalyst на DataFrame разного размера, и результаты довольно интересны.

// UDF version
val data = spark.range(nRows).toDF("ID").withColumn("uuid_udf", expr("uuid_udf()"))
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/UDF/${runID}")
// --------
// Catalyst Version
val data = spark.range(nRows).toDF("ID").withColumn("uuid", UUID_CUSTOM())
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/catalyst/${runID}")

Я запустил каждую функцию с 4 разными значениями числа строк по 100 раз, затем получил среднее время для каждой, и вот результат:

Для небольших DataFrame разница не очень заметна... но когда вы увеличиваете размер DataFrame, можно начать замечать, насколько лучше Catalyst Expressions справляется с задачей по сравнению с UDF.

Заключение

Так что стоит ли мне перестать использовать UDF и начать писать Catalyst Expression?

Я не могу ответить на этот вопрос за вас, так как это будет зависеть от многих различных аспектов, таких как время, доступные ресурсы или знания. Но что ясно из результатов тестов, так это то, что если вам нужно быстродействие приложения или сократить время выполнения вашей задачи, вам стоит рассмотреть возможность изучения написания таких Catalyst Expression.

Комментарии (12)


  1. Geckelberryfinn
    24.04.2024 09:44

    Ну, допустим, а можно теперь эту кастомную функцию как-то заппакеджить и дать дата-сайнтистам, которые на PySpark в основном? Особенно, если речь идёт о облачных средах, таких как Databricks или Glue?


    1. Ninil Автор
      24.04.2024 09:44

      Интересный кейс. Спасибо! В рамках своих изысканий попробую.
      Пока я хочу использовать подобноное для своих Spark-приложений.


    1. mitgard
      24.04.2024 09:44
      +2

      Вполне, вам нужно сделать джарник с таким кодом, а потом его добавить в зависимость spark-приложения, как условный драйвер для БД.

      А в коде просто сделать импорт.

      Но если мы говорим, про pySpark, скорее всего придется сделать питон враппер или вызывать через jvm


      1. Ninil Автор
        24.04.2024 09:44

        Еще интересный юзкейс для исследования - как сделать доступной эту функцию из Датабриск ноутбука, в котором дата-сайентисты исползуют PySpark


        1. Geckelberryfinn
          24.04.2024 09:44
          +1

          Посмотрел, да это может сработать через Jar и врапперы. Как посоветовал mitgard, нужно упаковать Jar, затем его указывать в spark-submit --jars. Враппер будет иметь вид вроде такого

          from pyspark.sql.column import Column
          from pyspark.sql.functions import expr
          
          def my_catalyst_expression(column):
              return Column(expr(f"MyCatalystExpression({column._jc.toString()})"))
          
          ...
          df.select(my_catalyst_expression(df.field)).show()

          В случае databricks тогда все даже чуть проще, созданный Jar можно загрузить в Libraries у кластера.


  1. druzyk
    24.04.2024 09:44

    Интересно!


  1. sshikov
    24.04.2024 09:44
    +1

    Реализация генератора UUID с использованием UDF проста. 

    Я бы хотел отметить, что на самом деле все не всегда так просто, даже для такой простой функции. Дело в том, что UDF сериализуются и передаются в executors, это во-первых (ну, те кто программирует на спарке, уже должны это обычно знать).

    Но тут еще могут добавляться вопросы с класслоадерами. Скажем, мы как-то попытались создать экземпляр UDF и зарегистрировать его в groovy скрипте, который динамически выполнялся из кода на спарке. Так вот, ничего не получилось, потому что класслоадер оказался другой, и наша функция имела сигнатуру, отличную от нужной. А как устроены класслоадеры в спарке, описано примерно так же, как описано создание catalyst выражений из этой статьи - т.е. примерно никак.


    1. Ninil Автор
      24.04.2024 09:44

      Да, про сериализацию верно подмечено. Плюсую


  1. sshikov
    24.04.2024 09:44

    eval(input: InternalRow)

    Ну вот хорошо что у автора 0-арная функция. А если мы заходим использовать аргументы? Я пытался как-то разобраться с expressions, но так и уперся в отсутствие документации, скажем, непонятно где вот эти InternalRow взять, и что с ними можно делать.


    1. Ninil Автор
      24.04.2024 09:44

      В рамках собственных "раскопок" мне уже удалось создать функцию 2х и 3х аргументов - на эту тему и хочу подготовить уже свою статью


      1. sshikov
        24.04.2024 09:44

        О, давайте-давайте, будем ждать.


  1. vadyushanovikov
    24.04.2024 09:44

    Спасибо автору за перевод