В статье будет рассмотрен процесс экспорта данных в Hadoop из различных РСУБД посредством фреймворка Spark. Для взаимодействия с фреймворком Spark будет использован язык программирования Python с применением api pySpark.

Основные понятия

  • Реляционная СУБД (Relational Database Management System, RDBMS) – это система, обеспечивающая доступ к реляционной базе данных, в основе которой лежат данные, хранящиеся в табличном виде;

  • Apache Hadoop – это коллекция утилит, библиотек и фреймворк с открытым исходным кодом, предназначенных для разработки и выполнения распределенных программ, работающих на кластерах из множества узлов;

  • Apache Spark – фреймворк для распределенной пакетной и потоковой обработки больших неструктурированных и слабоструктурированных данных, с открытым исходным кодом, входящий в экосистему Hadoop.

Взаимодействие SPARK с Реляционными СУБД

В основе взаимодействия фреймворка Spark с реляционными базами данных лежит JDBC (Java Database Connectivity) Driver, реализованный на языке Java. При использовании JDBC происходит подключение к базе данных по специальному URL, который включает в себя адрес сервера и порт, по которому происходит отправка запросов.

Для того, чтобы приступить к реализации взаимодействия Spark и RDBMS, сначала необходимо сконфигурировать базовые свойства создаваемого application при создании SparkSession:

  • executors (количество исполнителей) – параллельные процессы, отвечающие за выполнение Spark Tasks.

  • executor cores (количество ядер для каждого исполнителя) – максимальное количество параллельно выполняемых задач, на одном исполнителе.

  • executor memory (память, выделяемая для каждого исполнителя) – количество ОЗУ, выделяемое системой на одного исполнителя.

  • driver memory (память, выделяемая для драйвера) – количество ОЗУ, выделяемое для драйвера Spark.

SparkSession – это способ инициализации базовой функциональности API (точка входа) для программного создания RDD, Dataframe и Dataset.

Рассмотрим пример, отвечающий за конфигурирование базовых свойств создаваемого application и создание SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .config("spark.executor.instances", "2") \
  .config("spark.executor.cores", "2") \
  .config("spark.executor.memory", "2g") \
  .config("spark.driver.memory", "4g") \
  .appName("application_name") \
  .getOrCreate()

Теперь, когда SparkSession создана, можно приступать к отправлению запросов в базу данных.

Основные опции JDBC для взаимодействия с базой данных:

Название опции

Описание

Ограничения

1

url

URL-адрес для подключения к базе данных по JDBC

2

dbtable

Таблица или запрос, обернутый в скобки, который может быть добавлен в блок FROM SQL- запроса

3

query

Запрос, который будет обернут в скобки и вставлен в блок FROM в качестве подзапроса.

4

partitionColumn

Описание того, как разделить таблицу при параллельном чтении. Название колонки по которой будет проводиться разделение объекта.

Идет в группе с опциями 5, 6 и 7. Столбец раздела должен быть цифровым столбцом, датой или меткой времени из рассматриваемой таблицы.

5

lowerBound

Описание того, как разделить таблицу при параллельном чтении. Нижняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 6 и 7

6

upperBound

Описание того, как разделить таблицу при параллельном чтении. Верхняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 5 и 7

7

numPartitions

Максимальное количество разделов, которые можно использовать для параллелизма при чтении таблицы. Это также определяет максимальное количество одновременных подключений JDBC. P.S.: более подробное описание опций будет рассмотрено на примерах в статье.

Идет в группе с опциями 4, 5 и 6

8

fetchsize

Размер выборки JDBC, который определяет, сколько строк нужно извлекать за один проход. Это может повысить производительность драйверов JDBC, которые по умолчанию имеют низкий размер выборки (например, для ORACLE с 10 строками)

9

user

Пользователь, из-под которого происходит подключение к базе данных.

10

password

Пароль к пользователю (из опции 9)

Для того, чтобы отправить запрос в базу, необходимо выбрать формат соединения. Форматом, отвечающим за соединения с базой данных является JDBC. После того, как был выбран формат, необходимо добавить опции, которые предоставят нужную информацию для подключения к БД, инициализируют требуемый запрос и, при необходимости, сообщат драйверу о том, каким образом необходимо произвести чтение данных из БД.

Рассмотрим пример, иллюстрирующий отправку запроса в базу данных:

