В статье будет рассмотрен процесс экспорта данных в Hadoop из различных РСУБД посредством фреймворка Spark. Для взаимодействия с фреймворком Spark будет использован язык программирования Python с применением api pySpark.
Основные понятия
Реляционная СУБД (Relational Database Management System, RDBMS) – это система, обеспечивающая доступ к реляционной базе данных, в основе которой лежат данные, хранящиеся в табличном виде;
Apache Hadoop – это коллекция утилит, библиотек и фреймворк с открытым исходным кодом, предназначенных для разработки и выполнения распределенных программ, работающих на кластерах из множества узлов;
Apache Spark – фреймворк для распределенной пакетной и потоковой обработки больших неструктурированных и слабоструктурированных данных, с открытым исходным кодом, входящий в экосистему Hadoop.
Взаимодействие SPARK с Реляционными СУБД
В основе взаимодействия фреймворка Spark с реляционными базами данных лежит JDBC (Java Database Connectivity) Driver, реализованный на языке Java. При использовании JDBC происходит подключение к базе данных по специальному URL, который включает в себя адрес сервера и порт, по которому происходит отправка запросов.
Для того, чтобы приступить к реализации взаимодействия Spark и RDBMS, сначала необходимо сконфигурировать базовые свойства создаваемого application при создании SparkSession:
executors (количество исполнителей) – параллельные процессы, отвечающие за выполнение Spark Tasks.
executor cores (количество ядер для каждого исполнителя) – максимальное количество параллельно выполняемых задач, на одном исполнителе.
executor memory (память, выделяемая для каждого исполнителя) – количество ОЗУ, выделяемое системой на одного исполнителя.
driver memory (память, выделяемая для драйвера) – количество ОЗУ, выделяемое для драйвера Spark.
SparkSession – это способ инициализации базовой функциональности API (точка входа) для программного создания RDD, Dataframe и Dataset.
Рассмотрим пример, отвечающий за конфигурирование базовых свойств создаваемого application и создание SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.executor.instances", "2") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "2g") \
.config("spark.driver.memory", "4g") \
.appName("application_name") \
.getOrCreate()
Теперь, когда SparkSession создана, можно приступать к отправлению запросов в базу данных.
Основные опции JDBC для взаимодействия с базой данных:
№ |
Название опции |
Описание |
Ограничения |
1 |
url |
URL-адрес для подключения к базе данных по JDBC |
|
2 |
dbtable |
Таблица или запрос, обернутый в скобки, который может быть добавлен в блок FROM SQL- запроса |
|
3 |
query |
Запрос, который будет обернут в скобки и вставлен в блок FROM в качестве подзапроса. |
|
4 |
partitionColumn |
Описание того, как разделить таблицу при параллельном чтении. Название колонки по которой будет проводиться разделение объекта. |
Идет в группе с опциями 5, 6 и 7. Столбец раздела должен быть цифровым столбцом, датой или меткой времени из рассматриваемой таблицы. |
5 |
lowerBound |
Описание того, как разделить таблицу при параллельном чтении. Нижняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. |
Идет в группе с опциями 4, 6 и 7 |
6 |
upperBound |
Описание того, как разделить таблицу при параллельном чтении. Верхняя граница колонки по которой будет проводить разделение объекта. Используется для задания шага раздела, а не фильтрации данных. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. |
Идет в группе с опциями 4, 5 и 7 |
7 |
numPartitions |
Максимальное количество разделов, которые можно использовать для параллелизма при чтении таблицы. Это также определяет максимальное количество одновременных подключений JDBC. P.S.: более подробное описание опций будет рассмотрено на примерах в статье. |
Идет в группе с опциями 4, 5 и 6 |
8 |
fetchsize |
Размер выборки JDBC, который определяет, сколько строк нужно извлекать за один проход. Это может повысить производительность драйверов JDBC, которые по умолчанию имеют низкий размер выборки (например, для ORACLE с 10 строками) |
|
9 |
user |
Пользователь, из-под которого происходит подключение к базе данных. |
|
10 |
password |
Пароль к пользователю (из опции 9) |
Для того, чтобы отправить запрос в базу, необходимо выбрать формат соединения. Форматом, отвечающим за соединения с базой данных является JDBC. После того, как был выбран формат, необходимо добавить опции, которые предоставят нужную информацию для подключения к БД, инициализируют требуемый запрос и, при необходимости, сообщат драйверу о том, каким образом необходимо произвести чтение данных из БД.
Рассмотрим пример, иллюстрирующий отправку запроса в базу данных:
dataframe = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@127.0.0.1:1521/testdb") \
.option("dbtable", "myOwner.myTableName") \
.option("partitionColumn", "ID") \
.option("lowerBound", 1) \
.option("upperBound", 1000) \
.option("numPartitions", 10) \
.option("fetchsize", 100) \
.option("user", "myUser") \
.option("password", "myPass") \
.load()
Мы видим, что spark
, используя JDBC
драйвер, пытается подключиться к БД jdbc:oracle:thin:@127.0.0.1:1521/testdb
и прочитать из нее myOwner.myTableName
, при этом используя разделение таблицы по колонке ID
на 10
разделов, для возможности параллельного чтения, где нижняя граница колонки ID = 1
, верхняя граница = 1000
. При этом размер выборки JDBC будет равняться 100
строкам.
После того, как выполнится выше представленный код, мы получим НЕ данные из этой таблицы, а фрейм данных (dataframe). Dataframe – это абстракция данных в SparkSQL, которые организованы в строки и типизированы по столбцам. Получая фрейм данных, мы можем узнать информацию о:
названиях полей объекта;
типах полей объекта;
свойстве nullable полей объекта.
fields = dataframe.schema.fields
names = [_.name.lower() for _ in fields] #названия полей объекта
types = [_.dataType for _ in fields] # типы полей объекта
nullable = [_.nullable for _ in fields] # свойство nullable полей объекта
Когда фрейм данных получен, мы можем перейти к записи данных в Hadoop Distributed File System (HDFS). Для этого необходимо вызвать у dataframe метод write, после чего объявить формат выходных данных (parquet, orc, csv, avro, etc), при необходимости добавить опции для записи и вызвать метод save().
Рассмотрим наиболее часто встречаемые опции/методы для записи:
№ |
Название |
Тип |
Описание |
Ограничения |
1 |
compression |
опция |
кодек сжатия, используемый при сохранении фрейма данных в файл. Наиболее распространенные кодеки: none, bzip2, gzip, snappy и др. Самым популярным кодеком для сжатия является snappy: он не нацелен на максимальное сжатие или совместимость с любой другой библиотекой сжатия. Вместо этого, он нацелен на увеличение скорости при работе со сжатыми файлами и разумное сжатие. Более подробно можно почитать по ссылке: http://google.github.io/snappy/ |
|
2 |
partitionBy(*cols) |
метод |
метод, применяемый для партицирования по заданным наименованиям колонок |
|
3 |
mode(str) |
метод |
режим записи данных. Наиболее часто используемые: overwrite (перезапись)/append (дозапись) |
|
4 |
delimiter |
опция |
задает один символ в качестве разделителя для каждого поля и значения. Если значение не задано, используется значение по умолчанию "," |
Применяется для формата csv |
5 |
header |
опция |
использует первую строку в качестве имен столбцов. Если значение не задано, используется значение по умолчанию false. |
Применяется для формата csv |
6 |
inferSchema |
опция |
автоматически формирует схему из входных данных. Для определения будет выполняться один дополнительный проход по данным. Если значение не задано, используется значение по умолчанию false. |
Применяется для формата csv |
7 |
repartitoin(numPartitions, *cols) |
метод |
возвращает новый фрейм данных, который разделен по хешу. Целочисленный параметр numPartitions указывает целевое количество разделов. Строковый параметр *cols задает столбец, по которому будет произведено деление данных. Метод используется для увеличения или уменьшения выходных разделов. При использовании будет произведено полное перемешивание данных и создаются разделы одинакового размера. |
|
8 |
coalesce(numPartitions) |
метод |
возвращает новый фрейм данных, разделенный на numPartitions разделов. Целочисленный параметр numPartitions указывает целевое количество разделов. Метод используется для увеличения или уменьшения выходных разделов, но при этом создает разделы с разным объемом данных за счет объединения уже имеющихся разделов. При использовании не будет произведено полное перемешивание данных. |
Когда дело доходит до выбора формата хранения данных, разработчики часто бросают свой взгляд в сторону parquet файлов. Parquet – это бинарный, колоночно-ориентированный формат хранения данных, изначально созданный для экосистемы hadoop.
Достоинства хранения данных в Parquet:
это файлы, а значит с ними легко работать, перемещать, бэкапить и реплицировать;
колончатый вид позволяет значительно ускорить работу аналитика, если ему не нужны все колонки сразу;
нативная поддержка в Spark из коробки обеспечивает возможность просто взять и сохранить файл в любимое хранилище;
эффективное хранение с точки зрения занимаемого места;
обеспечивает самую быструю работу чтения по сравнению с использованием других файловых форматов.
Более подробную информацию о формате PARQUET можно почитать по ссылке: https://parquet.apache.org/documentation/latest/
Рассмотрим пример записи полученного фрейма данных в HDFS в формат Parquet:
dataframe.repartition(2).write \
.mode("overwrite") \
.partitionBy("myColumn") \
.format("parquet") \
.option("compression", "snappy") \
.save("/path/to/target/dir")
Из примера выше мы видим, что полученный dataframe
будет разделен на 2
равномерных раздела, после чего перед записью сотрет все данные (overwrite
), имеющиеся по пути /path/to/target/dir, произведет партицирование по колонке myColumn
и сохранит данные в формате parquet
по пути /path/to/target/dir
, применив сжатие snappy
.
Анализ процесса миграции данных из различных RDBMS в HADOOP
На наших проектах часто приходится решать задачи переноса больших данных между системами.
Например, есть таблицы, содержащие десятки Тб данных, на основании которых необходимо построить различные аналитические витрины. Первым шагом является вынос данных из продуктивной среды бизнес-приложений в среду аналитических хранилищ, чтобы работа с этими данными не повлекла деградацию сервиса бизнес-приложений. Самый распространенный пример, когда бизнес приложения работают с некоторой РСУБД, а анализируются в Hadoop. Как можно выполнить процесс выноса данных в Hadoop с помощью Spark, мы увидим ниже.
Проанализируем процесс выполнения следующего кода:
Запрос создания/структура таблицы "GRIGOREVDS.myTable" в БД:
create table GRIGOREVDS.myTable (
id NUMBER,
field1 VARCHAR2(1000),
field2 NUMBER,
field3 VARCHAR2(1000),
field4 NUMBER,
field5 VARCHAR2(1000),
field6 NUMBER
);
Заполнение таблицы "GRIGOREVDS.myTable" 100 строками, где колонка "ID" имеет целочисленный тип данных с шагом 1:
INSERT INTO GRIGOREVDS.myTable VALUES (1,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (2,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (3,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (4,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (5,'field1',2,'field3',4,'field5',6);
...
INSERT INTO GRIGOREVDS.myTable VALUES (99,'field1',2,'field3',4,'field5',6);
INSERT INTO GRIGOREVDS.myTable VALUES (100,'field1',2,'field3',4,'field5',6);
Python скрипт "migration.py" экспорта данных из RDBMS в HDFS, использующий API pyspark:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.dynamicAllocation.enabled", "false") \
.config("spark.executor.instances", "1") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "1g") \
.config("spark.driver.memory", "1g") \
.appName("test_migration") \
.getOrCreate()
df = spark.read.format("jdbc") \
.option("url", "jdbc:oracle:thin:@localhost:1521/oradb") \
.option("user", "GRIGOREVDS") \
.option("password", "***") \
.option("dbtable", "GRIGOREVDS.myTable") \
.option("partitionColumn", "id") \
.option("lowerBound", 1) \
.option("upperBound", 100) \
.option("numPartitions", 10) \
.load()
df.repartition(2).write \
.mode("overwrite") \
.partitionBy("field1") \
.format("parquet") \
.option("compression", "snappy") \
.save("/tmp/migration/test")
После того, как скрипт будет запущен через YARN (например: spark-submit --master yarn ~/migration.py
) создастся Spark application, где можно будет отследить процесс выполнения запущенного скрипта в Spark History (Web сервер для визуализации создаваемых Jobs, Stages, Tasks, а также отображения логов выполнения приложений Spark в удобном виде).
Любое приложение в момент своей работы, состоит из драйвера – программы, которая выполняет основную функцию приложения (точка входа в приложение (main)), и исполнителей, которым на вход поступают данные и инструкция, по которой будет производиться выполнение на узлах кластера. После чего, исполнители отдают результат драйверу, чтобы получить новую задачу. При этом каждый исполнитель может выполнять обработку более чем в один поток, где каждый поток будет работать со своим блоком данных независимо друг от друга. Таким образом, если посмотреть на наш код выше, то можно заметить, что при запуске нашего приложения мы запросили у менеджера кластера одного исполнителя с двумя ядрами (потоками). Это говорит о том, что нам будет доступно (spark.executor.instances * spark.executor.cores
) 1 * 2 = 2 потока и в лучшем случае мы сможем обрабатывать два блока данных одновременно. При необходимости данные показатели можно увеличить.
Ниже приведен снимок экрана, на котором видна общая информация о создаваемых Jobs, после запуска нашего скрипта:
Можно заметить, что запущенный процесс состоит из 1-ого Job (save), который, в свою очередь, делится на 12 задач.
На скриншоте выше, можно увидеть, что созданный Job был разбит на 2-а этапа:
Чтение данных из базы данных (Stage Id: 0), который состоит из 10 задач;
Запись данных в HDFS (Stage Id: 1), который состоит из двух задач;
Перейдя в этап чтения данных можно увидеть, что были созданы и успешно выполнены 10 Tasks (tasks: 0 9).
1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях, в рамках рассматриваемого этапа
Разберемся, откуда взялись 10 задач, если чтение производилось одной таблицы (GRIGOREVDS.myTable). Посмотрев внимательно на код, можно заметить, что были применены дополнительные опции (partitionColumn, lowerBound, upperBound, numPartitions
), которые позволили распараллелить чтение данных из нашей таблицы. В результате, после применения данных опций мы получили 10 (numPartitions
), сгенерированных JDBC драйвером, запросов в БД.
Посмотрим детальнее на процесс генерации запросов. Когда задаются данные опций, часто возникает вопрос: А что же делают опции lowerBound и upperBound? Могу ли я наложить таким образом фильтр на данные и выгрузить только те, которые попали в диапазон данных опций? Ответ - нет. При указании данных опций мы лишь задаем крайние границы значений выбранной колонки partitionColumn
, чтобы в дальнейшем получить наиболее равномерное распределение данных, с помощью запросов выборки. В рассматриваемом примере мы увидим, что у нас имеется в таблице 100 записей, которые должны быть разбиты на 10 блоков по колонке "ID". В результате мы получим:
Блок |
Запрос выборки |
Накладываемый фильтр на данные |
1 |
SELECT |
WHERE "ID" < 11 or "ID" is null |
2 |
WHERE "ID" >= 11 and "ID" < 21 |
|
3 |
WHERE "ID" >= 21 and "ID" < 31 |
|
4 |
WHERE "ID" >= 31 and "ID" < 41 |
|
5 |
WHERE "ID" >= 41 and "ID" < 51 |
|
6 |
WHERE "ID" >= 51 and "ID" < 61 |
|
7 |
WHERE "ID" >= 61 and "ID" < 71 |
|
8 |
WHERE "ID" >= 71 and "ID" < 81 |
|
9 |
WHERE "ID" >= 81 and "ID" < 91 |
|
10 |
WHERE "ID" >= 91 |
Примеры отправленных запросов в Oracle представлены на скриншотах ниже:
Посмотреть информацию о том, было ли применено параллельное чтение данных, сколько строк было обработано в процессе работы каждого из этапов, количество выходных файлов, размер выходных данных и др., можно на вкладке SQL, перейдя в интересующий Query ID:
1 - SQL блок
2 - колонка для идентификации Queries
3 - ссылка на конкретный Query ID
1 - время выполнения блока
2 - параллельное чтение данных (разбиение на разделы) + количество разделов
3 - количество выходных файлов
4 - размер выходных данных
5 - количество выходных строк
После того, как фрейм данных получен, запускается блок записи в HDFS. В рамках рассматриваемого приложения в Spark History - это Stage Id: 1.
Так как все выходные данные решено было поделить на две, близкие к равенству по размеру части repartition(2)
, то в Spark History мы, соответственно, увидим две созданные задачи на запись. См. скриншот ниже:
1 - количество созданных задач, в рамках рассматриваемого этапа
2 - созданные задачи и их детальная информация
3 - детальная информация о задействованных исполнителях в рамках рассматриваемого этапа
В итоге, после успешного завершения работы скрипта, в директории HDFS (/tmp/migration/test
) мы увидим
файл "_SUCCESS", который будет сигнализировать пользователю о том, что запись была произведена успешно. А также директорию, которая была создана в результате выполнения партицирования по колонке "field1" (partitionBy("field1")
), в которой будут лежать два файла repartition(2)
, содержащие в себе экспортируемые из БД данные в формате parquet с применением кодека сжатия snappy.
1 - файл, сигнализирующий успешную запись
2 - директория, создавшаяся в результате партицирования
3 - два файла с данными
Название директории, по которой проводится партицирование, будет состоять из двух частей: название колонки, по которой проводится партицирование и сгруппированные значения из этой колонки.
Заключение
В рамках данной статьи мы рассмотрели вариант экспорта данных из реляционной базы данных в HDFS, с применением фреймворка Apache Spark. Кроме того, проанализировали поэтапно процесс выполнения исполняемого кода, а также рассмотрели возможные методы и опции для оптимизации и ускорения процесса выгрузки данных.
Комментарии (8)
Ninil
28.10.2021 22:16Хорошо бы добавить в статью, каким образом будет физически выполняться экспорт, если размер таблицы намного больше размера ОЗУ?
sshikov
29.10.2021 16:44Спарк не держит весь датасет в памяти. Зачем? Если у вас 10 партиций, то для каждой из них одновременно в памяти будет fetchSize строк. А дальше вполне можно скидывать данные на диск в паркеты. Ну, по крайней мере если вы их сразу не обрабатываете в процессе экспорта — тогда да, потребности могут быть другие.
Tellamonid
29.10.2021 09:11А почему Spark, а а не sqoop? Sqoop же как раз сделан для миграции между реляционными базами и Хадупом, и обратно.
sshikov
29.10.2021 16:38Sqoop тупой как валенок. В частности, кто запрос-то будет строить для него? Ну в смысле, если там хоть какие-то условия нужны, которые меняются время от времени? А если запрос уже нужно построить — то зачем Sqoop нужен, когда спарк сам может выполнить?
neoflex Автор
29.10.2021 19:46Apache Sqoop также является одним из решений миграции данных между РСУБД и Hadoop, но при миграции данных, может возникнуть потребность в индивидуальной обработке при извлечении данных и загрузкой в Hadoop. В таком случае Sqoop плохо подходит для реализации такого рода задач, и тогда предпочтительней будет использовать утилиты JDBC Apache Spark.
sshikov
Такое впечатление, что автор остановился на полдороги. Как правило, миграция не заканчивается на выполнении одного запроса. Чаще по опыту нужно синхронизировать СУБД и Hadoop регулярно, и чем чаще (и чем ближе они друг к другу), тем лучше. Как минимум, бизнесу хочется иметь Т-1. А как только вы хотите периодичность — вам как правило нужно точно знать, на какой момент вы имеете копию. И тут начинается много чего интересного… Скажем, у нас бизнес хочет иметь в хадупе копию через час после закрытия опердня (если не раньше), чтобы успеть в этот же календарный день построить в хадупе отчетность для регулятора.
BTW, раз уж вы такое делаете, можете рассказать, с какой примерно скоростью удается доставать данные по JDBC? В идеале с указанием на то, сколько примерно ресурсов в наличии в СУБД и хадупе.
neoflex Автор
Вы правы, отметив, что миграция не заканчивается на выполнении одного запроса, особенно, если бизнес хочет иметь в hadoop копию через час после закрытия опердня для построения отчетности. Для данных задач, может потребоваться разработка различных систем и сервисов, с развернутой архитектурой для реализации конкретных задач. В статье хотелось отразить концепцию взаимодействия СУБД и HADOOP, а также отразить некоторые важные нюансы протекания данного процесса.
Рассматривая вопрос скорости доставления данных по JDBC, тут есть множество посторонних факторов, которые могут повлиять на данный процесс (например: скорость передачи данных по сети, характеристики жестких дисков и др.). Рассматривая системы, с которыми мне приходилось работать, средняя скорость доставки данных для различных систем варьировалась от 70 до 100 Мбит/c.
sshikov
>некоторые важные нюансы
ну вот у нас нюанс простой — если вы хотите предикаты, или тем более, конкретные ключи, то надо учитывать, что спарк не умеет bind variables. Поэтому все условия — в запросе. А если ключей вам нужно много — то запрос быстро растет, достигает того размера, когда парсеру становится плохо. Ну т.е. у нас скажем были случаи, когда запрос получался размером в мегабайты, и этот запрос тупо ронял Oracle, или приводил к его деградации.