Databricks — это аналитическая платформа для облачных вычислений, работы с большими данными и машинного обучения. Компания разрабатывает data lake и работает с фреймворком Apache Spark. Приводим перевод статьи Databricks о нововведениях Apache Spark 3.4, который вошел в релиз Databricks Runtime 13.0.

В Apache Spark 3.4 представили ряд нововведений:

  • Подключение к Spark из любого приложения и в любом месте с помощью Spark Connect.

  • Повышение производительности за счет новых функций SQL: значения DEFAULT столбцов для нескольких форматов таблиц, метка времени без временной зоны, UNPIVOT, а также упрощение запросов с помощью ссылок на alias столбцов.

  • Улучшения для Python-разработчиков. Новый фреймворк сообщений об ошибках PySpark и Spark executor memory profiling.

  • Улучшена потоковая обработка данных для повышения производительности, снижения стоимости за счет уменьшения количества запросов и отсутствия необходимости в промежуточном хранилище, поддержки произвольных операций с состоянием для пользовательской логики, а также встроенной поддержки чтения и записи записей в формате Protobuf.

  • Пользователи PySpark могут проводить распределенное обучение с помощью PyTorch на кластерах Spark.

Spark Connect

В Apache Spark 3.4 Spark Connect представляет собой разделенную клиент-серверную архитектуру, которая позволяет удаленно подключаться к кластерам Spark из любого приложения, работающего в любом месте. Такое разделение клиента и сервера позволяет современным приложениям для работы с данными, IDE, блокнотам и языкам программирования получать интерактивный доступ к Spark. Spark Connect использует возможности Spark DataFrame API (SPARK-39375).

Благодаря Spark Connect клиентские приложения влияют только на свое окружение, поскольку могут работать вне кластера Spark. Устраняются конфликты зависимостей от драйвера Spark. Разработчикам не нужно вносить изменения в свои клиентские приложения при обновлении Spark, можно выполнять пошаговую отладку на стороне клиента непосредственно в своей среде разработки.

Spark Connect лежит в основе готовящегося к выпуску Databricks Connect.

Spark Connect обеспечивает удаленное подключение к Spark из любого клиентского приложения.Распределенное обучение ML-моделей PyTorch. Источник
Spark Connect обеспечивает удаленное подключение к Spark из любого клиентского приложения.Распределенное обучение ML-моделей PyTorch. Источник

В Apache Spark 3.4 в PySpark добавлен модуль TorchDistributor, позволяющий проводить распределенное обучение с помощью PyTorch на кластерах Spark. Модуль инициализирует среду и каналы связи между рабочими узлами и использует команду CLI torch.distributed.run для выполнения распределенного обучения на рабочих узлах. Модуль поддерживает распределение заданий обучения как на одноузловых многопроцессорных, так и на многоузловых GPU-кластерах. Ниже приведен примерный фрагмент кода для его использования:


from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, use_gpu):
  import torch
  import torch.distributed as dist
  import torch.nn.parallel.DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader

  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)
  device = int(os.environ["LOCAL_RANK"]) if use_gpu  else "cpu"
  model = DDP(createModel(), **kwargs)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)

  output = train(model, loader, learning_rate)
  dist.cleanup()
  return output

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)

Дополнительные сведения и примеры см. на странице

Повышение производительности

Поддержка значений DEFAULT для столбцов таблиц (SPARK-38334). SQL-запросы теперь поддерживают указание значений по умолчанию для столбцов таблиц в форматах CSV, JSON, ORC, Parquet. Данная функциональность работает как при создании таблицы, так и после. Последующие команды INSERT, UPDATE, DELETE и MERGE могут в дальнейшем ссылаться на значение по умолчанию любого столбца, используя явное ключевое слово DEFAULT. Или, если любое задание INSERT имеет явный список из меньшего числа столбцов, чем целевая таблица, то для оставшихся столбцов будут подставлены соответствующие значения по умолчанию (или NULL, если значение по умолчанию не указано).

Например, установка значения DEFAULT для столбца при создании новой таблицы:


CREATE TABLE t (first INT, second DATE DEFAULT CURRENT_DATE())
  USING PARQUET;
INSERT INTO t VALUES
  (0, DEFAULT), (1, DEFAULT), (2, DATE'2020-12-31');
SELECT first, second FROM t;

(0, 2023-03-28)
(1, 2023-03-28)
(2, 2020-12-31)

