Привет, Хабр! Я Алексей Скахин, инженер данных в «ДАР» (ГК «КОРУС Консалтинг»). Apache Spark — это мощный фреймворк для распределённой обработки больших объёмов данных, позволяющий выполнять сложные вычисления на кластерах компьютеров с высокой производительностью и гибкостью.

Apache Spark 4 released
Apache Spark 4 released

И вот 23 мая 2025 года компания Apache выпустила новую версию Spark 4.

Стоит отметить, что Apache Spark — масштабный фреймворк с широким функционалом. В данной статье я сосредоточусь на нововведениях, которые в первую очередь затронут пользователей Spark SQL и PySpark.

Полный список 5100 изменений от 390 контрибьюторов представлен на официальном сайте.

Изменения в Spark SQL

С моей точки зрения, основные изменения произошли в Spark SQL. Появляются зачатки процедурного расширения SQL.

[SPARK-42849] Session Variables

SQL переменные, который можно использовать в течение всей spark сессии.

Данный функционал будет полезен для проектов, где в первую очередь используется Spark SQL для расчета витрин.

К примеру отчет, который хранится в виде Spark SQL файла. Теперь параметр расчета можно получить 1 раз и переиспользовать его в нескольких местах:

--переменная может быть как константой, так и результатом запроса:
SET start_date = (select value from settings where name = 'last_copy')
;
--далее фильтруем несколько запросов используя значение переменной
select ${start_date}
;
--обработка заказов
SELECT * 
FROM orders 
WHERE order_date > ${start_date}
;
--обработка продаж
SELECT * 
FROM sales 
WHERE sale_date > ${start_date}

Вызов данного SQL:

[spark.sql(s).show() for s in sql.split(";")]