dataframe = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@127.0.0.1:1521/testdb") \
  .option("dbtable", "myOwner.myTableName") \
  .option("partitionColumn", "ID") \
  .option("lowerBound", 1) \
  .option("upperBound", 1000) \
  .option("numPartitions", 10) \
  .option("fetchsize", 100) \
  .option("user", "myUser") \
  .option("password", "myPass") \
  .load()

Мы видим, что spark, используя JDBC драйвер, пытается подключиться к БД jdbc:oracle:thin:@127.0.0.1:1521/testdb и прочитать из нее myOwner.myTableName, при этом используя разделение таблицы по колонке ID на 10 разделов, для возможности параллельного чтения, где нижняя граница колонки ID = 1, верхняя граница = 1000. При этом размер выборки JDBC будет равняться 100 строкам.

После того, как выполнится выше представленный код, мы получим НЕ данные из этой таблицы, а фрейм данных (dataframe). Dataframe – это абстракция данных в SparkSQL, которые организованы в строки и типизированы по столбцам. Получая фрейм данных, мы можем узнать информацию о:

  • названиях полей объекта;

  • типах полей объекта;

  • свойстве nullable полей объекта.

fields = dataframe.schema.fields
names = [_.name.lower() for _ in fields] #названия полей объекта
types = [_.dataType for _ in fields] # типы полей объекта
nullable = [_.nullable for _ in fields] # свойство nullable полей объекта

Когда фрейм данных получен, мы можем перейти к записи данных в Hadoop Distributed File System (HDFS). Для этого необходимо вызвать у dataframe метод write, после чего объявить формат выходных данных (parquet, orc, csv, avro, etc), при необходимости добавить опции для записи и вызвать метод save().

Рассмотрим наиболее часто встречаемые опции/методы для записи:

Название

Тип

Описание

Ограничения

1

compression

опция

кодек сжатия, используемый при сохранении фрейма данных в файл. Наиболее распространенные кодеки: none, bzip2, gzip, snappy и др. Самым популярным кодеком для сжатия является snappy: он не нацелен на максимальное сжатие или совместимость с любой другой библиотекой сжатия. Вместо этого, он нацелен на увеличение скорости при работе со сжатыми файлами и разумное сжатие. Более подробно можно почитать по ссылке: http://google.github.io/snappy/

2

partitionBy(*cols)

метод

метод, применяемый для партицирования по заданным наименованиям колонок

3

mode(str)

метод

режим записи данных. Наиболее часто используемые: overwrite (перезапись)/append (дозапись)

4

delimiter

опция

задает один символ в качестве разделителя для каждого поля и значения. Если значение не задано, используется значение по умолчанию ","

Применяется для формата csv

5

header

опция

использует первую строку в качестве имен столбцов. Если значение не задано, используется значение по умолчанию false.

Применяется для формата csv

6

inferSchema

опция

автоматически формирует схему из входных данных. Для определения будет выполняться один дополнительный проход по данным. Если значение не задано, используется значение по умолчанию false.

Применяется для формата csv

7

repartitoin(numPartitions, *cols)

метод

возвращает новый фрейм данных, который разделен по хешу. Целочисленный параметр numPartitions указывает целевое количество разделов. Строковый параметр *cols задает столбец, по которому будет произведено деление данных. Метод используется для увеличения или уменьшения выходных разделов. При использовании будет произведено полное перемешивание данных и создаются разделы одинакового размера.

8

coalesce(numPartitions)

метод

возвращает новый фрейм данных, разделенный на numPartitions разделов. Целочисленный параметр numPartitions указывает целевое количество разделов. Метод используется для увеличения или уменьшения выходных разделов, но при этом создает разделы с разным объемом данных за счет объединения уже имеющихся разделов. При использовании не будет произведено полное перемешивание данных.

Когда дело доходит до выбора формата хранения данных, разработчики часто бросают свой взгляд в сторону parquet файлов. Parquet – это бинарный, колоночно-ориентированный формат хранения данных, изначально созданный для экосистемы hadoop.

Достоинства хранения данных в Parquet:

  • это файлы, а значит с ними легко работать, перемещать, бэкапить и реплицировать;

  • колончатый вид позволяет значительно ускорить работу аналитика, если ему не нужны все колонки сразу;

  • нативная поддержка в Spark из коробки обеспечивает возможность просто взять и сохранить файл в любимое хранилище;

  • эффективное хранение с точки зрения занимаемого места;

  • обеспечивает самую быструю работу чтения по сравнению с использованием других файловых форматов.

