Дата-инженеры, работающие с PySpark, часто сталкиваются с задачей создать логику обработки данных. Обычно речь идет о тестировании набора классов и функций. Современные платформы данных на основе Spark подчас содержат сотни, а то и тысячи разных модульных тестов, которые помогают командам по работе с данными сохранять целостную базу кода. В ежедневные рабочие задачи дата-инженера входит создание небольших датафреймов, которые используются в модульных тестах как входные и выходные значения.
Команда VK Cloud перевела статью о разных способах создания ad-hoc-датафреймов, содержащих синтетические данные. Такие подходы основываются на ad-hoc и in-place входных данных. В них не используются исходные данные из озера, такие как Parquet, Delta или CSV, или из других источников, например баз данных или потоков Kafka. Так что речь пойдет о создании датафреймов in-place без внешних зависимостей.
Как сделать так, чтобы от пользовательских датафреймов была польза?
Новый in-place-датафрейм может пригодиться в нескольких случаях:
- В качестве входных данных для модульных тестов используются синтетические данные.
- Синтетические данные и мини-датафреймы также могут пригодиться во время разработки в качестве валидаторов логики обработки.
- Воспроизведение багов в определенных частях PySpark сводится к использованию определенных функциональных возможностей.
Пример из жизни
Недавно во время обновления нашей платформы до Spark 3.4.0 мы обнаружили, что небольшая доля наших модульных тестов не пройдена. Во всех них в качестве входных данных использовались JSON-файлы, а сессия выполнялась в режиме PERMISSIVE. Что-то пошло не так с этим новым релизом OSS Spark. Databricks, в котором используется та же версия Spark, выполняет эти модульные тесты корректно. Кроме того, в предыдущих версиях OSS и Databricks Spark обрабатывал их без ошибок. Мы быстренько смастерили небольшой сниппет и создали тикет в Spark JIRA, чтобы показать, что в обработчике JSON возникла новая проблема:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import json
data = """[{"a": "incorrect", "b": "correct"}]"""
schema = StructType(
[
StructField("a", IntegerType(), True),
StructField("b", StringType(), True),
StructField("_corrupt_record", StringType(), True),
]
)
(
spark.read.option("mode", "PERMISSIVE")
.option("multiline", "true")
.schema(schema)
.json(spark.sparkContext.parallelize([data]))
.show(truncate=False)
)
# returns:
# java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
В результате основная команда быстро создала и включила в релиз фикс бага, которое войдет в версию Spark 3.4.2.
Разные подходы к созданию датафреймов
Давайте для начала определим вспомогательный метод для извлечения содержимого и схемы датафрейма, чтобы в каждом примере была одна и та же необходимая информация:
from pyspark.sql import DataFrame
def check_dataframe(df: DataFrame):
"""Helper for checking dataframe content and schema"""
print('""" output:')
df.show()
df.printSchema()
print('"""')
Подход № 1. Создаем датафрейм с помощью Spark SQL
Это мой любимый способ, все-таки сказывается опыт работы администратором баз данных. Синтаксис SQL довольно простой, обеспечивает точный контроль над схемой и типами датафрейма и позволяет генерировать как простые, так и сложные структуры данных:
1.1. Создаем простой датафрейс с помощью Spark SQL
df = spark.sql(
"""
select * from
values
(1, 'abc'),
(2, 'def') as (id, value)
"""
)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: integer (nullable = false)
|-- value: string (nullable = false)
"""
1.2. Создаем датафрейм с вложенными атрибутами с помощью Spark SQL
df = spark.sql(
"""
select
'a' as col1,
struct(1 as child1, current_date as child2) as col2,
array(1,2,3) as col3
"""
)
check_dataframe(df)
""" output:
+----+---------------+---------+
|col1| col2| col3|
+----+---------------+---------+
| a|{1, 2023-07-29}|[1, 2, 3]|
+----+---------------+---------+
root
|-- col1: string (nullable = false)
|-- col2: struct (nullable = false)
| |-- child1: integer (nullable = false)
| |-- child2: date (nullable = false)
|-- col3: array (nullable = false)
| |-- element: integer (containsNull = false)
"""
Подход № 2. Создаем датафрейм из строк CSV или JSON с помощью RDD
Иногда команде по обработке данных попадается информация в распространенном формате вроде CSV или JSON, однако содержимое доступно в виде переменных. В этом случае можно генерировать RDD, а потом прочитать его с помощью подходящего обработчика формата.
2.1. Загружаем JSON из переменной строки с помощью RDD
json_string = """
[
{"id": 1, "value": "abc"},
{"id": 2, "value": "def"}
]"""
rdd_data = spark.sparkContext.parallelize([json_string])
df = spark.read.json(rdd_data)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
02.2. Подключаем CSV из переменной строки с помощью RDD
csv_string = """
id,value
1,abc
2,def
"""
rdd_data = spark.sparkContext.parallelize(csv_string.splitlines())
df = spark.read.options(header=True, inferSchema=True).csv(rdd_data)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true
"""
Подход № 3. Создаем датафрейм с помощью Pandas
Еще один вариант — воспользоваться Pandas. Немного напоминает подход с RDD, но будет удобнее для специалистов, которые активно используют этот пакет.
3.1. Подключаем JSON из переменной типа string с помощью Pandas
import pandas as pd
import io
json_string = """
[
{"id": 1, "value": "abc"},
{"id": 2, "value": "def"}
]
"""
pd_data = pd.read_json(io.StringIO(json_string))
df = spark.createDataFrame(pd_data)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
3.2. Подключаем CSV из переменной строки с помощью Pandas
import pandas as pd
import io
csv_string = """
id,value
1,abc
2,def
3,xyz
"""
data_io = io.StringIO(csv_string)
pd_df = pd.read_csv(data_io, sep=",")
df = spark.createDataFrame(pd_df)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
| 3| xyz|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
3.3. Создаем датафрейм PySpark из датафрейма Pandas
import pandas as pd
pd_df = pd.DataFrame(
{
"id": [1, 2],
"value": ["abc", "def"]
}
)
df = spark.createDataFrame(pd_df)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
Подход № 4. Создаем датафрейм из списка объектов Row
В этом подходе задействован класс
pyspark.sql.row
. Он используется в PySpark по умолчанию, представляя содержимое датафрейма. Подробнее о нем можно прочитать в моей статье «Скрипты для датафреймов PySpark».4.1. Создаем датафрейм из коллекции объектов Row
По умолчанию приводимые типы в PySpark автоматически выводятся из значений атрибутов объектов. Но это не всегда работает как надо. В примере ниже ID столбца имеет тип long, но указанные значения — это целые числа:
from pyspark.sql import Row
rows_data = [Row(id=1, value="abc"), Row(id=2, value="def")]
df = spark.createDataFrame(rows_data)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
4.2. Создаем датафрейм из коллекции объектов Row и явной схемы
В этом примере
id
столбца явным образом преобразуется в тип int
:from pyspark.sql import Row
rows = [Row(id=1, value="abc"), Row(id=2, value="def")]
schema = "id int, value string"
df = spark.createDataFrame(rows, schema)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
"""
Подход № 5. Создаем датафрейм с помощью типов данных Python
Стоит упомянуть и способ с использованием Python, когда датасет описывается с помощью структур данных Python, таких как словари, кортежи и списки.
5.1. Создаем датафрейм из списка словарей
data_dicts = [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}]
df = spark.createDataFrame(data_dicts)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: long (nullable = true)
|-- value: string (nullable = true)
"""
5.2. Создаем датафрейм из списка кортежей
Этот и следующий пример — тот самый случай, когда с помощью определения явной схемы задают правильный тип и имя столбца.
data_tuples = [(1, "abc"), (2, "def")]
schema = "id int, value string"
df = spark.createDataFrame(data_tuples, schema)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
"""
5.3. Создаем датафрейм из списка списков
data_lists = [[1, "abc"], [2, "def"]]
schema = "id int, value string"
df = spark.createDataFrame(data_lists, schema)
check_dataframe(df)
""" output:
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
"""
Сейчас наша команда готовит к релизу облачный сервис Spark в K8s, подписывайтесь на телеграм-канал «Данные на стероидах», чтобы не пропустить анонс!