В предыдущей своей статье Почему стоит начать писать собственные Spark Native Functions? (Часть 1), которая является переводом и которая вдохновила меня на собственные изыскания, был разобран пример, как написать свою Spark Native Function по генерации UID
. Это, конечно, здорово, но вот только данная функция не принимает аргументы на вход, в то время как в реальной практике нам требуются обычно функции, которым надо передать на вход 1, 2 или 3 аргумента. Такие случаи не рассматриваются в упомянутой выше переводной статье - ну что ж, попробуем восполнить этот пробел!
Ниже я предлагаю вашему вниманию результаты своих изысканий по созданию собственных Spark Native Functions, которые бы принимали на вход несколько аргументов. Содержание статьи:
Супер-краткая матчасть
"Препарирование" станадртной функций
Написание своей функции
Альтернативный вариант реализации
Сравнение производительности
Заключение
Супер-краткая матчасть
Для начала полезно попытаться понять, как работает Spark SQL на концептуальном уровне. Желающим, я бы порекомендовал прочитать статью Deep Dive into Spark SQL's Catalyst Optimizer. А для ленивых, краткая выжимка, которая релевантна нашей теме:
Catalyst использует абстрактные синтаксические деревья (AST) для представления логического и физического планов запроса. Эти деревья обрабатываются с помощью различных преобразований и оптимизаций для улучшения выполнения запроса. Процесс включает применение правил для преобразования деревьев, тем самым уточняя план выполнения. Этот древовидный подход позволяет Catalyst выполнять сложную оптимизацию и обеспечивает гибкость и расширяемость при обработке сложных преобразований запросов и пользовательских правил.
-
Фазы оптимизации включают в себя:
Анализ: На этом этапе выполняется проверка логического плана запроса на корректность и определение атрибутов.
Логическая оптимизация: включает применение стандартных правил для упрощения и реструктуризации плана запроса для повышения производительности.
Физическое планирование: преобразование оптимизированного логического плана в один или несколько физических планов с выбиром наиболее эффективную стратегию выполнения.
Генерация кода: используя платформу генерации кода, создается байт-код Java во время выполнения, что значительно ускоряет выполнение запросов.
Начинаем препарировать
Чтобы понять, как написать свою Spark Native Function лучше всего изучить код уже существующих функций. Начнем с функций одного аргмента. Для этого просто открываем в IntelliJ или любой другой IDE пакет org.apache.spark.sql
, находим там любую известную функцию одного аргумента, например unbase64
и используем "метод пристального взгляда". Итак, что мы видим (на самом деле там намного всего больше, но я оставил только то, что относится к "теме"):
object functions {
private def withExpr(expr: Expression): Column = Column(expr)
// ...
def unbase64(e: Column): Column = withExpr { UnBase64(e.expr) }
// ...
}
Именно с помощью этого кода функция становится доступной нам для импорта в нашем проекте с помощью import org.apache.spark.sql.functions.unbase64
. "Провалимся" в функцию UnBase64
. Она находится в org.apache.spark.sql.catalyst.expressions.stringExpressions
и представляет собой Case Class (кстати, как вы думаете, почему именно Case Class а не обычный класс? Ответ несложный - оптимизатор запросов может легко их использовать в patten matching в отличии от обычного класса):
case class UnBase64(child: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {
override def dataType: DataType = BinaryType
override def inputTypes: Seq[DataType] = Seq(StringType)
protected override def nullSafeEval(string: Any): Any =
CommonsBase64.decodeBase64(string.asInstanceOf[UTF8String].toString)
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, (child) => {
s"""
${ev.value} = ${classOf[CommonsBase64].getName}.decodeBase64($child.toString());
"""})
}
override protected def withNewChildInternal(newChild: Expression): UnBase64 =
copy(child = newChild)
}
На что нам тут важно обратить внимание?
Этот класс наследуется от
UnaryExpression
, который является абстрактным классом, представляющим унарное выражение, то есть выражение, которое содержит только один операнд. Как раз наш случай с функцией одного аргумента.Наследование от
ImplicitCastInputTypes
используется для неявного приведения типов данных во время выполнения операций над данными. Этот механизм позволяет автоматически преобразовывать типы данных, если это необходимо для выполнения операции.Наследование от
NullIntolerant
говорит нам, что любой Null на входе приведет к Null на выходе.Функции
dataType
иinputTypes
- тут все тривиально-
Функция
nullSafeEval
- именно эта функция реализует основную логику и будет использоваться в режиме интерпретации. В контексте Spark и Catalyst интерпретируемый режим относится к способу выполнения запросов без их компиляции в байт-код. Вместо генерации и запуска оптимизированного байт-кода запросы интерпретируются и выполняются непосредственно на основе их логических планов.Обратите внимание на "говорящее" название - функция должна быть null-safe.
Для "ядра" же логики просто переисползуется
decodeBase64
из классаCommonsBase64
из Java (зачем же изобретать велосипед?).Ну и надо так же обратить внимание на то, что Spark "под капотом" для представления строк использует класс
UTF8String
, что тоже учтено в коде.
-
Функция
doGenCode
отвечает за генерацию quasiquotes (не знаю, как красиво перевести на русский; если кратко, то это мощная фича Scala, которая позволяет разработчикам встраивать фрагменты кода внутрь другого кода; особенно полезна в контексте метапрограммирования, когда код генерирует другой код или манипулирует им), которые будут использоваться для создания байт-кода JVM. Эти quasiquites, по сути, содержат Java-код, который мы хотим выполнить. Здесь так же используя методdecodeBase64
из Java-классаCommonsBase64
как и дляnullSafeEval
Кстати, эта фича согласно предупреждению на оф.сайте была существенно переработана и интегрирована в новую систему метапрограммирования. Может быть в том числе и поэтому тоже мы все еще ждем официальной версии Spark для Scala 3?
Функция
withNewChildInternal
является неотъемлемой частью работы с AST и оптимизатором Catalyst в Apache Spark. При реализации преобразований или оптимизации планов запросов его использование помогает гарантировать, что новые экземпляры узлов в дереве создаются с обновленными дочерними элементами, сохраняя неизменность. Поэтому в большинстве существующих функций это и будет создаение копии, чтобы обеспечить неизменность изначального экземпляра.
Любопытные так же могут "провалиться" в реализацию класса CommonsBase64
, но так как это стандартный Java-класс, то смысла в этом особого нет.
Пишем свою функцию одного аргумента
Как и в прошлой статье для написания собственной Spark Native Function нам нам нужно создать в нашем проекте дополнительные файлы и поместить их в src/main/scala/org/apache/spark/sql/
(но можно и в другое удобное вам место):
первый файл будет содержать Catalyst Expression, разместим его в
src/main/scala/org/apache/spark/sql/catalyst/expressions/
второй файл сделает эту функцию доступной, разместим его в
src/main/scala/org/apache/spark/sql/
Для реализации вы можете выбрать любую, вам интересную, а я выбрал по ряду причин написание своей weekOfYear
(на самом деле все просто - надо было реализовать по запросам аналитиков вычисление номера недели именно таким вот образом, поэтому я и решил параллельно попробовать реализовать аналогичную функцию в учебных целях и для статьи)
Для нашей собственной функции одного аргумента (назовем ее myWeekOfYear
) мы создаем в проекте файл src/main/scala/org/apache/spark/sql/customFunctions.scala
(это "второй файл" по нумерации выше). Именно отсюда мы будем импортировать ее в секции импортов для дальнейшего использования нашим Spark-приложением.
object customFunctions {
private def withExpr(expr: Expression): Column = Column(expr)
def myWeekOfYear(col: Column): Column = withExpr {
MyWeekOfYear(col.expr)
}
}
Теперь самое главное. "Первый" файл с Catalyst Expression. Я поместил его вообще в отдельный файл src/main/scala/org/apache/spark/sql/catalyst/expressions/myWeekOfYearExpression.scala
. Но если функций много, то их можно объединить "по смыслу", как это сделано для уже упомянутых выше stringExpressions
.
case class myWeekOfYearExpression(child: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {
override def inputTypes: Seq[DataType] = Seq(StringType)
override def dataType: DataType = StringType
override def nullSafeEval(dateString: Any): Any = {
if (dateString == null) null
else {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH)
val date =
LocalDate.parse(dateString.asInstanceOf[UTF8String].toString, formatter)
val weekNumber = date.get(WeekFields.ISO.weekOfWeekBasedYear)
val year = date.get(WeekFields.ISO.weekBasedYear)
// Use formatted string with leading zero for weeks 1-9 (01-09)
UTF8String.fromString(f"$year-W$weekNumber%02d")
}
}
override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode
): ExprCode = {
val dateFormatter = classOf[DateTimeFormatter].getName
val localDate = classOf[LocalDate].getName
val weekFields = classOf[WeekFields].getName
val utf8String = classOf[UTF8String].getName
val formatter = ctx.addMutableState(
dateFormatter,
"formatter",
v =>
s"""$v = $dateFormatter.ofPattern("yyyy-MM-dd", java.util.Locale.ENGLISH);"""
)
nullSafeCodeGen(
ctx,
ev,
(dateString) => {
s"""
|if ($dateString == null) {
| ${ev.isNull} = true;
|} else {
| $localDate date = $localDate.parse($dateString.toString(), $formatter);
| int weekNumber = date.get($weekFields.ISO.weekOfWeekBasedYear());
| int year = date.get($weekFields.ISO.weekBasedYear());
| ${ev.value} = $utf8String.fromString(String.format("%d-W%02d", year, weekNumber));
|}
""".stripMargin
}
)
}
override def prettyName: String = "MyWeekOfYear"
override protected def withNewChildInternal(newChild: Expression): myWeekOfYearExpression =
copy(child = newChild)
}
Итак, что же мы сделали (по сравнению/аналогии с UnBase64
)?
inputTypes
иdataType
- без комментариев, так как тривиальноnullSafeEval
- здесь реализована основная логика нашей функции на Scala. В принципе, ничего не мешает нам реализовать саму функцию вообще "где угодно" в нашем проекте, а здесть просто "обернуть" ее вызов. Но об этом ниже. Не забываем при этом так же проUTF8String
, если работает со строками.doGenCode
- здесь по сути реализуем (переписываем) нашу функцию изnullSafeEval
на JavaprettyName
- "бантики", даем красивое имя :-) Обычно это значение должно совпадать с именем функции в SQL.withNewChildInternal
- просто создаем копию, как и в разобранной выше функции из стандартной функциональности Spark.
Кстати, ремарка. Никто не мешает нам вместо:
case class myWeekOfYearExpression(child: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant l{
...
написать более понятное (на мой субъективный взгля; здесь мы заменияем менее понятное "child" на более семантически релевантное "dateString" во входных параметрах case class'а):
case class myWeekOfYearExpression(dateString: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {
override def child: Expression = dateString
...
Альтернативный вариант реализации
Мы с вами выше реализовали требуемую нам функцию. Но есть пара моментов:
-
В
nullSafeEval
иdoGenCode
мы по сути дублировали код нашей функции на Scala и на Java, что не очень "хорошо":хотелось бы избежать дублирования кода
хотелось бы не использовать два разных языка программирования, или использовать по минимуму.
Хотелось бы переиспользовать уже существующие в нашем проекте функции, просто "обернув" их в Catalyst Expressions - то есть просто "как-то" переиспользовать их в
nullSafeEval
иdoGenCode
Чтож, это тоже возможно! Представим, что наша функция уже реализована в объекте MyFunctions
, например так:
object MyFunctions {
def myWeekOfDate(dateString: String): String = {
if (dateString.isEmpty) null.asInstanceOf[String] // Null safe
else {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH)
val date =
LocalDate.parse(dateString.toString, formatter)
val weekNumber = date.get(WeekFields.ISO.weekOfWeekBasedYear)
val year = date.get(WeekFields.ISO.weekBasedYear)
// Use formatted string with leading zero for weeks 1-9 (01-09)
f"$year-W$weekNumber%02d"
}
}
}
Тогда мы можем резко упростить наш case class myWeekOfYearExpression
, переиспользовав эту функцию. В этом случае он будет выглядеть примерно так:
case class myWeekOfYearExpression(child: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {
//...
override def nullSafeEval(dateString: Any): Any = {
val date = dateString.asInstanceOf[UTF8String].toString
UTF8String.fromString(MyFunctions.myWeekOfDate(date))
}
override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode
): ExprCode = {
nullSafeCodeGen(
ctx,
ev,
dateString => {
val className = MyFunctions.getClass.getName.stripSuffix("$")
s"""
${ev.value} = UTF8String.fromString($className.myWeekOfDate($dateString.toString()));
"""
}
)
}
//...
}
Тут по сравнению с первым вариантом меняется только nullSafeEval
и doGenCode
- по сути мы только "адаптируем" вызов имеющейся функции MyFunctions.myWeekOfDate
под использование UTF8String
вместо String
.
На первый взгляд это и проще, и изящнее, но не будем забегать вперед.
Сравнение производительности
Ну и перейдем к самому интересному - к сравнению производительсности! Для этого давайте "обернем" точно такой же код в Spark UDF, чтобы иметь "точку отсчета". Например так (не забываем про null-safe):
val myWeekOfYearUDF = udf((dateString: String) => {
if (dateString == null) null
else MyFunctions.myWeekOfDate(dateString)
}
)
Прежде чем запускать тесты, давайте взглянем на планы запросов применения этих функций к колонке с данными (в которую поместим сгенерированнуйю случайным образом дату в виде строки).
// Вариант 1
== Physical Plan ==
*(1) Project [random_date#6, MyWeekOfYear1(random_date#6) AS weekOfYear#8]
+- *(1) Project [...]
+- *(1) Range (...)
// Вариант 2
== Physical Plan ==
*(1) Project [random_date#17, MyWeekOfYear2(random_date#17) AS weekOfYear#19]
+- *(1) Project [...]
+- *(1) Range (...)
// UDF
== Physical Plan ==
*(1) Project [random_date#28, UDF(random_date#28) AS weekOfYear#30]
+- *(1) Project [...]
+- *(1) Range (...)
Видим, что в первых двух случаях Catalyst "видит" наши функции. В третьем случае UDF для него является "черным ящиком", что и отражает план запроса.
Я делал сравнение производительности на своем скромном MacBook Pro на M2 с настройками "по умолчанию", которые я всегда использую для работы с нашими проектами, и версиями Spark 3.2.0 и 3.5.0 (но это оказалось излишним, так как версия Spark принципиально не повлияла на результат, поэтому ниже привожу графики в одном экземпляре, без разделения по версиям Spark).
Каждый вариант реализации запускаем для 1, 10 и 100 миллионов строк.
Каждый эксперимент повторяем по 50 раз
Для каждого эксперимент генерим DataFrame c нужным количеством строк со случайными датами, применяем нашу функцию и сохраняем данные в формате parquet.
Я не стал запускать полноценный эксперимент для 1 млрд строк, так как тенденция в целом и так понятна и не хотелось ждать много времени. Поэтому для 1 млрд. строк я запустил эксперименты только по 5 раз и привожу "ящик с усами" скорее для справки.
Результаты вы можете увидеть на графиках ниже (сорри за "красоту", чукча - не художник :-)). По оси Y - время отработки тестового сценария в миллисекундах, по оси X - разные варианты реализации: вариант 1 с inline-кодом на Java, вариант 2 с просто "оберткой" существующей Scala-функции и вариант реализации через UDF.
Графики с результатами
Что мы видем?
Вариант 1 (c inline-кодом) самый производительный, а реализация через UDF ожидаемо наиболее медленная.
Вариант 2 (с "оберткой" Scala-функции) и вариант с UDF проигрывают от 8-14% в зависимости от объема данных.
Выигрышь в производительности от использования Варианта 1 имеет тенденцию увеличиваться с ростом объема данных.
По всей видимости наша функция
myWeekOfDate
достаточно простая (кто бы сомневался :-)), что при любом варианте реализации мы не получаем настолько большого прироста производительности, как дляUUID
из первой части статьи (где для 100M строк UDF отработала за 190% времени, а для 1000M - аж за 250%)
То, что UDF проигрывает Spark Native Functions - было ожидаемо. Но вот разница между двумя реализациями Catalyst Expression и одновременно значительно меньшая разница между Вариантом 2 и UDF на первый взгляд может удивлять. Я лично скорее этому не удивился, а расстроился, так как этот факт делал нецелесообразным "обертывания" существующих функций в Catalyst Expression - то есть "малой кровью" заметно улучшить производительность уже использующихся на проекте функций.
Так почему же мы наблюдаем заметную разницу в производительности двух вариантов реализации Catalyst Expression: inline-код vs. объекты Scala? Я вижу несколько причин:
Сложность генерации кода: внедрение объектов и классов Scala в
doGenCode
может увеличить сложность генерируемого кода Java. Это может привести к дополнительным затратам на генерацию, компиляцию и выполнение кода.Оптимизация компилятора: возможно, оптимизатору не удастся полностью оптимизировать вызов внешней функции, что так же приводит в потере производительности.
Дополнительная диспетчеризация функций которая влечет за собой дополнительные издержки во время выполнения.
Затраты на сборку мусора: может увеличиться использование памяти, что потенциально может привести к более частой сборке мусора, что повлияет на общую производительность.
Заключение
Итак, теперь мы умеем писать собственные Spark Native Functions. Начиная писать статью, я думал, что мы затронем и примеры функций и двух, и трех аргументов, но ... оказалось, что и без того статья получилась достаточно объемной. Тем, кому действительно интересно, без труда смогут, надеюсь, после прочтения данной статьи по образу и подобию разработать их. Просто наследуйте ваш case class
от BinaryExpression
или TernaryExpression
.
Наверное, один из самых важных вопросов - а оно вообще надо, кроме как в академических целях? Вопрос очень интересный, ИМХО :-)
Безусловно, использование Spark Native Functions за счет выполениея операций на уровне движка Spark может значительно ускорить обработку данных за счет оптимизации запросов и уменьшения накладных расходов. Кроме того, написание таких функций дает большую гибкость и возможность создавать специализированные операции для конкретных наших потребностей. С другой стороны, надо понимать, что это требует большей экспертности от команды, а так же может усложнить дальнейшую поддержку кода "на долгой дистаниции", так как найти квалицифированные кадры становится, по моим ощущениям, все сложнее и сложнее. Мое личное мнение через "прикладную" призму: адекватно оценить потенциальный прирост произовдительности с учетом ваших объемов данных, квалицикацию команды (в том числе среднесрочной и долгосрочной перспективах, что уже намного сложнее), и насколько этот прирост вам критичен с точки зрения решаемой бизнес-задачи. Возможно, есть куда более простые способы ускорения ваших Spark-job'ов
Например, мы сейчас запускаем наши Spark-job'ы в облаке на on-demand Databricks-кластерах, поднятие которых занимает 4-5 минут + 5-6 минут после этого работает само Spark-приложение. Выигрышь 10-20% времени (~1 минуты) за счет использования Spark Native Functions в таком сценарии при наших объемах данных не даст, ИМХО, значимого для бизнеса эффекта.
sshikov
Ну погодите, каталист же вряд ли многое понимает о нашем/вашем Java коде. И уж тем более о коде функций, которые взяты из существующих классов. Да и какие физические оптимизации ему даст это знание? Ну т.е. какие свойства выражений каталист позволяют применять к ним те оптимизации, которые нельзя применять к UDF, и что это за оптимизации?
Ninil Автор
Я думаю, тут разница в генерации кода. Предположу, что если код на Java, то Catalyst генерит Java байт-код из нашего inline-кода. Если же используем объекты Scala, то байт-код объектов Scala генериться компилятором Scala, а не самим Catalyst'ом (то есть Catalyst переисползует байт-код, полученный от копилятора Scala), что может помешать Catalyst'у провести часть оптимизаций.
Например, при манипуляции с колонками нам надо их найти, извлечь значния из различных "мест" памяти и только потом применить преобразования. Catalyst же при генерирации Java-кода для выражений, вроде как объединяет его с кодом для доступа к столбцам из представления InternalRow и компилирует все это в единый байт-код JVM. В том числе и поэтому, думаю, такой код более производительный.
sshikov
Ну то есть, эти оптимизации - они на уровне кода функции, скажем так? Т.е. это не оптимизация нашего плана, в виде скажем проталкивания предикатов (потому что предикат в форме Catalyst Expression все равно не сильно-то протолкнешь). Хотя в теории, если оптимизатор знает структуру выражения, то он его может попытаться пропихнуть и в JDBC источник, и в паркет (с ограничениями, разумеется).
Ninil Автор
Да, именно так.
Я хочу, как будет время, еще поглубже капнуть эту тему.