Обработка массивов в Spark SQL

Сложные типы данных, такие как массивы (arrays), структуры (structs) и карты (maps), очень часто встречаются при обработке больших данных, особенно в Spark. Ситуация возникает каждый раз, когда мы хотим представить в одном столбце более одного значения в каждой строке, это может быть список значений в случае с типом данных массива или список пар ключ-значение в случае с картой.

Поддержка обработки этих сложных типов данных была расширена, начиная с версии Spark 2.4, за счет выпуска функций высшего порядка (HOFs). В этой статье мы рассмотрим, что такое функции высшего порядка, как их можно эффективно использовать и какие связанные с ними функции были выпущены в последних выпусках Spark 3.0 и 3.1.1. Для кода будем использовать Python API.

После агрегаций и оконных функций, которые мы рассмотрели в прошлой статье, HOF представляют собой еще одну группу более продвинутых преобразований в Spark SQL.

Давайте сначала посмотрим на разницу между тремя сложными типами данных, которые предлагает Spark.

ArrayType

l = [(1, ['the', 'quick', 'braun', 'fox'])]
df = spark.createDataFrame(l, schema=['id', 'words'])
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)

В примере выше у нас есть DataFrame с двумя столбцами, где столбец words имеет тип массива. Это означает, что в каждой из строк DataFrame мы можем представить список значений, который может быть разного размера. Кроме того, элементы массива имеют определенный порядок. Важным свойством является однородность массивов по типу элементов, все элементы должны иметь одинаковый тип. Для доступа к элементам массивов мы можем использовать индексы следующим образом:

df.withColumn('first_element', col('words')[0])

StructType

StructType используется для объединения некоторых субполей, которые могут иметь разный тип (в отличие от массивов). Каждое субполе имеет тип, а также имя, которое должно быть одинаковым для всех строк в DataFrame. Неожиданным может оказаться то, что субполя внутри структуры имеют определенный порядок, поэтому сравнение двух структур s1==s2, имеющих одинаковые поля, но расположенные в разном порядке, приводит к False.

Обратите внимание на фундаментальные различия между массивом и структурой:

  • массив: однороден по типам, допускается разный размер в каждой строке

  • структура: неоднородна по типам, требуется одинаковая схема для каждой строки

MapType

MapType можно рассматривать как смесь двух предыдущих типов: массив и структура. Представьте себе ситуацию, когда схема для каждой строки не задана, и вам необходимо поддерживать разное количество субполей в них. В таком случае вы не можете использовать структуру.  Но и массив не подходит из-за того, что каждый элемент имеет имя и значение (фактически это пара "ключ-значение") или по причине того, что элементы имеют разный тип - тогда в этом случае вам подойдет тип карта. С помощью типа карта вы можете хранить в каждой строке разное количество пар ключ-значение, но каждый ключ должен иметь один и тот же тип, а также все значения должны иметь один и тот же тип (который может отличаться от типа ключей). Порядок пар имеет значение.

Преобразование массивов

Прежде чем мы начнем говорить о преобразовании массивов, давайте сначала посмотрим, как можно его создать. Первый способ мы рассмотрели выше, где мы создали DataFrame из локального списка значений. С другой стороны, если у нас уже есть DataFrame и мы хотим сгруппировать некоторые столбцы в массив, можно использовать для этого функцию array(). Она позволяет создать массив из других существующих столбцов, поэтому, если у вас есть столбцы a, b, c и вы хотите, чтобы все значения были в массиве, а не в отдельных столбцах, вы можете сделать это следующим образом:

df.withColumn('my_arr', array('a', 'b', 'c'))

Кроме того, существуют функции, которые создают массив в результате преобразования. Это, например, функция split(), которая разбивает строку на массив слов. Другой пример - collect_list() или collect_set(), которые являются агрегатными функциями, также создающими массив.

А на практике наиболее распространенным способом получения массива в DataFrame является чтение данных из источника, поддерживающего сложные структуры данных, например, Parquet. В этом формате файлов некоторые столбцы могут храниться в виде массивов, поэтому Spark, естественно, будет считывать их в том же виде.

Теперь, когда мы знаем, как создать массив, давайте посмотрим, как его можно преобразовать.

Начиная с версии Spark 2.4 существует множество функций для преобразования массивов. Полный их список можно найти в документации PySpark. Например, все функции, начинающиеся с array_, могут использоваться для обработки массивов, вы можете находить min-max значения, дедуплицировать массивы, сортировать их, объединять и так далее. Далее, есть также concat(), flatten(), shuffle(), size(), slice(), sort_array(). Как видите, API в этом отношении достаточно развит, и существует множество операций, которые вы можете выполнять с массивами в Spark.

