Дата-инженеры, работающие с PySpark, часто сталкиваются с задачей создать логику обработки данных. Обычно речь идет о тестировании набора классов и функций. Современные платформы данных на основе Spark подчас содержат сотни, а то и тысячи разных модульных тестов, которые помогают командам по работе с данными сохранять целостную базу кода. В ежедневные рабочие задачи дата-инженера входит создание небольших датафреймов, которые используются в модульных тестах как входные и выходные значения.

Команда VK Cloud перевела статью о разных способах создания ad-hoc-датафреймов, содержащих синтетические данные. Такие подходы основываются на ad-hoc и in-place входных данных. В них не используются исходные данные из озера, такие как Parquet, Delta или CSV, или из других источников, например баз данных или потоков Kafka. Так что речь пойдет о создании датафреймов in-place без внешних зависимостей.

Как сделать так, чтобы от пользовательских датафреймов была польза?


Новый in-place-датафрейм может пригодиться в нескольких случаях:

  1. В качестве входных данных для модульных тестов используются синтетические данные.
  2. Синтетические данные и мини-датафреймы также могут пригодиться во время разработки в качестве валидаторов логики обработки.
  3. Воспроизведение багов в определенных частях PySpark сводится к использованию определенных функциональных возможностей.

Пример из жизни


Недавно во время обновления нашей платформы до Spark 3.4.0 мы обнаружили, что небольшая доля наших модульных тестов не пройдена. Во всех них в качестве входных данных использовались JSON-файлы, а сессия выполнялась в режиме PERMISSIVE. Что-то пошло не так с этим новым релизом OSS Spark. Databricks, в котором используется та же версия Spark, выполняет эти модульные тесты корректно. Кроме того, в предыдущих версиях OSS и Databricks Spark обрабатывал их без ошибок. Мы быстренько смастерили небольшой сниппет и создали тикет в Spark JIRA, чтобы показать, что в обработчике JSON возникла новая проблема:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import json


data = """[{"a": "incorrect", "b": "correct"}]"""
schema = StructType(
    [
        StructField("a", IntegerType(), True),
        StructField("b", StringType(), True),
        StructField("_corrupt_record", StringType(), True),
    ]
)

(
    spark.read.option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .schema(schema)
    .json(spark.sparkContext.parallelize([data]))
    .show(truncate=False)
)

# returns:
# java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1

В результате основная команда быстро создала и включила в релиз фикс бага, которое войдет в версию  Spark 3.4.2.

Разные подходы к созданию датафреймов


Давайте для начала определим вспомогательный метод для извлечения содержимого и схемы датафрейма, чтобы в каждом примере была одна и та же необходимая информация:

from pyspark.sql import DataFrame

def check_dataframe(df: DataFrame):
    """Helper for checking dataframe content and schema"""
    
    print('""" output:')
    df.show()
    df.printSchema()
    print('"""')

Подход № 1. Создаем датафрейм с помощью Spark SQL


Это мой любимый способ, все-таки сказывается опыт работы администратором баз данных. Синтаксис SQL довольно простой, обеспечивает точный контроль над схемой и типами датафрейма и позволяет генерировать как простые, так и сложные структуры данных:

1.1. Создаем простой датафрейс с помощью Spark SQL


df = spark.sql(
    """
    select * from 
    values 
        (1, 'abc'), 
        (2, 'def') as (id, value)
    """
)
check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: integer (nullable = false)
 |-- value: string (nullable = false)
"""

1.2. Создаем датафрейм с вложенными атрибутами с помощью Spark SQL


df = spark.sql(
    """
    select 
      'a' as col1,        
      struct(1 as child1, current_date as child2) as col2,
      array(1,2,3) as col3
    """
)
check_dataframe(df)

""" output:
+----+---------------+---------+
|col1|           col2|     col3|
+----+---------------+---------+
|   a|{1, 2023-07-29}|[1, 2, 3]|
+----+---------------+---------+

root
 |-- col1: string (nullable = false)
 |-- col2: struct (nullable = false)
 |    |-- child1: integer (nullable = false)
 |    |-- child2: date (nullable = false)
 |-- col3: array (nullable = false)
 |    |-- element: integer (containsNull = false)
