При работе с распределенными базами данных, возникают задачи, которые ввиду технических ограничений сложно или невозможно решить с помощью всем привычного пакета Pandas на Python. Решением может стать использование распределенных вычислений Spark и его собственных DataFrame.

В этой статье рассмотрим практические вопросы использования Spark DataFrame. Коснемся таких аспектов работы как: создание DataFrame из различных источников данных, фильтрация, сортировка, создание новых столбцов и другое.

Стоит напомнить или отметить, что Spark поддерживает написание команд на трех языках программирования: Scala, Java и Python. В данной статье будет представлена реализации на языке Python. Мы не затрагиваем многих нюансов, с которыми вы можете столкнуться, поэтому наиболее развернутую информацию как всегда можно найти только в Гайде (https://spark.apache.org/docs/latest/sql-programming-guide.html )

Создание DataFrame

Spark позволяет создавать DataFrame различными способами:

1.      Из файла (JSON, CSV, ORC, Parquet и др.);

2.      Из RDD;

3.      Из таблицы на HDFS;

4.      Из pandas DF.

Рассмотрим на примерах создание DF перечисленными способами.

Создание DF из файла

Рассмотрим первый способ на примере чтение CSV файла.

Прежде чем осуществлять чтение файла необходимо создать схему, которая отражает типы данных значений. Для начала выполним импорт типов данных:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

Создадим схему:
schema = StructType(fields=[
    StructField("col_1", IntegerType()),
    StructField("col_2", StringType ())
])
Создадим DF:
df = spark.read.csv("/path/file_name", schema=schema, sep=";")

В случаях если есть уверенность, что файл не содержит в своих столбцах смешанных типов данных, можно задать условие, чтобы при записи DataFrame типы данных определялись самостоятельно, для этого достаточно установить параметр: inferSchema=True.

В случае, когда в таблице присутствует заголовок необходимо добавить: header=True.

Собирая все вместе получаем следующий запрос для создания DF:

df = spark.read.csv("/path/file_name", sep=";", inferSchema=True, header=True.)

Для предпросмотра DataFrame можно использовать следующие функции:

·       show(5, vertical=True, truncate=False)

·       take()

·       collect()

Посмотрим содержимое таблицы:

df.show()

Создание DataFrame из RDD и Pandas DataFrame

Создание из этих структур довольно простое и выполняется в одно действие:

df = spark.createDataFrame(rdd)

df = spark.createDataFrame(pandasDF)

Возможно отдельно задать параметр schema

Создание из таблицы HDFS

Создание Spark DF из таблиц HDFS возможно 2-мя способами, отличающихся формой написания (при этом скорость выполнения команды неизменна):

·       Pyspark запрос;

·       SQL запрос.

Приведем примеры написания запросов.

Pyspark API:

df = spark.table(‘schema_name.table_name’)

SQL API:
df = spark.sql(“””
		SELECT col_1,
			col_2
		FROM schema_name.table_name
”””)

Операции с DataFrame

Spark DataFrame имеет обширный (хоть и более скромный чем Pandas) функционал. Далее рассмотрим пример выполнения следующих операций:

·       выбор колонок,

·       фильтрация,

·       сортировка,

·       создание новых столбцов,

·       соединение таблиц,

·       сохранение результата.

 

Для выполнения предложенных команд необходимо осуществить импорт необходимых функций:

from pyspark.sql.functions import col, asc, desc, lenght

Выбор нужных колонок

Выберем из df столбцы с названием “col_1” и “col_2”:

df2 = df.select(col(“col_2”))

df2.show()

Переименование столбцов

Переименуем столбец ”col_2” на “new_col”:

df2 = df.select(col(“col_1”), col(“col_2”).alias(“new_col”))

df2.show()

Фильтрация

Фильтрацию можно выполнить, используя команды where или filter:

df2 = df.filter(col(“col_1”) > 5)

df2.show()

Также поддерживаются логические операторы: и (&), или (|), не (~) и др.

Сортировка

Осуществим сортировку по двум столбцам, по первому – по убыванию, по второму – по возрастанию:

df2 = df.orderBy(col("col_1").desc(), col("col_2").asc())

Создание новых столбцов

Создадим новый столбец, в котором будут хранится значения длины строки столбца “col_2”:

df2 = df.withColumn(“new_col”, length(“col_2”))

Если нам необходимо произвести сложные математические операции можно написать необходимую функцию и применить ее к RDD.

Например, сложим “col_1” с длиной “col_2”. Напишем функцию:

def summator(a_int, b_str):
return a_int + len(b_str)

rdd = df.rdd.map(lambda x: x[0], summator(x[0], x[1],))
df2 = rdd.toDF([“col_1”, ”summa”])

Соединение таблиц

Для соединения двух таблиц используется команда join. Соединим таблицы df и df2 (из прошлого примера), ключом для соединения – является поле “col_1”.

df_rez = df.join(df2, df.col_1 == df2.col_1, how=”inner”)

Сохранение результата

Если объем данных небольшой возможно преобразовать Spark DF в Pandas DataFrame, а затем уже сохранить в нужно формате:

pandas_df = df.toPandas()

Если объем данных большой, можно сохранить информацию либо в файл на HDFS, либо в таблицу в формате Parquet или ORC.

Сохранение в таблицу в формате ORC:

t_f.write\
.mode("overwrite")\
.format("orc")\
.saveAsTable(‘schema_name.table_name’)

Сохранение в файл в формате csv:
df.coalesce(1)\
.write.format("com.databricks.spark.csv")\
.mode("append")\
.option("header", "true")\
.option("delimiter", "~")\
.option("quoteMode", "false")\
.save("path")

Заключение

Мы разобрали основы работы со Spark DataFrame, научились создавать и выполнять несложные операции. В заключении отмечу, что все предложенные методы можно комбинировать с собой для сокращения промежуточных вычислений. Например:

df_final = df.select(col("col_1"), col("col_2").alias("new_col"))\
.join(df2.select(col("col_1"), col("summa")), df.col_1 == df2.col_1, how="inner").select(df.col_1, col("new_col"), df2.summa)

Комментарии (0)