![](https://habrastorage.org/getpro/habr/upload_files/52d/c34/6e7/52dc346e7cb0291595c2076db66b2e64.png)
Благодаря недавнему релизу spark3.2.0 у нас появилась возможность масштабировать данные с помощью pandas.
![](https://habrastorage.org/getpro/habr/upload_files/e5d/eb0/5d4/e5deb05d498f292d29a26794f13437cb.jpeg)
1. Введение
13 октября 2021 г. команда Apache Spark зарелизила spark3.2.0. На этот раз в Spark, помимо все прочего, был добавлен API Pandas. Pandas — мощный и хорошо известный среди дата-сайентистов пакет. Однако у Pandas есть свои ограничения при работе с большими объемами данных, потому что он обрабатывает данные на одной машине. Несколько лет назад databricks выпустили библиотеку ‘Koalas’, чтобы решить эту проблему.
Добавление API Pandas в spark3.2.0 избавляет нас от необходимости использовать сторонние библиотеки. Пользователи Pandas теперь могут не отказывать себе в удовольствии использовать Pandas и масштабировать процессы до многоузловых кластеров Spark.
2. Цель
В этой статье говорится непосредственно о способах использования API Pandas в Spark:
Чтение данных в виде датафреймов pandas-spark;
Чтение данных в виде датафреймов spark и преобразование в датафреймы pandas-spark;
Создание датафреймов pandas-spark;
Применение SQL-запросов непосредственно к датафреймам pandas-spark;
Построение графиков на основе датафреймов pandas-spark;
Переход от koalas к API pandas в Spark.
3. Данные
CSV-файл и Jupyter Notebook, упомянутые в этой статье, можно найти на моей странице GitHub. Наборы данных там небольшие, однако проиллюстрированные здесь подходы могут быть применимы в больших наборах.
4. Требуется установка
Прежде чем продолжить, сначала скачайте spark3.2.0 (установочный файл можно найти здесь) и правильно настройте PySpark. Вам также понадобятся библиотеки pyarrow и plotly, которые можно установить через интерфейс jupyter notebook, как показано ниже:
pyarrow (!conda install -c conda-forge — yes pyarrow)
plotly (!conda install — yes plotly)
Прекрасно! Если ваш PySpark готов к труду и обороне, то давайте перейдем к следующему разделу.
5. Импорт библиотек и запуск сессий Spark
Теперь начнем импортировать PySpark и запустим сессию с помощью блока кода, приведенного ниже.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
Spark info сообщает нам, что используется версия 3.2.0.
![Изображение 1. Spark info. Изображение 1. Spark info.](https://habrastorage.org/getpro/habr/upload_files/061/719/de1/061719de119201b81b1147d529e221d8.png)
Также будет не лишним проверить версию python и pyspark, как показано ниже. В моем случае, я использовал Spark версии 3.2.0 и python версии 3.8.8.
print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
![Изображение 2: Используемые версии. Изображение 2: Используемые версии.](https://habrastorage.org/getpro/habr/upload_files/6d3/696/e65/6d3696e653a1aaabe1b38e6317fe6661.png)
Хорошо! Теперь с помощью pyspark.pandas
импортируем функцию read_csv
для чтения данных CSV файла в виде датафрейма pandas-spark.
Если появится варнинг, как показано на изображении 3, то можно перед запуском pyspark.pandas import read_csv
установить для переменной среды (т.е. PYARROW_IGNORE_TIMEZON
) значение 1.
from pyspark.pandas import read_csv
![Изображение 3: Варнинг при импорте pyspark.pandas. Изображение 3: Варнинг при импорте pyspark.pandas.](https://habrastorage.org/getpro/habr/upload_files/c39/d2f/58f/c39d2f58f73df74fcd07d296766abcc3.png)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
sparkprint('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)from pyspark.pandas import read_csv
# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"from pyspark.pandas import read_csv
# Чтобы избавиться от сообщения об ошибке, установите значение для переменной среды,как показано ниже
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"
from pyspark.pandas import read_csv
6.1 Чтение данных из csv-файла в виде датафрейма pandas-spark
Для того, чтобы продемонстрировать различные варианты использования API pandas spark, мы воспользуемся файлом ‘example_csv.csv’. Функция read_csv возвращает датафрейм pandas-spark (назовите его: psdf).
# Читаем в качестве датафрейма pandas-spark
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
![Изображение 4: Датафрейм Pandas-Spark. Изображение 4: Датафрейм Pandas-Spark.](https://habrastorage.org/getpro/habr/upload_files/192/093/2ce/1920932ce6d6f8b8a9000d548ae4ea15.png)
Замечательно! Мы только что создали с вами датафрейм pandas-spark и теперь можем перейти к использованию функций pandas для выполнения последующих задач. Например, psdf.head(2)
и psdf.shape
можно использовать для получения двух верхних строк и размерности данных соответственно. В отличие от стандартного датафрейма pandas в python, здесь мы располагаем очень крутой возможностью распараллеливания, что является неоспоримым преимуществом.
# получаем тип данных
# получаем размерность данных
# получаем имена столбцов данных
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
![Изображение 5. Использование функций pandas на датафремах pandas-spark. Изображение 5. Использование функций pandas на датафремах pandas-spark.](https://habrastorage.org/getpro/habr/upload_files/c04/d75/4d5/c04d754d50323142f4f4faf07c2e2ae8.png)
Более того, если вы хотите преобразовать датафрейм pandas-spark в датафрейм spark, это можно осуществить с помощью функции to_spark()
. На выходе мы получим датафрейм spark (назовите его: sdf), и теперь можем использовать все функции pyspark на этом датафрейме. Например, sdf.show(5)
и sdf.printSchema()
выводят пять верхних строк и схему данных датафрейма spark соответственно.
# Преобразование из датафрейма pandas-spark в датафрейм spark
# Вывод пяти верхних строк датафрейма spark
sdf = psdf.to_spark()
sdf.show(5)
![Изображение 6: Вывод пяти верхних строк датафрейма spark. Изображение 6: Вывод пяти верхних строк датафрейма spark.](https://habrastorage.org/getpro/habr/upload_files/4f1/4f3/686/4f14f3686e9e7ae9d02a1b8861b7f567.png)
# Вывод схемы
sdf.printSchema()
![Изображение 7: Вывод схемы датафрейма spark. Изображение 7: Вывод схемы датафрейма spark.](https://habrastorage.org/getpro/habr/upload_files/389/8ee/141/3898ee141582fe99bbbbafeb1ee6811f.png)
6.2. Чтение данных из csv-файла в виде датафрейма spark и их преобразование в датафрейм pandas-spark
Мы можем преобразовать датафрейм spark в датафрейм pandas-spark с помощью команды to_pandas_on_spark()
. Она принимает на вход датафрейм spark, на выходе мы получаем датафрейм pandas-spark (как вы могли и сами догадаться). Ниже, мы читаем данные в виде датафрейма spark (назовем его: sdf1). Чтобы подтвердить, что это датафрейм spark, мы можем использовать type(sdf1)
, который определяет, что это точно датафhейм spark, т.е. ‘pyspark.sql.dataframe.DataFrame’
.
# Чтение данных с помощью spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
![Изображение 8. Результирующий тип - датафрейм spark. Изображение 8. Результирующий тип - датафрейм spark.](https://habrastorage.org/getpro/habr/upload_files/3f6/730/a08/3f6730a08ffb3457f27ef862fe803314.png)
После преобразования в датафрейм pandas-spark (psdf1) результирующим типом будет “pyspark.pandas.frame.DataFrame”
. Мы можем использовать функцию pandas
, например, .head()
, чтобы убедиться, что это все таки датафрейм pandas-spark.
# Преобразование в датафрейм pandas-spark
psdf1 = sdf1.to_pandas_on_spark()
# Вывод двух верхних строк
psdf1.head(2)
![Изображение 9. Вывод двух верхних строк датафрейма pandas-spark. Изображение 9. Вывод двух верхних строк датафрейма pandas-spark.](https://habrastorage.org/getpro/habr/upload_files/aab/d48/72c/aabd4872c1c99686e8afc07cd9bb0907.png)
# Проверка типа psdf1
type(psdf1)
![Изображение 10. Результирующий тип - датафрейм pandas-spark. Изображение 10. Результирующий тип - датафрейм pandas-spark.](https://habrastorage.org/getpro/habr/upload_files/62d/8b4/3e0/62d8b43e02f213b8e96cf0299fde1114.png)
6.3 Создание датафрейма pandas-spark
В этом разделе мы разберем, как вместо создания датафрейма pandas-spark из CSV-файла сделать это напрямую, импортировав pyspark.pandas
как ps
. Ниже с помощью ps.DataFrame()
мы создали датафрейм pandas-spark (psdf2). У psdf2 два признака и три строки.
import pandas as pd
import pyspark.pandas as ps
# Создание датафрейма pandas-spark
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
![Изображение 11. Созданный нами датафрейм pandas -spark. Изображение 11. Созданный нами датафрейм pandas -spark.](https://habrastorage.org/getpro/habr/upload_files/c57/ad4/617/c57ad46173da00a83ef3ae62c51bc045.png)
Если мы хотим преобразовать датафрейм pandas-spark (psdf2) обратно в датафрейм spark, то для этого у нас есть функция to_spark()
, о которой мы ранее уже упоминали. Синтаксис обеспечивает гибкость при смене типов датафрейм, что может оказаться довольно полезным в зависимости от функций (pandas или spark), которые вы хотите использовать в своем анализе.
# Обратное преобразование датафрейма pandas-spark в датафрейм spark
sdf2 = psdf2.to_spark()
sdf2.show(2)
![Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark Изображение 12. Датафрейм spark, преобразованный из датафрейма pandas-spark](https://habrastorage.org/getpro/habr/upload_files/a1c/669/54f/a1c66954f535bd217aa22ffa14a8d63f.png)
7. Применение SQL-запросов непосредственно к датафреймам pandas-spark
Еще одна замечательная тема для обсуждения в рамках pandas-spark API — это функция sql
. Давайте используем эту функцию на созданном ранее датафрейме pandas-spark (psdf2) для извлечения некоторой информации. По сути для выполнения SQL-запроса нам просто нужно запустить функцию ps.sql()
поверх датафрейма pandas-spark. Как показано ниже, функция count(*)
для данных psdf2 результат равный трем. Точно так же второй запрос выводит отфильтрованные данные со score болеьше 80.
# Реализация SQL-запроса. Входные данные: датафрейм pandas-spark (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
![Изображение 13. Отображение результатов sql-запроса: всего три записи. Изображение 13. Отображение результатов sql-запроса: всего три записи.](https://habrastorage.org/getpro/habr/upload_files/353/31a/0bb/35331a0bb2e361afd4cbf8f546e2c70c.png)
# Возвращает датафрейм pandas-spark
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
![Изображение 14. Отображение результатов sql-запроса: score> 80 Изображение 14. Отображение результатов sql-запроса: score> 80](https://habrastorage.org/getpro/habr/upload_files/89a/ee8/92f/89aee892f7823ebe0d556e9e77a613a8.png)
8. Графики датафреймов pandas и pandas-spark
Супер! Рад, что вы дошли до этого момента. Теперь предлагаю кратко затронуть возможности построения графиков нашего нового API pandas-spark. В отличие от статического графика по умолчанию в стандартном API Python pandas, график по умолчанию в API pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Сейчас мы импортируем данные в виде датафрейма pandas и pandas-spark и построим гистограмму по переменной зарплаты (salary) для каждого из типов данных.
# Чтение данных в виде датафрейма pandas
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
![Изображение 15. Чтение данные в виде датафрейма pandas. Изображение 15. Чтение данные в виде датафрейма pandas.](https://habrastorage.org/getpro/habr/upload_files/636/5ea/37e/6365ea37e7d0baa58c6170d1b46bdbcf.png)
На изображении ниже показана гистограмма зарплаты из датафрейма pandas.
# Чтение данных в виде датафрейма pandas-spark
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
# pyspark.pandas.frame.DataFrame
# постороение гистограмма по датафрейму pandas
pddf['salary'].hist(bins=3)
![Изображение 16. Дефолтная python-гистограмма датафрейма pandas Изображение 16. Дефолтная python-гистограмма датафрейма pandas](https://habrastorage.org/getpro/habr/upload_files/e47/2d1/6f4/e472d16f4898008aaf34d44d03e4becc.png)
Ниже пример гистограммы по той же переменной на основе датафрейма pandas-spark, которая в сущности является интерактивным графиком.
Примечание: Приведенный ниже график вставлен как изображение, поэтому он статичен. Если вы запустите приведенный ниже синтаксис в jupyter notebook у вас будет возможность увеличивать/уменьшать масштаб (сделать его интерактивным).
# построение гистограммы по датафрейму pandas-spark
import plotly
pdsdf['salary'].hist(bins=3)
![Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark. Изображение 17. Скрин интерактивного графика по датафрему pandas-pyspark.](https://habrastorage.org/getpro/habr/upload_files/809/c57/652/809c57652f5aff34f63890a837d250df.png)
9. Переход от Koalas в API Pandas
Напоследок давайте поговорим о том, какие изменения требуются при переходе от библиотеки Koalas в API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса: что было в Koalas, и как это выглядит в новом API pandas-spark.
![Таблица 1. Переход от Koalas в API pandas-spark Таблица 1. Переход от Koalas в API pandas-spark](https://habrastorage.org/getpro/habr/upload_files/93c/ba4/b99/93cba4b9995d80e7b2c501df98a5e970.png)
10. Заключение
В этой статье вы узнали о способах использования недавно добавленной API pandas в spark3.2.0 с целью чтения данных, создания датафрейма, использования SQL непосредственно во фреймворке pandas-spark и перехода от существующей библиотеки Koalas в API pandas-spark.
Спасибо внимание! Подписывайтесь на мой аккаунт в LinkedIn, чтобы быть в курсе обновлений по полезным навыкам работы в датасайенс.
Ссылки
https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html
https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/
https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html
Материал подготовлен в рамках курса «Spark Developer».
Всех желающих приглашаем на бесплатное demo-занятие «Написание коннекторов для Spark». На занятии мы разберем подключение к внешним системам из коробки и создание кастомного коннектора для подключения к нестандартным БД.
→ РЕГИСТРАЦИЯ
Ananiev_Genrih
Отличная копипаста из гугл-транслэйт