Помимо вышеупомянутых функций, существует и такие, которые принимают в качестве аргумента другую функцию, впоследствии применяемую к каждому элементу массива - они называются функциями высшего порядка ( HOFs). Важно знать о них то, что в Python API они поддерживаются с версии 3.1.1, а в Scala API они появились в версии 3.0. С другой стороны, в выражениях SQL их можно использовать начиная с версии 2.4.

Чтобы увидеть некоторые конкретные примеры, давайте рассмотрим следующий простой DataFrame:

l = [(1, ['prague', 'london', 'tokyo', None, 'sydney'])]
df = spark.createDataFrame(l, ['id', 'cities'])
df.show(truncate=False)
+---+-------------------------------------+
|id |cities                               |
+---+-------------------------------------+
|1  |[prague, london, tokyo, null, sydney]|
+---+-------------------------------------+

Допустим, мы хотим выполнить эти пять независимых задач:

  1. Преобразовать начальную букву каждого города в верхний регистр.

  2. Избавиться от нулевых значений в массиве.

  3. Проверить, есть ли элемент, начинающийся с буквы t.

  4. Проверить, есть ли в массиве нулевое значение.

  5. Суммировать количество символов (длину) каждого города в массиве.

Это несколько типичных примеров задач, которые можно решить с помощью HOFs. Поэтому давайте рассмотрим их по очереди:

TRANSFORM

Для решения первой задачи мы можем использовать HOF transform, которая просто берет анонимную функцию, применяет ее к каждому элементу исходного массива и возвращает другой преобразованный массив. Синтаксис следующий:

df \
.withColumn('cities', transform('cities', lambda x: initcap(x))) \
.show(truncate=False)
+---+-------------------------------------+
|id |cities                               |
+---+-------------------------------------+
|1  |[Prague, London, Tokyo, null, Sydney]|
+---+-------------------------------------+

Как видите, transform() принимает два аргумента, первый - массив, который должен быть преобразован, а второй - анонимная функция. Чтобы выполнить преобразование, используем initcap() внутри анонимной функции и применяем ее к каждому элементу массива - это именно то, что позволяет сделать transform HOF. С выражениями SQL его можно использовать следующим образом:

df.selectExpr("id", "TRANSFORM(cities, x -> INITCAP(x)) AS cities")

Обратите внимание, что анонимная функция в SQL выражается с помощью стрелочной (->) нотации.

FILTER

Во второй задаче мы хотим отфильтровать нулевые значения из массива. Для этого (как и для любой другой фильтрации) можно использовать filter HOF. Он позволяет нам применить анонимную функцию, которая возвращает булево значение (True/False) для каждого элемента, и она вернет новый массив, содержащий только элементы, для которых функция вернула True:

df \
.withColumn('cities', filter('cities', lambda x: x.isNotNull())) \
.show(truncate=False)
+---+-------------------------------+
|id |cities                         |
+---+-------------------------------+
|1  |[prague, london, tokyo, sydney]|
+---+-------------------------------+

Здесь в анонимной функции мы вызываем функцию PySpark isNotNull(). Синтаксис SQL выглядит следующим образом:

df.selectExpr("id", "FILTER(cities, x -> x IS NOT NULL) AS cities")

EXISTS

В следующей задаче мы хотим проверить, содержит ли массив элементы, удовлетворяющие некоторому определенному условию. Обратите внимание, что это более общий пример ситуации, в которой мы хотим проверить наличие определенного элемента. Например, если же мы хотим проверить, содержит ли массив элемент city prague (город Прага), можно просто вызвать функцию array_contains:

df.withColumn('has_prague', array_contains('cities', 'prague'))

С другой стороны, exists HOF позволяет нам применять более общее условие к каждому элементу. Результат теперь не массив, как это было в двух предыдущих HOF, а просто True/False:

df \
.withColumn('has_t_city', 
  exists('cities', lambda x: x.startswith('t'))) \
.show(truncate=False)
+---+-------------------------------------+----------+
|id |cities                               |has_t_city|
+---+-------------------------------------+----------+
|1  |[prague, london, tokyo, null, sydney]|true      |
+---+-------------------------------------+----------+

Здесь, в анонимной функции, мы использовали функцию PySpark startswith().

FORALL

Четвертый вопрос: мы хотим проверить, удовлетворяют ли все элементы массива какому-то условию. Для нашего примера нужно определить, что все они не равны нулю (null):

df \
.withColumn('nulls_free',forall('cities', lambda x: x.isNotNull()))\
.show(truncate=False)
+---+-------------------------------------+----------+
|id |cities                               |nulls_free|
+---+-------------------------------------+----------+
|1  |[prague, london, tokyo, null, sydney]|false     |
+---+-------------------------------------+----------+