"""

Подход № 2. Создаем датафрейм из строк CSV или JSON с помощью RDD


Иногда команде по обработке данных попадается информация в распространенном формате вроде CSV или JSON, однако содержимое доступно в виде переменных. В этом случае можно генерировать RDD, а потом прочитать его с помощью подходящего обработчика формата.

2.1. Загружаем JSON из переменной строки с помощью RDD


json_string = """
[
    {"id": 1, "value": "abc"},
    {"id": 2, "value": "def"}
]"""

rdd_data = spark.sparkContext.parallelize([json_string])
df = spark.read.json(rdd_data)

check_dataframe(df)


""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

02.2. Подключаем CSV из переменной строки с помощью RDD


csv_string = """
id,value
1,abc
2,def
"""

rdd_data = spark.sparkContext.parallelize(csv_string.splitlines())
df = spark.read.options(header=True, inferSchema=True).csv(rdd_data)

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: integer (nullable = true)
 |-- value: string (nullable = true
"""

Подход № 3. Создаем датафрейм с помощью Pandas


Еще один вариант — воспользоваться Pandas. Немного напоминает подход с RDD, но будет удобнее для специалистов, которые активно используют этот пакет.

3.1. Подключаем JSON из переменной типа string с помощью Pandas


import pandas as pd
import io

json_string = """
[
    {"id": 1, "value": "abc"},
    {"id": 2, "value": "def"}
]
"""

pd_data = pd.read_json(io.StringIO(json_string))
df = spark.createDataFrame(pd_data)

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

3.2. Подключаем CSV из переменной строки с помощью Pandas


import pandas as pd
import io

csv_string = """
id,value
1,abc
2,def
3,xyz
"""

data_io = io.StringIO(csv_string)
pd_df = pd.read_csv(data_io, sep=",")
df = spark.createDataFrame(pd_df)

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
|  3|  xyz|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

3.3. Создаем датафрейм PySpark из датафрейма Pandas


import pandas as pd

pd_df = pd.DataFrame(
    {
        "id": [1, 2], 
        "value": ["abc", "def"]
    }
)
df = spark.createDataFrame(pd_df)   

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

Подход № 4. Создаем датафрейм из списка объектов Row


В этом подходе задействован класс pyspark.sql.row. Он используется в PySpark по умолчанию, представляя содержимое датафрейма. Подробнее о нем можно прочитать в моей статье «Скрипты для датафреймов PySpark».

4.1. Создаем датафрейм из коллекции объектов Row


По умолчанию приводимые типы в PySpark автоматически выводятся из значений атрибутов объектов. Но это не всегда работает как надо. В примере ниже ID столбца имеет тип long, но указанные значения — это целые числа:

from pyspark.sql import Row

rows_data = [Row(id=1, value="abc"), Row(id=2, value="def")]
df = spark.createDataFrame(rows_data)

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

4.2. Создаем датафрейм из коллекции объектов Row и явной схемы


В этом примере id столбца явным образом преобразуется в тип int:

from pyspark.sql import Row

rows = [Row(id=1, value="abc"), Row(id=2, value="def")]
schema = "id int, value string"
df = spark.createDataFrame(rows, schema)

check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: integer (nullable = true)
 |-- value: string (nullable = true)
"""

Подход № 5. Создаем датафрейм с помощью типов данных Python


Стоит упомянуть и способ с использованием Python, когда датасет описывается с помощью структур данных Python, таких как словари, кортежи и списки.

5.1. Создаем датафрейм из списка словарей


data_dicts = [{"id": 1, "value": "abc"}, {"id": 2, "value": "def"}]
df = spark.createDataFrame(data_dicts)
check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
"""

5.2. Создаем датафрейм из списка кортежей


Этот и следующий пример — тот самый случай, когда с помощью определения явной схемы задают правильный тип и имя столбца.

data_tuples = [(1, "abc"), (2, "def")]
schema = "id int, value string"
df = spark.createDataFrame(data_tuples, schema)
check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: integer (nullable = true)
 |-- value: string (nullable = true)
"""

5.3. Создаем датафрейм из списка списков


data_lists = [[1, "abc"], [2, "def"]]
schema = "id int, value string"
df = spark.createDataFrame(data_lists, schema)
check_dataframe(df)

""" output:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

root
 |-- id: integer (nullable = true)
 |-- value: string (nullable = true)
"""
Сейчас наша команда готовит к релизу облачный сервис Spark в K8s, подписывайтесь на телеграм-канал «Данные на стероидах», чтобы не пропустить анонс!

Комментарии (0)