Автор статьи: Вадим Опольский

Luxoft DXC Technology, Scala Big Data разработчик

В прошлой статье мы рассмотрели количество партиций, которое по умолчанию создается Apache Spark при инициализации DataFrame, DataSet:

  1. созданных на основе Scala коллекции;

  2. созданных из памяти; 

  3. созданных из RDD;

  4. созданных из файла на Hadoop Distributed File System;

  5. созданных из файла на AWS S3;

  6. полученного из джойна двух DataFrame или DataSet.

В текущей статье продолжим рассматривать количество партиций у Spark DataFrame и DataSet, созданных на основе таблицы в Relational Database 

Количество партиций в созданном DataFrame может дать ответ на вопрос: сколько Apache Spark создал ефлов для выполнения параллельной загрузки данных из вышеперечисленных стораджей. И так как основное преимущество использования Apache Spark это возможность организации параллельной обработки распределенных данных на машинах с ограниченной мощностью, то для Data ENgineer важно как распараллелить загрузку в самом начале.    

Давайте начнем с DataFrame созданного на основе таблицы Relational DataBase. 

Для таких Relational DataBase как:

  • DB2

  • MariaDB

  • MS Sql

  • Oracle

  • PostgreSQL

Самый простой вариант чтения таблицы Relational Database employee из схемы HR** с примерами выглядит как:

val parallelism = 3
val spark = SparkSession
  .builder()
  .appName("Integrating Postgres")
  .master(s"local[$parallelism]")
  .getOrCreate()
val jdbcEmployeesDF: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/spark")
  .option("dbtable", "public.employees")
  .option("user", "docker")
  .option("password", "docker")
  .load()
println(s"Partitions number is ${jdbcEmployeesDF.rdd.getNumPartitions}")
println(s"Raw Count of employee is ${jdbcEmployeesDF.count()}")

В результате такого чтения будет получено следующее:

"Partitions number is 1"

"Raw Count of employee is 3003"

Выходит, что по дефолту Apache Spark создает один task и читает данные в один поток. Что с точки зрения наличия кластера и дальнейшей распределенной обработки неадекватно.

Такой же результат мы получим, если будем читать данные из партиционированной таблицы Postgres.  

Т.е. Apache Spark, вычитывая данные, не маппит партиции Postgres на партиции Spark Dataframe.

Как же управлять параллельным чтением и количеством партиций?

Метод .repartition(n) и .repartition(col(colName)) не подходят, так как паралелизация будет происходить уже после того как данные из базы будут в памяти Spark. 

Apache Spark поддерживает специальные параметры для DataFrame, DataSet, созданных на основе таблице. Все параметры подробно описаны в документации

Для реализации параллелизма нам интересны следующие:

columnName или partitionColumn — колонка, которая будет использоваться для формирования фильтра в SQL запросах с различных брокеров;

lowerBound, upperBound — нижняя граница и верхняя граница;

numPartitions — количество частей на которое будет разбита таблица для параллельного чтения.

val jdbcEmployeesPartitionedByEmpNoDF: DataSet[Employee] =
  spark.read
  .jdbc(url = "jdbc:postgresql://localhost:5432/spark",
	table = "public.employees",
	columnName = "emp_no",
	lowerBound = 	110010,
	upperBound =  499990,
	numPartitions = 10, connectionProperties
  ).as[Employee]
println(s"Partitions number is ${jdbcEmployeesPartitionedByEmpNoDF.rdd.getNumPartitions}")
println(s"Raw Count of employee is ${jdbcEmployeesPartitionedByEmpNoDF.count()}")

В результате такого чтения будет получено следующее:

"Partitions count 0
min emp_no = 10010 and max emp_no = 111400
Partitions count 10003
min emp_no = 200000 and max emp_no = 227000
Partitions count 2701
min emp_no = 227010 and max emp_no = 266000
Partitions count 3900
min emp_no = 266010 and max emp_no = 299990
Partitions count 3399
Partitions count 0
Partitions count 0
min emp_no = 400000 and max emp_no = 421990
Partitions count 2200
min emp_no = 422000 and max emp_no = 460990
Partitions count 3900
min emp_no = 461000 and max emp_no = 499990
Partitions count 3900
Partitions number is 10
Raw Count of employee is 30003"