Также можно использовать значения столбцов по умолчанию в операторах UPDATE, DELETE и MERGE, как показано в этих примерах:


UPDATE t SET first = 99 WHERE second = DEFAULT;

DELETE FROM t WHERE second = DEFAULT;

MERGE INTO t FROM VALUES (42, DATE'1999-01-01') AS S(c1, c2)
USING first = c1
WHEN NOT MATCHED THEN INSERT (first, second) = (c1, DEFAULT)
WHEN MATCHED THEN UPDATE SET (second = DEFAULT);

Новый тип данных TIMESTAMP WITHOUT TIMEZONE (SPARK-35662). В Apache Spark 3.4 добавлен новый тип данных для представления значений временных меток без указания часового пояса. До сих пор значения, выраженные с помощью существующего в Spark типа данных TIMESTAMP, встраиваемые в SQL-запросы или передаваемые через JDBC, предполагались в локальном часовом поясе сессии и приводились к UTC перед обработкой. Хотя в некоторых случаях, например при работе с календарями, такая семантика желательна. Во многих других случаях пользователи предпочитают выражать значения временных меток независимо от часовых поясов, например в файлах журналов. С этой целью в Spark появился новый тип данных TIMESTAMP_NTZ.

Например:


CREATE TABLE ts (c1 TIMESTAMP_NTZ) USING PARQUET;
INSERT INTO ts VALUES
  (TIMESTAMP_NTZ'2016-01-01 10:11:12.123456');
INSERT INTO ts VALUES
  (NULL);
SELECT c1 FROM ts;

(2016-01-01 10:11:12.123456)
(NULL)

Ссылки на alias латеральных столбцов (SPARK-27561). В Apache Spark 3.4 появилась возможность использовать боковые ссылки на столбцы в списках SQL SELECT для перехода к предыдущим элементам. Эта возможность значительно упрощает составление запросов, часто заменяя необходимость написания сложных подзапросов и обычных табличных выражений.

Например:


CREATE TABLE t (salary INT, bonus INT, name STRING)
  USING PARQUET;
INSERT INTO t VALUES (10000, 1000, 'amy');
INSERT INTO t VALUES (20000, 500, 'alice');
SELECT salary * 2 AS new_salary, new_salary + bonus
  FROM t WHERE name = 'amy';

(20000, 21000)

Dataset.to(StructType) (SPARK-39625). В Apache Spark 3.4 появился новый API под названием Dataset.to(StructType) для преобразования всего исходного фрейма данных в указанную схему. Его поведение аналогично вставке таблицы, когда входной запрос корректируется в соответствии со схемой таблицы, но расширено для работы и с внутренними полями. Сюда входят:

  • изменение порядка столбцов и внутренних полей в соответствии с заданной схемой;

  • вытеснение столбцов и внутренних полей, не требуемых указанной схемой;

  • Приведение столбцов и внутренних полей в соответствие с ожидаемыми типами данных.

Например:


val innerFields = new StructType()
  .add("J", StringType).add("I", StringType)
val schema = new StructType()
  .add("struct", innerFields, nullable = false)
val df = Seq("a" -> "b").toDF("i", "j")
  .select(struct($"i", $"j").as("struct")).to(schema)
assert(df.schema == schema)
val result = df.collect()

("b", "a")

Параметризованные SQL-запросы (SPARK-41271, SPARK-42702). Apache Spark 3.4 теперь поддерживает возможность построения параметризованных SQL-запросов. Это делает запросы более многоразовыми и повышает безопасность, предотвращая атаки с использованием SQL-инъекций. API SparkSession теперь расширен переопределением метода sql, который принимает карту, где ключами являются имена параметров, а значениями — литералы Scala/Java:


def sql(sqlText: String, args: Map[String, Any]): DataFrame

Благодаря этому расширению текст SQL теперь может включать именованные параметры в любые позиции, где допускается использование констант, таких как литеральные значения.

Приведем пример параметризации SQL-запроса таким образом:


spark.sql(
    sqlText =
      "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args = Map(
      "startDate" -> LocalDate.of(2022, 12, 1),
      "maxRows" -> 100))