Как видите, forall очень похож на exists, только теперь мы проверяем, выполняется ли условие в отношении всех элементов, а раньше искали хотя бы один.

AGGREGATE

В последней задаче мы хотим просуммировать длины каждого слова в массиве. Это является примером ситуации, в которой необходимо свести весь массив к одному значению, и для такого рода задач можно использовать aggregate HOF.


df \
.withColumn('cities', filter('cities', lambda x: x.isNotNull())) \
.withColumn('cities_len', 
  aggregate('cities', lit(0), lambda x, y: x + length(y))) \
.show(truncate=False)
+---+-------------------------------+----------+
|id |cities                         |cities_len|
+---+-------------------------------+----------+
|1  |[prague, london, tokyo, sydney]|23        |
+---+-------------------------------+----------+

И с помощью SQL:

df \
.withColumn("cities", filter("cities", lambda x: x.isNotNull())) \
.selectExpr(
    "cities", 
    "AGGREGATE(cities, 0,(x, y) -> x + length(y)) AS cities_len"
)

Как вы можете видеть, синтаксис немного сложнее по сравнению с предыдущими HOF. aggregate принимает больше аргументов, первый - это массив, который мы хотим преобразовать, второй - это исходное значение, с которого собираемся начать. В нашем случае начальное значение равно нулю (lit(0)), и мы будем добавлять к нему величину длины каждого города. Третий аргумент - анонимная функция, и теперь она сама принимает два аргумента - первый (в нашем примере x) - это подвижный буфер, к которому мы добавляем длину следующего элемента, представленного вторым аргументом (в нашем примере y).

Дополнительно может быть предоставлен четвертый аргумент, который представляет собой еще одну анонимную функцию, преобразующую конечный результат. Это полезно, если мы хотим сделать более сложную агрегацию. Например, если мы хотим вычислить среднюю длину, нам нужно иметь два значения, сумму и количество, чтобы разделить их в последнем преобразовании следующим образом:

(
    df
    .withColumn('cities', filter('cities', lambda x: x.isNotNull()))
    .withColumn('cities_avg_len', 
        aggregate(
            'cities', 
            struct(lit(0).alias('sum'), lit(0).alias('count')), 
            lambda x, y: struct(
                (x.sum + length(y)).alias('sum'), 
                (x.count + 1).alias('count')
            ),
            lambda x: x.sum / x.count
        )
    )
).show(truncate=False)
+---+-------------------------------+--------------+
|id |cities                         |cities_avg_len|
+---+-------------------------------+--------------+
|1  |[prague, london, tokyo, sydney]|5.75          |
+---+-------------------------------+--------------+

Как вы видите, это более продвинутый пример, в котором нам нужно будет иметь около двух значений во время агрегации, и мы представляем их с помощью struct(), которая имеет два субполя sum и count. С помощью первой анонимной функции мы вычисляем конечную сумму всех длин, а также count — количество просуммированных элементов. Во второй анонимной функции делим эти два значения, чтобы получить окончательное среднее значение. Также обратите внимание, что перед использованием aggregate мы сначала отфильтровали нулевые значения, потому что если мы сохраним нулевое значение в массиве, то сумма (а также среднее значение) станет нулевой.

Чтобы ознакомиться с другим примером применения aggregate HOF в выражениях SQL, посмотрите этот вопрос на Stack Overflow.

Помимо этих пяти вышеупомянутых HOF, существует также zip_with, который можно использовать для объединения двух массивов в один. Кроме этого, существуют и другие HOF, такие как map_filter, map_zip_with, transform_keys и transform_values, которые используются с картами, и мы рассмотрим их в одной из следующих статей.

Заключение

В этой статье были рассмотрены функции высшего порядка (HOFs), которые появились в версии Spark 2.4. Сначала она поддерживалась только в выражениях SQL, но начиная с версии 3.1.1 доступна и в Python API. Также привели примеры пяти HOFs, которые позволяют преобразовывать, фильтровать, проверять существование и объединять элементы в массивах Spark. До появления HOF большинство этих задач приходилось решать с помощью функций, определяемых пользователем. Однако подход HOF более эффективен с точки зрения производительности, и чтобы увидеть некоторые контрольные показатели эффективности, обратитесь к другой моей недавней статье, в которой приведены конкретные цифры.


Материал подготовлен в рамках курса «Spark Developer». Если вам интересно узнать подробнее о формате обучения и программе, познакомиться с преподавателем курса — приглашаем на день открытых дверей онлайн. Регистрация здесь.

Комментарии (0)