Более подробную информацию о формате PARQUET можно почитать по ссылке: https://parquet.apache.org/documentation/latest/

Рассмотрим пример записи полученного фрейма данных в HDFS в формат Parquet:

dataframe.repartition(2).write \
  .mode("overwrite") \
  .partitionBy("myColumn") \
  .format("parquet") \
  .option("compression", "snappy") \
  .save("/path/to/target/dir")

Из примера выше мы видим, что полученный dataframe будет разделен на 2 равномерных раздела, после чего перед записью сотрет все данные (overwrite), имеющиеся по пути /path/to/target/dir, произведет партицирование по колонке myColumn и сохранит данные в формате parquet по пути /path/to/target/dir, применив сжатие snappy.

Анализ процесса миграции данных из различных RDBMS в HADOOP

На наших проектах часто приходится решать задачи переноса больших данных между системами.
Например, есть таблицы, содержащие десятки Тб данных, на основании которых необходимо построить различные аналитические витрины. Первым шагом является вынос данных из продуктивной среды бизнес-приложений в среду аналитических хранилищ, чтобы работа с этими данными не повлекла деградацию сервиса бизнес-приложений. Самый распространенный пример, когда бизнес приложения работают с некоторой РСУБД, а анализируются в Hadoop. Как можно выполнить процесс выноса данных в Hadoop с помощью Spark, мы увидим ниже.

Проанализируем процесс выполнения следующего кода:

Запрос создания/структура таблицы "GRIGOREVDS.myTable" в БД:

create table GRIGOREVDS.myTable (
        id         NUMBER,
        field1     VARCHAR2(1000),
        field2     NUMBER,
        field3     VARCHAR2(1000),
        field4     NUMBER,
        field5     VARCHAR2(1000),
        field6     NUMBER
);

Заполнение таблицы "GRIGOREVDS.myTable" 100 строками, где колонка "ID" имеет целочисленный тип данных с шагом 1:

INSERT INTO GRIGOREVDS.myTable VALUES (1,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (2,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (3,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (4,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (5,'field1',2,'field3',4,'field5',6);
...
INSERT INTO GRIGOREVDS.myTable VALUES (99,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (100,'field1',2,'field3',4,'field5',6);

Python скрипт "migration.py" экспорта данных из RDBMS в HDFS, использующий API pyspark:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "1g") \
    .appName("test_migration") \
    .getOrCreate()

df = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@localhost:1521/oradb") \
    .option("user", "GRIGOREVDS") \
    .option("password", "***") \
    .option("dbtable", "GRIGOREVDS.myTable") \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 100) \
    .option("numPartitions", 10) \
    .load()

df.repartition(2).write \
    .mode("overwrite") \
    .partitionBy("field1") \
    .format("parquet") \
    .option("compression", "snappy") \
    .save("/tmp/migration/test")

После того, как скрипт будет запущен через YARN (например: spark-submit --master yarn ~/migration.py) создастся Spark application, где можно будет отследить процесс выполнения запущенного скрипта в Spark History (Web сервер для визуализации создаваемых Jobs, Stages, Tasks, а также отображения логов выполнения приложений Spark в удобном виде).

Любое приложение в момент своей работы, состоит из драйвера – программы, которая выполняет основную функцию приложения (точка входа в приложение (main)), и исполнителей, которым на вход поступают данные и инструкция, по которой будет производиться выполнение на узлах кластера. После чего, исполнители отдают результат драйверу, чтобы получить новую задачу. При этом каждый исполнитель может выполнять обработку более чем в один поток, где каждый поток будет работать со своим блоком данных независимо друг от друга. Таким образом, если посмотреть на наш код выше, то можно заметить, что при запуске нашего приложения мы запросили у менеджера кластера одного исполнителя с двумя ядрами (потоками). Это говорит о том, что нам будет доступно (spark.executor.instances * spark.executor.cores) 1 * 2 = 2 потока и в лучшем случае мы сможем обрабатывать два блока данных одновременно. При необходимости данные показатели можно увеличить.

Ниже приведен снимок экрана, на котором видна общая информация о создаваемых Jobs, после запуска нашего скрипта:

Вкладка JOBS
Вкладка JOBS
Прогресс по выполнению задач рассматриваемого Job
Прогресс по выполнению задач рассматриваемого Job

Можно заметить, что запущенный процесс состоит из 1-ого Job (save), который, в свою очередь, делится на 12 задач.

Детализация рассматриваемого Job
Детализация рассматриваемого Job

На скриншоте выше, можно увидеть, что созданный Job был разбит на 2-а этапа:

  1. Чтение данных из базы данных (Stage Id: 0), который состоит из 10 задач;

  2. Запись данных в HDFS (Stage Id: 1), который состоит из двух задач;

Перейдя в этап чтения данных можно увидеть, что были созданы и успешно выполнены 10 Tasks (tasks: 0 9).

Детализация этапа чтения данных
Детализация этапа чтения данных

1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях, в рамках рассматриваемого этапа

Разберемся, откуда взялись 10 задач, если чтение производилось одной таблицы (GRIGOREVDS.myTable). Посмотрев внимательно на код, можно заметить, что были применены дополнительные опции (partitionColumn, lowerBound, upperBound, numPartitions), которые позволили распараллелить чтение данных из нашей таблицы. В результате, после применения данных опций мы получили 10 (numPartitions), сгенерированных JDBC драйвером, запросов в БД.

Посмотрим детальнее на процесс генерации запросов. Когда задаются данные опций, часто возникает вопрос: А что же делают опции lowerBound и upperBound? Могу ли я наложить таким образом фильтр на данные и выгрузить только те, которые попали в диапазон данных опций? Ответ - нет. При указании данных опций мы лишь задаем крайние границы значений выбранной колонки partitionColumn, чтобы в дальнейшем получить наиболее равномерное распределение данных, с помощью запросов выборки. В рассматриваемом примере мы увидим, что у нас имеется в таблице 100 записей, которые должны быть разбиты на 10 блоков по колонке "ID". В результате мы получим:

Блок

Запрос выборки

Накладываемый фильтр на данные

1

SELECT
"ID",
"FIELD1",
"FIELD2",
"FIELD3",
"FIELD4",
"FIELD5",
"FIELD6"
FROM GRIGOREVDS.myTable

WHERE "ID" < 11 or "ID" is null

2

WHERE "ID" >= 11 and "ID" < 21

3

WHERE "ID" >= 21 and "ID" < 31

4

WHERE "ID" >= 31 and "ID" < 41

5

WHERE "ID" >= 41 and "ID" < 51

6

WHERE "ID" >= 51 and "ID" < 61

7

WHERE "ID" >= 61 and "ID" < 71

8

WHERE "ID" >= 71 and "ID" < 81

9

WHERE "ID" >= 81 and "ID" < 91

10

WHERE "ID" >= 91

Примеры отправленных запросов в Oracle представлены на скриншотах ниже:

Снимок детализации создаваемой сессии в Oracle (session 1)
Снимок детализации создаваемой сессии в Oracle (session 1)
Снимок детализации создаваемой сессии в Oracle (session 2)
Снимок детализации создаваемой сессии в Oracle (session 2)

Посмотреть информацию о том, было ли применено параллельное чтение данных, сколько строк было обработано в процессе работы каждого из этапов, количество выходных файлов, размер выходных данных и др., можно на вкладке SQL, перейдя в интересующий Query ID:

Вкладка SQL
Вкладка SQL

1 - SQL блок
2 - колонка для идентификации Queries
3 - ссылка на конкретный Query ID

Детализация рассматриваемого Query
Детализация рассматриваемого Query

1 - время выполнения блока
2 - параллельное чтение данных (разбиение на разделы) + количество разделов
3 - количество выходных файлов
4 - размер выходных данных
5 - количество выходных строк

После того, как фрейм данных получен, запускается блок записи в HDFS. В рамках рассматриваемого приложения в Spark History - это Stage Id: 1.

Так как все выходные данные решено было поделить на две, близкие к равенству по размеру части repartition(2), то в Spark History мы, соответственно, увидим две созданные задачи на запись. См. скриншот ниже:

Детализация этапа записи данных
Детализация этапа записи данных

1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях в рамках рассматриваемого этапа

В итоге, после успешного завершения работы скрипта, в директории HDFS (/tmp/migration/test) мы увидим
файл "_SUCCESS", который будет сигнализировать пользователю о том, что запись была произведена успешно. А также директорию, которая была создана в результате выполнения партицирования по колонке "field1" (partitionBy("field1")), в которой будут лежать два файла repartition(2), содержащие в себе экспортируемые из БД данные в формате parquet с применением кодека сжатия snappy.

1 - файл, сигнализирующий успешную запись
2 - директория, создавшаяся в результате партицирования
3 - два файла с данными

Название директории, по которой проводится партицирование, будет состоять из двух частей: название колонки, по которой проводится партицирование и сгруппированные значения из этой колонки.

Заключение

В рамках данной статьи мы рассмотрели вариант экспорта данных из реляционной базы данных в HDFS, с применением фреймворка Apache Spark. Кроме того, проанализировали поэтапно процесс выполнения исполняемого кода, а также рассмотрели возможные методы и опции для оптимизации и ускорения процесса выгрузки данных.

Комментарии (8)


  1. sshikov
    28.10.2021 19:09

    Такое впечатление, что автор остановился на полдороги. Как правило, миграция не заканчивается на выполнении одного запроса. Чаще по опыту нужно синхронизировать СУБД и Hadoop регулярно, и чем чаще (и чем ближе они друг к другу), тем лучше. Как минимум, бизнесу хочется иметь Т-1. А как только вы хотите периодичность — вам как правило нужно точно знать, на какой момент вы имеете копию. И тут начинается много чего интересного… Скажем, у нас бизнес хочет иметь в хадупе копию через час после закрытия опердня (если не раньше), чтобы успеть в этот же календарный день построить в хадупе отчетность для регулятора.

    BTW, раз уж вы такое делаете, можете рассказать, с какой примерно скоростью удается доставать данные по JDBC? В идеале с указанием на то, сколько примерно ресурсов в наличии в СУБД и хадупе.


    1. neoflex Автор
      29.10.2021 11:07

      Вы правы, отметив, что миграция не заканчивается на выполнении одного запроса, особенно, если бизнес хочет иметь в hadoop копию через час после закрытия опердня для построения отчетности. Для данных задач, может потребоваться разработка различных систем и сервисов, с развернутой архитектурой для реализации конкретных задач. В статье хотелось отразить концепцию взаимодействия СУБД и HADOOP, а также отразить некоторые важные нюансы протекания данного процесса.

      Рассматривая вопрос скорости доставления данных по JDBC, тут есть множество посторонних факторов, которые могут повлиять на данный процесс (например: скорость передачи данных по сети, характеристики жестких дисков и др.). Рассматривая системы, с которыми мне приходилось работать, средняя скорость доставки данных для различных систем варьировалась от 70 до 100 Мбит/c.


      1. sshikov
        29.10.2021 16:37

        >некоторые важные нюансы
        ну вот у нас нюанс простой — если вы хотите предикаты, или тем более, конкретные ключи, то надо учитывать, что спарк не умеет bind variables. Поэтому все условия — в запросе. А если ключей вам нужно много — то запрос быстро растет, достигает того размера, когда парсеру становится плохо. Ну т.е. у нас скажем были случаи, когда запрос получался размером в мегабайты, и этот запрос тупо ронял Oracle, или приводил к его деградации.


  1. Ninil
    28.10.2021 22:16

    Хорошо бы добавить в статью, каким образом будет физически выполняться экспорт, если размер таблицы намного больше размера ОЗУ?


    1. sshikov
      29.10.2021 16:44

      Спарк не держит весь датасет в памяти. Зачем? Если у вас 10 партиций, то для каждой из них одновременно в памяти будет fetchSize строк. А дальше вполне можно скидывать данные на диск в паркеты. Ну, по крайней мере если вы их сразу не обрабатываете в процессе экспорта — тогда да, потребности могут быть другие.


  1. Tellamonid
    29.10.2021 09:11

    А почему Spark, а а не sqoop? Sqoop же как раз сделан для миграции между реляционными базами и Хадупом, и обратно.


    1. sshikov
      29.10.2021 16:38

      Sqoop тупой как валенок. В частности, кто запрос-то будет строить для него? Ну в смысле, если там хоть какие-то условия нужны, которые меняются время от времени? А если запрос уже нужно построить — то зачем Sqoop нужен, когда спарк сам может выполнить?


    1. neoflex Автор
      29.10.2021 19:46

      Apache Sqoop также является одним из решений миграции данных между РСУБД и Hadoop, но при миграции данных, может возникнуть потребность в индивидуальной обработке при извлечении данных и загрузкой в Hadoop. В таком случае Sqoop плохо подходит для реализации такого рода задач, и тогда предпочтительней будет использовать утилиты JDBC Apache Spark.