Операция UNPIVOT / MELT (SPARK-39876, SPARK-38864). До версии 3.4 в Dataset API Apache Spark был доступен метод PIVOT, но не его обратная операция MELT. Теперь она включена, что дает возможность разворачивать DataFrame из широкого формата, сгенерированного PIVOT, в исходный длинный формат, по желанию оставляя столбцы-идентификаторы. Эта операция является обратной по отношению к groupBy(...).pivot(...).agg(...), за исключением агрегации, которая не может быть отменена. Эта операция полезна для приведения DataFrame к формату, в котором некоторые столбцы являются столбцами-идентификаторами, а все остальные столбцы ("значения") "разворачиваются" в строки, оставляя только два неидентификаторных столбца, названных так, как указано.

Пример:


val df = Seq((1, 11, 12L), (2, 21, 22L))
  .toDF("id", "int", "long")
df.show()
// output:
// +---+---+----+
// | id|int|long|
// +---+---+----+
// |  1| 11|  12|
// |  2| 21|  22|
// +---+---+----+

df.unpivot(
  Array($"id"),
  Array($"int", $"long"),
  "variable",
  "value")
  .show()
// output:
// +---+--------+-----+
// | id|variable|value|*
// +---+--------+-----+
// |  1|     int|   11|
// |  1|    long|   12|
// |  2|     int|   21|
// |  2|    long|   22|
// +---+--------+-----+

Предложение OFFSET (SPARK-28330, SPARK-39159). Теперь в Apache Spark 3.4 в SQL-запросах можно использовать предложение OFFSET. До этой версии можно было составлять запросы и ограничивать количество возвращаемых строк с помощью предложения LIMIT. Теперь вы можете не только сделать это, но и отбросить первые N строк с помощью предложения OFFSET! Apache Spark создаст и выполнит эффективный план запроса, чтобы минимизировать объем работ, необходимых для этой операции. Эта операция обычно используется для пагинации, но может применяться и в других целях.


CREATE TABLE t (first INT, second DATE DEFAULT CURRENT_DATE())
  USING PARQUET;
INSERT INTO t VALUES
  (0, DEFAULT), (1, DEFAULT), (2, DATE'2020-12-31');
SELECT first, second FROM t ORDER BY first LIMIT 1 OFFSET 1;

(1, 2023-03-28)

Табличные функции-генераторы значений в предложении FROM (SPARK-41594). Начиная с 2021 года в стандарте SQL в разделе ISO/IEC 19075-7:2021 - Part 7: Polymorphic table functions - приведен синтаксис вызова таблично-значимых функций. Apache Spark 3.4 теперь поддерживает этот синтаксис для упрощения запросов и преобразования коллекций данных стандартными способами. Существующие и новые встроенные таблично-знаковые функции поддерживают этот синтаксис.

Приведем простой пример:


SELECT * FROM EXPLODE(ARRAY(1, 2))

(1)
(2)

Официальная поддержка экземпляров NumPy (SPARK-39405). Экземпляры NumPy теперь официально поддерживаются в PySpark, поэтому вы можете создавать фреймы данных (spark.createDataFrame) с экземплярами NumPy и предоставлять их в качестве входных данных в SQL-выражениях и даже в ML.


spark.createDataFrame(np.array([[1, 2], [3, 4]])).show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

Улучшения для разработчиков

Ужесточение использования SQLSTATE для классов ошибок (SPARK-41994). В индустрии систем управления базами данных стало стандартом представлять состояния возврата SQL-запросов и команд с помощью пятибайтового кода, известного как SQLSTATE. Таким образом, несколько клиентов и серверов могут стандартизировать способы взаимодействия друг с другом и упростить их реализацию. Это особенно актуально для SQL-запросов и команд, передаваемых по соединениям JDBC и ODBC. Apache Spark 3.4 приводит значительное большинство случаев ошибок в соответствие с этим стандартом, обновляя их для включения значений SQLSTATE, соответствующих ожидаемым в сообществе. Например, значение SQLSTATE 22003 означает выход числового значения за пределы диапазона, а 22012 — деление на ноль.

Улучшены сообщения об ошибках (SPARK-41597, SPARK-37935). Больше исключений Spark было перенесено на новый фреймворк ошибок (SPARK-33539) с улучшенным качеством сообщений об ошибках. Кроме того, исключения PySpark теперь используют новый фреймворк и имеют классификацию классов и кодов ошибок, что позволяет пользователям определять желаемое поведение для конкретных случаев ошибок при возникновении исключений.

Пример:


from pyspark.errors import PySparkTypeError

df = spark.range(1)

try:
    df.id.substr(df.id, 10)
except PySparkTypeError as e:
    if e.getErrorClass() == "NOT_SAME_TYPE":
        # Error handling
        ...