+----------+--------------------+
|       key|               value|
+----------+--------------------+
|start_date|(select value fro...|
+----------+--------------------+

+----------------+
|scalarsubquery()|
+----------------+
|      2023-06-02|
+----------------+

+---+----------+
| id|order_date|
+---+----------+
|  3|2023-06-03|
|  4|2023-06-04|
|  5|2023-06-05|
+---+----------+

+---+----------+
| id| sale_date|
+---+----------+
| 30|2023-06-03|
| 40|2023-06-04|
| 50|2023-06-05|
|  3|2025-06-03|
+---+----------+

Стоит заметить, что параметр не рассчитывается, а передается каждый раз выражением в запрос, что снижает возможности использования, если в расчете переменной сложное выражение.
Это видно по плану запросе, где вместо конкретного значения появляется subquery

explain SELECT * 
FROM sales 
WHERE sale_date > ${start_date}
;

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(sale_date#3) AND (sale_date#3 > Subquery subquery#201, [id=#465]))
   :  +- Subquery subquery#201, [id=#465]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [value#5]
   :           +- Filter (isnotnull(name#4) AND (name#4 = last_copy))
   :              +- Scan ExistingRDD[name#4,value#5]
   +- Scan ExistingRDD[id#2L,sale_date#3]

[SPARK-46246] EXECUTE IMMEDIATESQL support

Вызов SQL с передачей подготовленных параметров:

DECLARE sqlStr1 = 'SELECT SUM(col1) FROM VALUES(?), (?)';
DECLARE arg10 = 5;
DECLARE arg20 = 6;
EXECUTE IMMEDIATE sqlStr1 USING arg10, arg20;

+---------+
|sum(col1)|
+---------+
|       11|
+---------+

Declare и set можно комбинировать

DECLARE sqlStr1 = 'SELECT SUM(col1) FROM VALUES(?), (?)';
SET test_arg = 123;
DECLARE arg1 = ${test_arg};
EXECUTE IMMEDIATE sqlStr1 USING ${test_arg}, arg1;

+---------+
|sum(col1)|
+---------+
|      246|
+---------+

В бинд переменную нельзя передать сессионую переменную с выражением:

SET test_arg = (select 1);

EXECUTE IMMEDIATE sqlStr1 USING ${test_arg}, arg1;

# A query parameter contains unsupported expression.
# Parameters can either be variables or literals

Из-за того что SQL и переменные в EXECUTE IMMEDIATE должны быть статическими значениями, пока что это выражение затруднительно использовать для динамического формирования запросов.

Дополнительно напомню, что именованные бинды появились в PySpark 3.4 (SPARK-41271, SPARK-42702, SPARK-40281)

spark.sql(
    sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args = Map(
        "startDate" -> "DATE'2022-12-01'",
        "maxRows" -> "100"))

а позиционные в 3.5

spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}"
          , args=[5, 2], df=mydf).show()

[SPARK-46057] Support SQL user-defined functions

Добавлена возможность создания и использования фунций в Spark SQL.

Это могут быть как простые математические выражения

CREATE FUNCTION to_hex(x INT COMMENT 'Any number between 0 - 255')
  RETURNS STRING
  COMMENT 'Converts a decimal to a hexadecimal'
  CONTAINS SQL DETERMINISTIC
  RETURN lpad(hex(least(greatest(0, x), 255)), 2, 0)
;
SELECT to_hex(id) FROM range(2)
;
+--------------------------------+
|spark_catalog.default.to_hex(id)|
+--------------------------------+
|                              00|
|                              01|
+--------------------------------+

EXPLAIN SELECT to_hex(id) FROM range(2);
== Physical Plan ==
*(1) Project [lpad(hex(cast(least(greatest(0, cast(id#225L as int)), 255) as bigint)), 2, 0) AS spark_catalog.default.to_hex(id)#226]
+- *(1) Range (0, 2, step=1, splits=20)

Так и обращения к другим таблицам за результатом.

Результатом может быть таблица:

CREATE OR REPLACE TEMPORARY FUNCTION 
     get_order_name(order_id INT COMMENT 'Order ID') 
   RETURNS TABLE(name STRING COMMENT 'order name')
   READS SQL DATA SQL SECURITY DEFINER
   COMMENT 'Get order name by ID'
   RETURN SELECT order_name FROM orders WHERE id = get_order_name.order_id
;

SELECT id, get_order_name.name 
    FROM VALUES(1),
               (3) AS orders(id),
         LATERAL get_order_name(orders.id)
;

+---+------+
| id|  name|
+---+------+
|  1|name 1|
|  3|name 3|
+---+------+

Но стоит заметить, что как и сессионные переменные по сути это подставляемые шаблоны подготовленных SQL:

explain SELECT id, get_order_name.name 
    FROM VALUES(1),
               (3) AS orders(id),
         LATERAL get_order_name(orders.id)
;

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#212, name#218]
   +- BroadcastHashJoin [cast(id#212 as bigint)], [id#86L], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=532]
      :  +- LocalTableScan [id#212]
      +- Project [order_name#88 AS name#218, id#86L]
         +- Filter isnotnull(id#86L)
            +- Scan ExistingRDD[id#86L,order_date#87,order_name#88]

На текущий момент нет циклов, условий или динамического SQL.

Другие вариации функций можно видеть в тестах: https://github.com/apache/spark/blob/master/sql/core/src/test/resources/sql-tests/inputs/sql-udf.sql

[SPARK-44444] Use ANSI SQL mode by default

Теперь Spark SQL по умолчанию использует ANSI синтаксис SQL

Множество запросов, которые раньше давали NULL на части запросов, теперь возвращают exception:

spark.conf.set("spark.sql.ansi.enabled", "true") #default

spark.sql("select 1/0").show()
# ArithmeticException: [DIVIDE_BY_ZERO] Division by zero. 
# Use try_divide to tolerate divisor being 0 and return NULL instead. 
# If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. 
# SQLSTATE: 22012

spark.sql("select cast('test' as int)").show()
# NumberFormatException: [CAST_INVALID_INPUT] The value 'test' of the type "STRING" cannot be cast to "INT" because it is malformed. 
# Correct the value as per the syntax, or change its target type. 
# Use try_cast to tolerate malformed input and return NULL instead. 
# SQLSTATE: 22018

Как было раньше:

spark.conf.set("spark.sql.ansi.enabled", "false")

spark.sql("select 1/0").show()
+-------+
|(1 / 0)|
+-------+
|   NULL|
+-------+

spark.sql("select cast('test' as int)").show()
+-----------------+
|CAST(test AS INT)|
+-----------------+
|             NULL|
+-----------------+

[SPARK-49555] SQL Pipe syntax

Расширение возможностей синтаксиса SQL. Это не новый язык, а переосмысление существующего GoogleSQL

Основные преимущества:

  • multi-level aggregations without subqueries

  • Filtering anywhere

  • starting from - “inside-out” structure

Пример обычного SQL содержит вложенный подзапросы и начинается с секции select

SELECT c_count, COUNT(*) AS custdist
FROM
( SELECT c_custkey, COUNT(o_orderkey) c_count
FROM customer
LEFT OUTER JOIN orders ON c_custkey = o_custkey
AND o_comment NOT LIKE '%unusual%packages%'
GROUP BY c_custkey
) AS c_orders
GROUP BY c_count
ORDER BY custdist DESC, c_count DESC;

SQL Pipe syntax начинается с выборки данных и применяет инструкции последовательно без подзапросов:

FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;

Подробней о мотивации, преимуществах и возможностях в paper от Google: https://research.google/pubs/sql-has-problems-we-can-fix-them-pipe-syntax-in-sql/

[SPARK-46908] Support star clause in WHERE clause

расширение функционала выборки всех колонок *

-- возврат всех колонок, кроме TA.c1 and TB.cb
SELECT * EXCEPT (c1, cb)  FROM VALUES(1, 2) AS TA(c1, c2), VALUES('a', 'b') AS TB(ca, cb);
+---+---+
| c2| ca|
+---+---+
|  2|  a|
+---+---+

-- получить первую не пустую колонку в  TA
SELECT coalesce(TA.*)  FROM VALUES(1, 2) AS TA(c1, c2), VALUES('a', 'b') AS TB(ca, cb);
+----------------+
|coalesce(c1, c2)|
+----------------+
|               1|
+----------------+


-- развернуть структуру в колонки
SELECT c1.* FROM VALUES(named_struct('x', 1, 'y', 2)) AS TA(c1);
+---+---+
|  x|  y|
+---+---+
|  1|  2|
+---+---+

-- проверка всех колонок в строке на соответствие
SELECT * FROM VALUES(1, 2, 3, 4, 5),(1, 2, NULL, 4, 5) AS TA(c1, c2, c3, c4, c) 
WHERE array(*) = array(1, 2, NULL, 4, 5);
+---+---+----+---+---+
| c1| c2|  c3| c4|  c|
+---+---+----+---+---+
|  1|  2|NULL|  4|  5|
+---+---+----+---+---+

-- наличие конкретного значения в любой колонке строки
SELECT * FROM VALUES(1, 2, 3, 4, 5),(1, 2, NULL, -4, 5) AS TA(c1, c2, c3, c4, c) 
WHERE 4 IN (*);
+---+---+---+---+---+
| c1| c2| c3| c4|  c|
+---+---+---+---+---+
|  1|  2|  3|  4|  5|
+---+---+---+---+---+

[SPARK-36680] Support dynamic table options via WITH OPTIONS syntax

как это выглядит в PySpark

spark.read.format("jdbc").option("fetchSize", 0).load()

Как это можно теперь использовать в SQL

SELECT * FROM jdbcTable WITH OPTIONS(fetchSize = 0)

Изменения в PySpark

[SPARK-46858] Upgrade Pandas to 2

Увеличена версия Pandas до 2

pdf = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
sdf = spark.createDataFrame(pdf)
result_pdf = sdf.toPandas()
print(result_pdf)

[SPARK-49530] Introducing PySpark Plotting API

Отрисовка графиков используя pyspark.pandas

import pandas as pd
import numpy as np
import pyspark.pandas as ps

spark.conf.set("spark.sql.ansi.enabled", "false")
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))

psser = ps.Series(pser)
psser = psser.cummax()
psser.plot()

[SPARK-43797] Python User-defined Table Functions

Развитие начатого в 3.5 SPARK-43798 - возможность создания UDTF (пользовательская функция, возвращающая таблицу)

from pyspark.sql.functions import udtf

@udtf(returnType="word: string")
class WordSplitter:
    def eval(self, text: str):
        for word in text.split(" "):
            yield (word.strip(),)

spark.udtf.register("split_words", WordSplitter)

# Example: Using the UDTF in SQL.
spark.sql("SELECT * FROM split_words('hello world')").show()

+-----+
| word|
+-----+
|hello|
|world|
+-----+

from pyspark.sql.functions import lit
WordSplitter(lit('hello world')).show()

+-----+
| word|
+-----+
|hello|
|world|
+-----+

[SPARK-50075] DataFrame APIs for table-valued functions

Возможность генерации данных есть в SQL

SELECT * FROM range(5, 10);

Теперь есть аналогичный функционал tvf в Spark Dataframe

spark.tvf.range(10).show()

[SPARK-44076] SPIP: Python Data Source API

Появилась возможность создания источников данных в PySpark без Scala

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

# описание источника: его схемы и класса читателя
class FakeDataSource(DataSource):  
    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "id int, name string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

# читатель данных с генерацией фейковых 3 строк
class FakeDataSourceReader(DataSourceReader):
    def init(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        for r in range(3):
            yield tuple((r, f"row_{r}"))

# регистрируем дата сорс
spark.dataSource.register(FakeDataSource)

# читаем данные из датасорса
spark.read.format("fake").load().show()

+---+-----+
| id| name|
+---+-----+
|  0|row_0|
|  1|row_1|
|  2|row_2|
+---+-----+

[SPARK-51079] Support large variable types in pandas UDF, createDataFrame and toPandas with Arrow

Развитие SPARK-40307 в Spark 3.5 поддержка большего числа Arrow типов при работе PySpark и Pandas

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

SPARK-50130 Add scalar and exists DataFrame APIs

Развитие PySpark диалекта DF поддержкой корреляционных подзапросов.

Как это выглядит в SQL

select  from orders where exists (select  from sales where orders.id = sales.id)

и как это теперь можно написать в PySpark

from pyspark.sql import functions as sf

df_result = df_orders.where(
    df_sales.where(sf.col("o.id").outer() == sf.col("s.id")).exists()
)

df_result.show()

+---+----------+
| id|order_date|
+---+----------+
|  3|2023-06-03|
+---+----------+

Добавление новых типов данных

[SPARK-45827] Add VARIANT data type

Специализированный тип данных для хранения сложных структур.

В первую очередь используется для хранения JSON в распознанном виде и быстрой обратной конвертации в объект python или JSON:

spark.sql("""SELECT PARSE_JSON('{"a": 1}') variant_col""").first().variant_col.toPython()
{'a': 1}

spark.sql("""SELECT PARSE_JSON('{"a": 1}') variant_col""").first().variant_col.toJson()
'{"a":1}'

Работает в 8 раз быстрей, чем текстовое представление JSON.

Не все операции поддерживаются, выборка ключей по пути пока в разработке.

На текущий момент будет ошибка:

spark.sql("""select variant_col:a from (SELECT PARSE_JSON('{"a": 1}') variant_col)""").show()
# ParseException: 
# [PARSE_SYNTAX_ERROR] Syntax error at or near ':'. SQLSTATE: 42601 (line 1, pos 18)

[SPARK-44265] Built-in XML data source support

XML теперь поддерживается на том же уровне как JSON:

парсинг XML:

spark.sql("SELECT from_xml('<p><a>1</a><b>0.8</b></p>', 'a INT, b DOUBLE');").show()

+-----------------------------------+
|from_xml(<p><a>1</a><b>0.8</b></p>)|
+-----------------------------------+
|                           {1, 0.8}|
+-----------------------------------+

Обратная операция получения XML из объекта:

print(spark.sql("""SELECT to_xml(named_struct('a', 1, 'b', 2)) as xml""").first().xml)

<ROW>
    <a>1</a>
    <b>2</b>
</ROW>

Поиск значений в дереве XML:

spark.sql("""SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()')""").show()

+-----------------------------------------------------------------------+
|xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text())|
+-----------------------------------------------------------------------+
|                                                           [b1, b2, b3]|
+-----------------------------------------------------------------------+

XML можно использовать как формат для записи, так и для чтения:

df = spark.read.format("xml").option("rowTag", "row").load("/path/to/data.xml")
df.write.format("xml").option("rootTag", "root").option("rowTag", "row").save("/path/to/output.xml")

[SPARK-46830] String Collation support

в Spark строках всегда теперь присутствует неявно кодировка и используется в сортировках и хэшировании.

Пример сортировки строк с учетом положения букв в алфавите

# как было - бинарное представление строк
spark.sql("""select * FROM VALUES('а'),
               ('Я') AS strcol(binary_str) order by binary_str""").show()

+----------+
|binary_str|
+----------+
|         Я|
|         а|
+----------+

# строка с заданной русской кодировкой
spark.sql("""select * FROM VALUES('а' COLLATE RU),
               ('Я' COLLATE RU) AS strcol(ru_str) order by ru_str""").show()

+------+
|ru_str|
+------+
|     а|
|     Я|
+------+

Часть операций сравнения теперь работают на порядки быстрей, если указать кодировку.

Пример кодировки UTF8_LCASE - для хранения данных в нижнем регистре, дает возможность фильтровать данные без дополнительной функции приведения регистра:

spark.sql("""select * FROM VALUES('а' COLLATE UTF8_LCASE),
               ('Я' COLLATE UTF8_LCASE) AS strcol(ru_str) where ru_str='я'""").show()

+------+
|ru_str|
+------+
|     Я|
+------+

Что ускоряет такие выборки в 22 раза.

Заключение

Релиз Apache Spark 4 приносит значительные улучшения в части Spark SQL, что делает обработку данных более эффективной и удобной. Добавлены новые возможности процедурного языка, улучшенная поддержка ANSI SQL, введен новый синтакис Pipe SQL. Несмотря на это, на мой взгляд, встроенный процедурный диалект SQL пока имеет слишком мало функций, чтобы полностью заменить PySpark.

PySpark в свою очередь продолжил укреплять взаимосвязи с Pandas и Arrow. Появилась поддержка Variant и XML типов данных.

Реальный потенциал этих и других нововведений Spark 4 в скором времени увидим на новых проектах.

Комментарии (0)