Ремарка:
В данной статье представлено, как автор решил свою проблему и не утверждает, что нельзя было сделать это лучше или красивее. Также автор не утверждает, что данную проблему или задачу следует решать именно таким методом и не настаивает на повторении данного подхода. Это лишь один из возможных способов решения, предложенный автором, и он представлен здесь исключительно для ознакомления.
Автор отмечает, что не смог найти информацию по данной проблеме ни на русскоязычных, ни на англоязычных ресурсах, включая Хабр и другие источники. Статья представляет собой дискуссионный материал, и автор будет рад увидеть альтернативные решения в комментариях.
Что я пытаюсь сделать:
Рассмотрим создание UDF blue
, который возвращает строку "0000FF" на Scala API."
object Example1 extends App with Configure {
spark.udf.register("blue", udf(() => "0000FF")) // создаем и регестрируем udf
spark.sql("select blue()").show()
/* вывод:
+------+
|blue()|
+------+
|0000FF|
+------+
*/
}
Рассмотрим ситуацию, когда сама функция UDF поступает в программу в виде строки. Например, у нас есть переменная code
, которая содержит "() => "0000FF""
. В рантайме необходимо получить объект типа Function0[String]
из этой строки и использовать его для создания UDF. Таким образом, моя цель заключается в написании программы, способной обрабатывать сложные лямбды с произвольным количеством аргументов и произвольными выходными данными в виде строк.
например:
"(num: Int) => num * num"
"(str: String) => str.reverse"
"(num1: Double, str: String) => Math.pow(num1, str,size)"
и тд
как не получилось делать и какая возникала ошибка:
расмотрим следующий код
import org.apache.spark.sql.functions.udf
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
object Example2 extends App with Configure {
def udfRegister(labdaCode: String): Unit = {
import universe._
val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()
val tree = toolbox.parse(labdaCode)
val compiledCode = toolbox.compile(tree)
val function = compiledCode().asInstanceOf[Function0[String]]
//сторока успешно конвертируеться в Function0(String)
println(function()) // вывод: 0000FF
val udfBlue = udf(() => function()) // успешное создание udf
spark.udf.register("blue", udfBlue)
}
udfRegister("() => \"0000FF\"")
spark.sql("select blue()").show() // возникает ошибка
}
При запуске задачи возникает следующая ошибка: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
Ошибка указывает на то, что в коде или при работе с Spark возникла попытка использовать SerializedLambda вместо ожидаемого типа scala.Function1 (функция с одним аргументом).
В Spark функции должны быть сериализуемыми, чтобы их можно было передать через кластер. SerializedLambda может возникать при попытке передачи некорректно сериализуемой функции.
В свою очередь SerializedLambda
есмь класс, используемый в Java для сериализации лямбда-выражений и методов ссылок. Этот интерфейс генерируется компилятором Java при сериализации лямбда-выражений. (судя по сему данный класс не предоставляет необходимого спарку инструмента для сериализации)
Особенно запутывает то, что данная ошибка не возникает при конвертации полученного объекта в Function0[String]
, который Spark полностью поддерживает, а также не возникает при создании объекта UDF. Ошибка проявляется уже в рантайме при запуске задачи на кластере.
Однако следующий код снова работает:
import org.apache.spark.sql.functions.udf
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
object Example3 extends App with Configure {
val labdaCode = "() => \"0000FF\""
import universe._
val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()
val tree = toolbox.parse(labdaCode)
val compiledCode = toolbox.compile(tree)
val function = compiledCode().asInstanceOf[Function0[String]]
//сторока успешно конвертируеться в Function0(String)
println(function()) // вывод: 0000FF
val udfBlue = udf(() => function()) // успешное создание udf
spark.udf.register("blue", udfBlue) // успешное регестрирование udf
spark.sql("select blue()").show()
/*вывод:
+------+
|blue()|
+------+
|0000FF|
+------+
*/
}
В сущности в Example3
происходит все тоже самое что и в Example2
, но первое работает, а второе нет что вводит еще в большее замешательство
import org.apache.spark.sql.functions.udf
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
object Example4 extends App with Configure {
if (true) {
val labdaCode = "() => \"0000FF\""
import universe._
val toolbox = universe.runtimeMirror(this.getClass.getClassLoader).mkToolBox()
val tree = toolbox.parse(labdaCode)
val compiledCode = toolbox.compile(tree)
val function = compiledCode().asInstanceOf[Function0[String]]
//сторока успешно конвертируеться в Function0(String)
println(function()) // вывод: 0000FF
val udfBlue = udf(() => function()) // успешное создание udf
spark.udf.register("blue", udfBlue) // успешное регестрирование udf
}
spark.sql("select blue()").show() // возникает ошибка
}
как и Example3 Example4 неделает тоже самое, но почему то блок if снова введет к ошибке
так же к ошибкам приводит и использование циклов и монад, Try и тд (но тут я это демонстрировать не буду)
таким образом все сводиться к тому чтобы правильным образом создать function
которая бы не являлась бы SerializedLambda
и есть следующие ограничения:
можно создавть лямбды/функции только в мкетоде main
нельзя из создовать в блоках if, try, for, монадах и тд.
хоть примеры и демонстрируют работу с совсем простой функцией blue, моя задача сделать универсальное приложение, которое будет принимать любую лямбду (на самом деле от 0 до 10 поскольку максимальный UDF10) в формате строки
Определимся с тем как программа получает информацию о функциях:
Я хочу чтобы информацию получало в формате JSON и путсь у него будет следующая структура:
[ //массив поскольку функций может прийти много или неприйти вообще
{
"name": "blue", //имя бля udf
"targetType": "String", //тип данных для возращаемый
"fun": "() => \"00ff00\"", //собственно сама лямбда
"imports": ["...", "..."] //опциональное, на случай если нужны дополнительные импорты
},
// остальные имеют такую же структуру
{
"name": "reverse",
"targetType": "String",
"fun": "(str:String) => str.reverse"
},
{
"name": "plus",
"targetType": "Int",
"fun": "(i1:Int, i2:Int) => i1 + i2"
},
{
"name": "sum3",
"types": "Int,Int,Int",
"targetType": "Int",
"fun": "(i1:Int, i2:Int, i3:Int) => i1 + i2 + i3"
}
]
Определимся с классом который будет отражать данную структуру Json:
// думаю тут не стоит расписывать какое поле за что отвечает
case class UDFConfig(name: String,
targetType: String,
fun: String,
imports: List[String] = List.empty)
Для парсинга json файла я буду использовать библиотеку json4s
и следующий класс
package Util
import org.json4s.Formats
import org.json4s.native.JsonMethods.parse
//небольшой обьект для ковертации строки в формате json в список обьектов типа JsonType
object JsonHelper {
private def jsonToListConvert[JsonType](json: String)(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]] = {
parse(json)
.extract[List[JsonType]]
.map(Right(_))
}
implicit class JsonConverter(json: String) {
def jsonToList[JsonType]()(implicit mf: Manifest[JsonType], formats: Formats): List[Either[Throwable, JsonType]]= {
jsonToListConvert[JsonType](json)
}
}
}
И создадим трейт Configure
в котором и определи переменые которые понадобяться для дальнейшей работы
Иimport config.UDFConfig
import org.apache.spark.sql.SparkSession
import org.json4s.DefaultFormats
import scala.io.Source
trait Configure {
lazy val spark = SparkSession
.builder()
.appName("Example UDF runtime compile")
.master("local[*]")
.getOrCreate()
lazy val sc = spark.sparkContext
// создали спрак сессию и контекст
lazy val pathUdfsJson = "./UDFs.json" //путь до json c функциями
implicit lazy val formats = DefaultFormats // список форматов для json4s, в моем случае хватеает дефолтных форматов
import Util.JsonHelper._
lazy val udfJson = getJson(pathUdfsJson) // получаем сам json в виде строки
lazy val udfsConfigs: List[UDFConfig] = udfJson
.jsonToList[UDFConfig]
.filter(_.isRight)
.map {
case Right(udfconfig) => udfconfig
} // получаем все корректные udfconfig
// этот обект мне и понадобиться
//метод для чтения файлов
private def getJson(path: String): String = {
val file = Source.fromFile(path)
try {
file.getLines().mkString("\n")
} finally {
file.close()
}
}
}
Определим последний обьект который будет udfconfig переводить в строку которую будет динамически в ратайме превращаться в FunctionN в последствии
package Util
import config.UDFConfig
object UDFHelper {
private def configToStringConvert(udfConfig: UDFConfig): String = {
//создадим все импорты для итоговой строки
val imports = udfConfig
.imports // достаем импорты из конфига
.map(_.trim.stripPrefix("import ").trim) // удаляем с лева ключевое слово если оно есть
.distinct // оставляем уникальные
.map(imp => f"import ${imp}") // конкатенируем слева ключевое слово
.mkString("\n") // все соеденяем через отступ
val Array(params, functionBody) = udfConfig.fun.split("=>", 2).map(_.trim) // Отделяем функциональную часть от переменных
val paramsTypes: Seq[(String, String)] = params
.stripPrefix("(") // Убираем с лева скобку
.stripSuffix(")") // Убираем с права скобку
.split(",") // Разделяем параметры
.map(_.trim) // Убираем лишние пробелы
.map {
case "" => null
case param =>
val Array(valueName, valueType) = param.split(":").map(_.trim)
(valueName, valueType)
} // разделяем имя переменой от типа данных этой переменной
.filter(_ != null) // отвильтровываем null-ы
val funcTypes: String = paramsTypes.size match {
case 0 => udfConfig.targetType
case _ => f"${List
.fill(paramsTypes.size)("Any")
.mkString("", ", ", ", ")}${udfConfig.targetType}"
} //здесь получаем перечисление через запятую типов данных Function
val anyParams = paramsTypes.map(_._1.trim + "_any").mkString(", ") //парметры лямда вырожения
val instances = paramsTypes.map {
case (valueName, valueType) =>
f" val ${valueName}: ${valueType} = ${valueType}_any.asInstanceOf[${valueType}]"
}.mkString("\n") // тут определяем конвертации парметров люмбды в необхадимые типы
// собираем все вместе в итоговый стринг
f"""
|${imports}
|
|val func: Function${paramsTypes.size}[${funcTypes}] = (${anyParams}) => {
|
|${instances}
|
| (
|${functionBody}
| ).asInstanceOf[${udfConfig.targetType}]
|
|}
|
|func
|""".stripMargin
}
implicit class Converter(udfConfig: UDFConfig) {
def configToString(): String = {
configToStringConvert(udfConfig)
}
}
}
По поводу использования типа Any
: как я уже упоминала, моя задача заключается в создании универсальной логики, где передаваемая через JSON лямбда или функция может иметь любое количество параметров (на практике от 0 до 10). В условиях, где невозможно динамически преобразовать строку в FunctionN
, использование типа Any
является единственным выходом (в дальнейшем будет ясно, почему). Иначе пришлось бы создавать множество объектов для каждого возможного числа параметров.
что весьма много.
9 поскольку я решила ограничеться 9-тью типами данных: String, Int, Boolean, Byte, Short, Long, Float, Double, Date, Timestamp
Создаем обьекты регистраторы:
Поскольку мне не разрешено использовать конструкции типа if
или match
, я не могу динамически определять количество параметров в одном объекте UDFRegN
и соответственно настраивать его. В результате мне пришлось создавать несколько отдельных объектов UDFRegN
, каждый из которых поддерживает определённое количество параметров. Например, для случая с нулевым количеством параметров я создал соответствующий объект UDFRegN
.
Это подход необходим из-за ограничений, которые не позволяют динамически адаптировать логику UDF в зависимости от передаваемого числа параметров. Таким образом, в данном случае было бы необходимо создать 11 отдельных объектов UDFRegN
для обработки всех возможных комбинаций параметров.
import org.apache.spark.sql.functions.udf
import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
object UDFReg0 extends App with Configure { // как говорилось можно только функцию main использовать
val indexcode = Integer.parseInt(args(0)) // это единственная возможность получить парметр извне и предполагается что это тндекс в списке UDFsConfig
import Util.UDFHelper._
val udfConfig = udfsConfigs(indexcode) //получаем udfConfig (из Configure)
val functionCode = udfConfig.configToString() // преобразовываем его в код
val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val tree = toolbox.parse(functionCode)
val compiledCode = toolbox.compile(tree)
val function = compiledCode().asInstanceOf[Function0[Any]] // получаем функцию из кода
val myUDF = udfConfig.targetType match { // выбираем instanceOF в зависимости от типа данных который должна возращать функция
case "String" => udf(() => function().asInstanceOf[String])
case "Int" => udf(() => function().asInstanceOf[Int])
case "Boolean" => udf(() => function().asInstanceOf[Boolean])
case "Byte" => udf(() => function().asInstanceOf[Byte])
case "Short" => udf(() => function().asInstanceOf[Short])
case "Long" => udf(() => function().asInstanceOf[Long])
case "Float" => udf(() => function().asInstanceOf[Float])
case "Double" => udf(() => function().asInstanceOf[Double])
case "Date" => udf(() => function().asInstanceOf[Date])
case "Timestamp" => udf(() => function().asInstanceOf[Timestamp])
case _ => throw new IllegalArgumentException(f"Неизвестный тип")
}
spark.udf.register(udfConfig.name, myUDF) // регистраця UDF
}
И осталось написать еще 10 однотипных почти точно таких же "регестраторов"
и для этого напишем последний скрипт, чтоб не делать жто в ручную)
import java.io.{File, PrintWriter}
object GenerateRegestrators extends App {
(0 to 10).toList.map {
num =>
val types = (0 to num).toList.map(_ => "Any").mkString(", ")
val lambdaVaues = (0 until num).toList.map(n => f"value${n}: Any").mkString(", ")
val functionValues = (0 until num).toList.map(n => f"value${n}").mkString(", ")
f"""
|import org.apache.spark.sql.functions.udf
|
|import java.sql.{Date, Timestamp}
|import scala.reflect.runtime.universe
|import scala.tools.reflect.ToolBox
|
|object UDFReg${num} extends App with Configure {
|
| val indexcode = Integer.parseInt(args(0))
|
| import Util.UDFHelper._
| val udfConfig = udfsConfigs(indexcode)
| val functionCode = udfConfig.configToString()
|
| val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
|
| val tree = toolbox.parse(functionCode)
| val compiledCode = toolbox.compile(tree)
| val function = compiledCode().asInstanceOf[Function${num}[${types}]]
|
| val myUDF = udfConfig.targetType match {
| case "String" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[String])
| case "Int" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Int])
| case "Boolean" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Boolean])
| case "Byte" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Byte])
| case "Short" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Short])
| case "Long" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Long])
| case "Float" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Float])
| case "Double" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Double])
| case "Date" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Date])
| case "Timestamp" => udf((${lambdaVaues}) => function(${functionValues}).asInstanceOf[Timestamp])
| case _ => throw new IllegalArgumentException(f"Неизвестный тип")
| }
|
| spark.udf.register(udfConfig.name, myUDF)
|
|}
|""".stripMargin
}
.zipWithIndex
.foreach {
case (str, index) =>
val file = new File(f"./src/main/scala/UDFReg${index}.scala")
val writer = new PrintWriter(file)
try {
writer.write(str)
} finally {
writer.close()
}
}
}
тут пояснять нечего, оно просто генерит остальные обьекты регистраторы по аналогии
после запуска скрипта в проекте появляются недомтоющие классы UDFRegN
.
собираем все вместе:
Итого остается лишишь написать последний пример где наконец динамическая компиляция UDF из строки заработает:
object Example5 extends App with Configure {
import Util.UDFHelper._
udfsConfigs.zipWithIndex.foreach { //индекс нужен чтоб передать в args
case (udfConfig, index) =>
udfConfig.fun.split("=>", 2)(0).count(_ == ':') match {
//тут и происходит выбор какому регестратору передать
case 0 => UDFReg0.main(Array( index.toString ))
case 1 => UDFReg1.main(Array( index.toString ))
case 2 => UDFReg2.main(Array( index.toString ))
case 3 => UDFReg3.main(Array( index.toString ))
case 4 => UDFReg4.main(Array( index.toString ))
case 5 => UDFReg5.main(Array( index.toString ))
case 6 => UDFReg6.main(Array( index.toString ))
case 7 => UDFReg7.main(Array( index.toString ))
case 8 => UDFReg8.main(Array( index.toString ))
case 9 => UDFReg9.main(Array( index.toString ))
case 10 => UDFReg10.main(Array( index.toString ))
case _ => Unit
}
}
spark.sql("select blue(), reverse('namoW tobor ociP ociP') AS rev, plus(1, 2), sum3(1, 1, 1)").show
/* вывод
+------+--------------------+----------+-------------+
|blue()| rev|plus(1, 2)|sum3(1, 1, 1)|
+------+--------------------+----------+-------------+
|00ff00|Pico Pico robot W...| 3| 3|
+------+--------------------+----------+-------------+
*/
}
и таки получилось.
P.S. код можно глянуть тут
sshikov
По-моему, вот ваш случай: https://issues.apache.org/jira/browse/SPARK-29497