Memory profiler для пользовательских функций PySpark (SPARK-40281). Memory profiler для пользовательских функций PySpark изначально не поддерживал Spark executors. Память, как один из ключевых факторов производительности программы, отсутствовала в профилировании PySpark. Программы PySpark, работающие на драйвере Spark, могут быть легко профилированы другими профилировщиками, как и любой процесс Python, но не было простого способа профилировать память на Spark executors. Теперь PySpark включает профилировщик памяти, и пользователи могут построчно профилировать свои UDF и проверять потребление памяти.

Пример:


from pyspark.sql.functions import *

@udf("int")
def f(x):
   return x + 1

_ = spark.range(2).select(f('id')).collect()
spark.sparkContext.show_profiles()

============================================================
Profile of UDF<id=11>
============================================================
Filename: <command-1010423834128581>

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     3    116.9 MiB    116.9 MiB           2   @udf("int")
     4                                         def f(x):
     5    116.9 MiB      0.0 MiB           2       return x + 1

Улучшения в потоковой передаче данных

Проект Lightspeed: Более быстрая и простая обработка потоков с помощью Apache Spark внёс дополнительные улучшения в Spark 3.4:

Управление смещениями (Offset Management). Профилирование рабочих нагрузок клиентов и эксперименты с производительностью показывают, что операции управления смещениями могут занимать до 30-50% времени выполнения некоторых конвейеров. Если сделать эти операции асинхронными и выполняемыми с настраиваемой периодичностью, то время их выполнения можно значительно увеличить.

Поддержка нескольких операций с состоянием. Теперь пользователи могут выполнять операции с состоянием (агрегация, дедупликация, объединение потоков и т.д.) несколько раз в одном и том же запросе, включая цепочечные агрегации временных окон. Благодаря этому пользователям больше не нужно создавать несколько потоковых запросов с промежуточным хранением данных между ними, что влечет за собой дополнительные расходы на инфраструктуру и обслуживание, а также не отличается высокой производительностью. Обратите внимание, что это работает только в режиме append.

Обработка произвольных состояний в Python. До версии Spark 3.4 PySpark не поддерживал обработку произвольных состояний, что вынуждало пользователей использовать Java/Scala API, если им требовалось выразить сложную и пользовательскую логику обработки состояний. Начиная с Apache Spark 3.4, пользователи могут напрямую выражать сложные функции с состоянием в PySpark. Более подробную информацию можно найти в статье блога Python "Произвольная обработка с сохранением состояния в структурированной потоковой передаче".

Поддержка Protobuf. Нативная поддержка Protobuf была очень востребована, особенно в потоковых приложениях. В Apache Spark 3.4 пользователи теперь могут читать и делать записи в формате Protobuf с помощью встроенных функций from_protobuf() и to_protobuf().

Другие улучшения в Apache Spark 3.4

Помимо появления новых возможностей, в последнем выпуске Spark особое внимание уделяется удобству использования, стабильности и доработке, в результате чего было решено около 2600 проблем. Вклад в это достижение внесли более 270 разработчиков, как частных лиц, так и компаний, таких как Databricks, LinkedIn, eBay, Baidu, Apple, Bloomberg, Microsoft, Amazon, Google и многие другие. В этой стетье основное внимание уделено заметным улучшениям в области SQL, Python и потоковой передачи данных в Spark 3.4, однако в этой версии есть и другие усовершенствования, которые здесь не рассматриваются. Подробнее об этих дополнительных возможностях можно узнать из примечаний к выпуску, включая общедоступность объединений с фильтрами bloom, масштабируемый бэкенд Spark UI, улучшенное покрытие API pandas и многое другое.


Освойте Apache Spark на новом курсе Слёрма Spark-инженер. Это продвинутый курс об инструменте Apache Spark. Он подойдёт тем, кто:

  • проектирует и создает пайплайны поставки данных;

  • делает бэкенд на проекте с большими данными;

  • проектирует архитектуру проекта с большими данными;

  • занимается эксплуатацией системы, в которой работают дата-саентисты;

  • является дата-инженером, но хочет повысить квалификацию.

В курсе будет блок по работе со Scala, Python. Вы получите навыки по работе с данными (сбор, загрузка, агрегация и т.д.), научитесь работать с облаками, кластерами, хранилищами и брокерами, узнаете архитектуру Spark. В конце курса — финальный проект с собственным кодом и данными.

Подробная программа и описание курса смотрите на нашем сайте.

Комментарии (0)