Привет, Хабр!
Сегодня разберём фичу из PySpark — UDTF. Если раньше мы писали UDF и UDAF, то UDTF — это про функцию, которая запускается в секции FROM
запроса и возвращает как бы несколько строк для каждой входной записи. Звучит круто.
UDTFs пригодятся, когда на один входной объект нужно получить множество выходных строк. Простой пример: у нас есть строка текста и мы хотим разделить её на слова так, чтобы каждое слово вышло отдельной строкой. Со стандартным UDF такое не сделать (он возвращает одно значение, например конкатенацию или длину). Но UDTF может делать цикл yield
внутри и выдавать сколько угодно строк. Итак, приступим к делу.
Что такое UDTF и зачем оно нужно
Кратко говоря, UDTF — это пользовательская табличная функция, при каждом вызове возвращающая целую таблицу (множество строк) вместо одного скалярного значения. В Spark 3.5 вводится поддержка Python-UDTF. Главное отличие от обычного UDF в том, что:
UDTF всегда вызывается в секции
FROM
SQL-запроса (или как функция DataFrame вselectExpr
).Она может принимать ноль или более аргументов — эти аргументы могут быть обычными скалярами или даже таблицами (Table Argument).
Для каждого входного вызова UDTF может выдать 0, 1 или много строк. Причём каждая строчка — это кортеж, соответствующий определённой схеме (returnType) функции.
Думаю, уже можно угадывать несколько сценариев. Например, UDTF удобно использовать для:
разбиения текста на слова, списков на элементы и т.п,
разворачивания коллекций в столбце,
распаковки всех возможных комбинаций (cartesian функции над полями),
повышения гибкости трансформаций, когда нужно вернуть несколько строк из одного.
Убедимся, как это работает.
Как создать UDTF в PySpark
По аналогии с UDF, у UDTF есть Python-класс, внутри которого надо реализовать методы eval
и опционально init
и terminate
. Структура примерно такая:
from pyspark.sql.functions import udtf
@udtf(returnType="schema")
class MyUDTF:
def __init__(self):
# можно инициализировать поля экземпляра (вызывается один раз на воркер)
self.counter = 0
def eval(self, arg1: тип, arg2: тип):
# метод будет вызываться на каждый входной ряд
# нужно делать yield кортежей (каждый yield = новая строка результата).
# аргументы arg1, arg2 — это значения из колонки или скалярных выражений.
...
def terminate(self):
# опционально: вызывается после обработки всех входных строк в текущем партиции.
# можно вернуть доп. строчки или очистить ресурсы.
...
Нюансы (а как без них:
-
Метод
eval(self, ...)
— ядро функции. Каждый раз, когда Spark даёт вашей UDTF новую входную строку, вызываетсяeval
, и можноyield
-ить ноль или больше кортежей. Каждыйyield
это одна строка в выходной таблице. Например:def eval(self, x: int): yield (x, x*x) # возвращаем одну строчку (два столбца) на каждое число
Метод
terminate(self)
— вызывается в конце (после всех входных). Например, если нужно добавить итого или доп. вычисления. На практике редко нужен, но Spark его поддерживает. Главное помнить, что вterminate
уже нет входных аргументов, и он можетyield
доп. строк, если нужно.Метод
init
— вызывается один раз при создании экземпляра на воркере. Здесь можно инициализировать состояние. Нельзя обращаться к SparkSession или другим Spark-объектам внутри UDTF!
И ещё необходимо задать схему выходной таблицы, например через параметр returnType
у декоратора @udtf
. Это можно сделать строкой DDL "col1: type, col2: type"
либо StructType
.
Пример: разбиваем строку на слова
Сценарий простой и понятный: есть текстовое поле, нужно получить слова. В SQL это обычно делается функциями explode+split, но тут мы покажем, как UDTF облегчает задачу. Определим UDTF, который будет принимать строку и yield
-ить каждое слово отдельно:
from pyspark.sql.functions import udtf
@udtf(returnType="word: string")
class SplitWords:
def eval(self, text):
# В целях безопасности проверим None
if text is None:
return
for word in text.split():
# yield берёт кортеж, даже из одного слова
yield (word,)
На каждый входной аргумент text
делим строку по пробелам и для каждого слова делаем yield (word,)
. Это означает на каждой итерации получаем одну строку выходной таблицы с одним столбцом word
. Никаких сложностей — обычный генератор в Python.
Теперь можно вызвать этот UDTF в PySpark:
from pyspark.sql.functions import lit
# Предположим, SparkSession уже создан как spark
SplitWords(lit("пyspark is powerful")).show()
Ожидаемый результат:
+---------+
| word|
+---------+
| pyspark|
| is|
| powerful|
+---------+
Каждое слово попало в отдельную строку. Так UDTF развернула одну входную строку в несколько выходных.
Или ещё вариант: можно зарегистрировать UDTF и вызывать через SQL. Например:
# Регистрируем функцию в Spark SQL
spark.udtf.register("split_words", SplitWords)
# Теперь в SQL-запросе
spark.sql("SELECT * FROM split_words('Hello world from PySpark')").show()
Результат аналогичный — появятся отдельные строки с каждым словом.
И можно даже использовать такую функцию вместе с LATERAL JOIN для более сложных запросов:
SELECT t.text, w.word
FROM VALUES ('Hello world'), ('Привет мир') AS t(text),
LATERAL split_words(t.text) AS w
Результат:
+------------+------+
| text| word|
+------------+------+
| Hello world| Hello|
| Hello world| World|
| Привет мир |Привет|
| Привет мир | мир |
+------------+------+
То есть LATERAL
даёт возможность каждой исходной строке развернуться в несколько, как бы встроенно вызвав наш SplitWords для каждого text
.
Пример: вычисляем числа и их квадраты
Ещё один пример — UDTF, который для диапазона чисел возвращает их квадраты. Создадим UDTF SquareNumbers
, берущий два числа (start и end) и выдающий все (n, n*n)
для n
от start до end:
from pyspark.sql.functions import udtf
@udtf(returnType="num: int, squared: int")
class SquareNumbers:
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
Мы объявили returnType="num: int, squared: int"
, то есть две колонки num
и squared
, обе типа int
. Метод eval
делает простой цикл и yield
-ит кортеж (num, num*num)
на каждой итерации. Всё просто и близко к тому, как вы бы это писали в обычном Python-генераторе.
Вызываем UDTF так же, как и раньше, через DataFrame API:
from pyspark.sql.functions import lit
square_udtf = SquareNumbers() # можно использовать класс напрямую, если вы декорировали его
square_udtf(lit(3), lit(5)).show()
Или вариант с udtf()
:
from pyspark.sql.functions import udtf
square_udtf = udtf(SquareNumbers, returnType="num: int, squared: int")
square_udtf(lit(3), lit(5)).show()
В обоих случаях вывод будет:
+---+-------+
|num|squared|
+---+-------+
| 3| 9|
| 4| 16|
| 5| 25|
+---+-------+
То есть для входных (start=3, end=5)
мы получили три строки: для 3
, 4
и 5
. Код оказался на удивление лаконичным благодаря UDTF. Если бы вы пытались это сделать в одном UDF, пришлось бы генерировать список или explode, а так всё понятно.
Регистрация и использование в SQL
Как уже упоминалось, UDTF можно регистрировать в Spark SQL через spark.udtf.register("имя", КлассUDTF)
. После этого можно использовать функцию в SQL-запросах. Например:
# Допустим, у нас уже есть класс WordSplitter (как в примере SplitWords)
spark.udtf.register("split_words", SplitWords)
spark.sql("SELECT * FROM split_words('hello world')").show()
Это выведет:
+-----+
| word|
+-----+
|hello|
|world|
+-----+
Более того, можно комбинировать UDTF с JOIN-ами. В SQL для того, чтобы работать с функцией, возвращающей множество строк на одну входную, используется ключевое слово LATERAL
. Вот пример:
SELECT x.text, w.word
FROM (SELECT 'Hello World' AS text UNION ALL SELECT 'Apache Spark') x
LATERAL split_words(x.text) AS w
Результат:
+------------+------+
| text| word|
+------------+------+
| Hello World| Hello|
| Hello World| World|
| Apache Spark|Apache|
| Apache Spark| Spark|
+------------+------+
После LATERAL функция видит поля из предыдущих частей запроса, и для каждой записи сразу выбрасывает свои строки в общий результат.
Оптимизация: Apache Arrow
UDTF, как и другие Python-функции в Spark, может быть ресурсоёмкой, если они интенсивно взаимодействуют между Python и JVM. Хорошая новость: Spark поддерживает Apache Arrow для ускорения UDTF. По умолчанию для Python UDTF Arrow выключен (так устроено, вероятно, для стабильности). Но если у вас:
небольшой вход и
очень много выходных данных (много строк на каждую входную),
то Arrow может сильно ускорить обмен данными. Чтобы включить Arrow для UDTF, нужно при объявлении функции указать useArrow=True
и/или установить конфиг spark.sql.execution.pythonUDTF.arrow.enabled=true
. Например:
@udtf(returnType="x: int, x1: int", useArrow=True)
class PlusOneArrow:
def eval(self, x: int):
yield (x, x+1)
После этого UDTF будет передавать данные по Arrow-блоку, что обычно быстрее, когда много строк.
Табличный аргумент UDTF
Ещё один штрих: UDTF может принимать табличный аргумент (TABLE(...)
) — целый набор строк. По дефолту допускается только один таблиный аргумент для производительности, но его можно включить для нескольких. Смысл в том, что вы можете написать UDTF, которая внутри себя обходит несколько входных строк, представленных как Row
-объекты.
Пример (из документации Spark):
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
# Если значение столбца "id" больше 5, возвращаем эту строку
if row["id"] > 5:
yield (row["id"],)
Тогда в SQL можно вызвать:
SELECT * FROM filter_udtf(
TABLE(SELECT * FROM range(10) AS t(id))
);
Выход:
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Иными словами, в eval
придёт целая таблица (как один аргумент типа Row
), и вы можете фильтровать или преобразовывать её построчно.
Нюансы
Несколько замечаний для применения UDTF:
Типизация и returnType. Указывайте
returnType
(схему) обязательно, иначе Spark не узнает, как склеивать столбцы из вашихyield
-ов. Лучше явно задавать, а не полагаться на автоматический анализ, чтобы избежать ошибок. Например,returnType="word: string"
или черезStructType
.Безопасность данных. Не делайте ничего, что меняет SparkSession или создаёт побочные эффекты. В
eval
старайтесь писать чистый код: все операции происходит в рамках одного входа.Управление памятью. Если вы из
eval
генерируете очень много строк, помните, что всё хранится пока UDTF работает. Слишком большие списки/генераторы могут съесть кучу памяти. Если нужно аккумулировать данные по всем строкам, лучше делать это постепенно, не собирая всё сразу.Работа с null. Покажем на примере: мы проверили вход на
None
в первом примере. Так нужно делать, потому что если в столбце вдруг будет null,eval
может выбросить ошибку. Безопаснее просто пропуститьreturn
, чем вызывать метод уNone
.Массовый ввод (Bulk). Если в UDTF хочется обрабатывать пакет сразу (например, считать статистику по всему столбцу), можно использовать
terminate
. Например, считать общее количество через инкремент счётчика вeval
, а вterminate
выдать итоговую строку. Но чаще UDTF нацелен на трансформацию строка в строки, а не агрегирование.Версии PySpark. UDTF доступен, начиная с Spark 3.5.0 и далее. Если вы работаете с более ранней версией, таких функций у вас не будет.
Убедитесь, что кластер Spark хотя бы 3.5+, иначе код с udtf
даже не поднимется.
Вывод
В итоге UDTF — это полезный инструмент для тех случаев, когда на одну входную запись нужно развернуть несколько выходных строк. Так что берите PySpark, пробуйте UDTF на практике, и пусть ваши запросы становятся гибче. Уверен, многим эта возможность понравится.
Если вы работаете с высоконагруженной инфраструктурой, то наверняка сталкивались с ситуациями, когда привычные подходы перестают выдерживать рост нагрузки. Кластеры и репликация MySQL, отказоустойчивые балансировщики, стабильность при сбоях — всё это вопросы, которые невозможно закрыть «на коленке». Ошибки здесь обходятся слишком дорого.
Чтобы разобраться в рабочих решениях и избежать типовых ловушек, приходите на эти бесплатные уроки:
2 сентября в 20:00 — Архитектура MySQL-кластера в высоконагруженных системах
15 сентября в 19:00 — Создание отказоустойчивого кластера с VRRP и HAProxy
23 сентября в 20:00 — Как построить устойчивую репликацию в MySQL
Эти занятия — лишь небольшая часть программы курса «Инфраструктура высоконагруженных систем». За 5 месяцев вы на практике разберёте кластеризацию, оркестрацию, виртуализацию и научитесь строить отказоустойчивую инфраструктуру, которая выдерживает реальные нагрузки. Пройдите вступительный тест — он покажет, подойдет ли программа курса именно вам.
Ninil
Не соваем удачные примеры. Мне кажется обрабатовать что-то в ЦИКЛЕ если мы работаем с большими данными (а иначе зачем вам Спарк) - так себе идея. Куда лучше аналогия с табличными функциями реляционной БД