Как видно на логах в результате выполнения кода загрузка происходит в 7 потоков, каждый из которых читает свою часть данных из базы. Наличие записей с номерами меньше 110010 говорит о том, что lowerBound  и upperBound из параметров используются только  для определения шага раздела, а не для фильтрации строк в таблице. Таким образом, все строки в таблице будут разделены и возвращены.

Не последовательно идущие номера emp_no  в employees приводит к том, что есть перекос в партициях. 

Решить эту проблему можно несколькими способами: 

Первое, например, подобрать параметры lowerBound  и upperBound для определения оптимального шага. 

Также можно использовать DataSource v2 и написать собственный кастомный коннектор. Кастомный коннектор позволяется также реализовать например маппинг партиций таблицы Postgres на партиции DataFrame. Чтобы реализовать написать кастомный коннектор, необходимы базовые знания Scala (так как Apache Spark написан на Scala) и знания работы RDD, DataFrame, DataSet под капотом. На курсе ОТУСа Spark Developer можно быстро освоить все необходимые знания и написать кастомный коннектор.

Параметр для DataFrame массив предикатов это еще один способ разбить исходную таблицу в реляционной базе данных, когда, например, у нас нет колонки с последовательно идущими неповторяющимся числовыми значениями, например id. 

В простом случае мы можем разделить DataFrame на две части, например, все записи с gender male будут в одном дата фрейме, все остальные в другом. Код, который позволит загружать данные по половому признаку, будет выглядеть так:

val jdbcEmployeesPartitionedByGenderDF =
  spark.read
	.jdbc(url = "jdbc:postgresql://localhost:5432/spark",
  	         table = "public.employees",
   	         predicates = Array("gender = 'M'", "gender = 'F'"),
  	         connectionProperties = connectionProperties)
jdbcEmployeesPartitionedByGenderDF.rdd.foreachPartition { it =>
   println(s"Partitions count = ${it.length}")
}

Partitions count = 12091
Partitions count = 17912
Partitions number is 2
Raw Count of employee is 30003

Такой вариант немного лучше, чем загрузка данных в один поток. 

Если запустить код с таким параметром:

predicates = Array("gender = 'M'", "gender = 'F'", "gender = 'M'")

Результатом будет 

Partitions count = 12091
Partitions count = 17912
Partitions count = 17912
Partitions number is 3
Raw Count of employee is 47915

Как видно из результата, была создана еще одна партиция и данные были дуплицированные. Именно поэтому необходимо аккуратно выбирать предикаты для работы.

В статье мы разобрали, как увеличить скорость загрузки данных из реляционной базы в Spark в параллельном режиме, а также проблемы, с которыми мы можем встретиться. 

** созданная таблица

CREATE TABLE employees (
	emp_no  	INT         	NOT NULL,
	birth_date  DATE        	NOT NULL,
	first_name  VARCHAR(14) 	NOT NULL,
	last_name   VARCHAR(16) 	NOT NULL,
	gender  	gender      	NULL,
	hire_date   DATE        	NOT NULL,
	PRIMARY KEY (emp_no)
);

** созданная таблица

CREATE TABLE employees (
	emp_no  	INT         	NOT NULL,
	birth_date  DATE        	NOT NULL,
	first_name  VARCHAR(14) 	NOT NULL,
	last_name   VARCHAR(16) 	NOT NULL,
	gender  	gender      	NULL,
	hire_date   DATE        	NOT NULL,
	PRIMARY KEY (emp_no)
) PARTITION BY HASH (emp_no);

В заключение приглашаю всех желающих на открытое занятие «Spark в Kubernetes». На уроке мы:

  • Кратко рассмотрим, что такое Spark и что такое Kubernetes;

  • Рассмотрим особенности и варианты запуска Spark в Kubernetes;

  • Сравним работу Spark в Kubernetus и Hadoop Yarn.

Регистрация на урок.

Комментарии (0)