Всем привет!
В этой статье мы расскажем, как нам удалось настроить взаимодействие Apache Spark и S3 для обработки больших файлов: с какими проблемами пришлось столкнуться и как нам удалось их решить.
Начнем с предыстории. Мы занимаемся разработкой фреймворка для автоматизации ETL-процессов, который на вход принимает файл с настройками для загрузки. Основным инструментом в фреймворке для обработки данных выступает Apache Spark. Изначально в качестве источников данных были только реляционные БД (Postgres, Oracle и MS SQL) и Kafka, а приемником был Apache Hive.
Однако некоторые источники не готовы были предоставлять доступ к своим базам напрямую. Вариантом возможности получения данных из таких систем, удовлетворяющим всех: и системы-источники, и нас, как агрегаторов информации, – стало хранилище S3. Данный подход работает следующим образом: система-источник раз в определенный период (например, раз в день) выгружает некоторый срез данных из своей БД и кладет их в S3 в виде файла. Срез данных, который выгружает источник, может быть, как полным (все данные по таблице), так и частичным (только инкрементные данные). Содержание среза определяется договорённостями между системой-источником и системой-приёмником, поскольку оно зависит от алгоритма, которым эти данные далее будут обработаны. Таким образом возникла необходимость настроить в нашем фреймворке поддержку репликации данных из S3 в Apache Hive.
Примечание: приведённый в статье код упрощен во избежание необходимости описания структуры проекта.
Постановка задачи
Пусть у нас есть хранилище S3, в котором есть несколько бакетов (buckets). Бакет – специальный контейнер с уникальным ID. Пусть каждый бакет соответствует одному источнику.
В бакете может быть несколько папок и каждая папка относится к отдельной загрузке. В папке может находиться несколько файлов, причём разные файлы могут содержать данные из разных таблиц. Для идентификации принадлежности файла к таблице в имени файла обязательно есть имя таблицы, из которой эти данные берутся, и timestamp – время формирования файла. Кроме того, в имени файла может быть ещё какая-либо информация, поэтому для извлечения имени таблицы и даты используется уникальная маска для каждого решения. Все файлы имеют расширение csv.
Например, имена файлов могут быть следующими: table_1_20230620_122145.csv
и 20230523_231243_workers.csv
. Тогда маски этих файлов можно представить следующим образом: employee_table_*.csv
и worker_*.csv
соответственно.
За одну загрузку нам необходимо читать все файлы из одной папки по определённой маске, обрабатывать их имеющимися алгоритмами (об алгоритмах будет написано в пунктах ниже), и далее записывать их в Hive в виде таблицы. Получившиеся таблицы будут готовы к использованию нашими заказчиками для решения их задач.
На рисунке показана описанная выше структура хранилища. Отмечу, что в S3 объекты физически хранятся в плоском адресном пространстве, без иерархической структуры – в виде пар ключ-значение. Каждый объект в S3 имеет свой уникальный идентификатор, по которому можно напрямую к нему обратиться. Однако мы можем организовать привычный для нас вариант хранения файлов, имитирующий файловое хранилище. В этом случае идентификатор объекта будет выглядеть как путь в файловой системе, например, /data/files/file1.txt
. При этом физического представления папок в S3 нет, но мы можем формально называть промежуточные значения папками для упрощения. В примере выше папками можем назвать «data» и «files».
Начало разработки
Сначала мы решили полностью сделать чтение и обработку данных через Spark. Для этого мы сделали тривиальный код для чтения файлов csv из папки:
val delimiter = ";"
val header = "true"
val path = "/folder1/table1_*.csv"
spark.read
.format("csv")
.option("header", header)
.option("delimiter", delimeter)
.option("inferSchema", "true")
.withColumn("filename", input_file_name)
.load(path)
path – это папка, в которой лежат необходимые файлы, и маска, по которой необходимо извлечь только нужные файлы, т.к. в папке могут быть файлы, которые относятся к этой же загрузке, но к другой таблице на источнике;
delimiter – разделитель столбцов в фале csv;
Установили параметр inferSchema в true для автоматического определения типов столбцов;
Добавили колонку «filename» с именем файла в DataFrame, чтобы по нему можно было узнать, из какого файла была извлечена строка таблицы.
Далее мы переделали этот код так, чтобы он мог читать файлы из S3:
spark
.sparkContext
.hadoopConfiguration
.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark
.sparkContext
.hadoopConfiguration
.set("fs.s3a.endpoint", <endpoint>)
spark
.sparkContext
.hadoopConfiguration
.set("fs.s3a.access.key",<accessKey>)
spark
.sparkContext
.hadoopConfiguration
.set("fs.s3a.secret.key",<secretKey>)
val delimiter = ";"
val header = "true"
val path = "/folder1/table1_*.csv"
spark.read
.format("csv")
.option("header", header)
.option("delimiter", delimiter)
.option("inferSchema", "true")
.withColumn("filename", input_file_name)
.load(path)
Для поддержки чтения из S3 мы всего лишь добавили параметры конфигурации. Это всё благодаря нативной поддержке S3 в Spark.
Рассмотрим подробнее указанные параметры:
Реализацию файловой системы, с которой мы хотим работать. Есть 3 реализации S3: s3, s3n и s3a. Мы выбрали s3a, поскольку она является собственной файловой системой для чтения и записи обычных файлов на S3, поддерживает все возможности s3n, но также является более производительным и допускает возможность работы с файлами более 5 Гб. Подробнее о данных реализациях можно узнать тут;
Endpoint S3;
Открытый ключ для доступа к бакету
accessKey
;Закрытый ключ для доступа к бакетy
secretKey
.
Нам также понадобился JAR-файл hadoop-aws-2.7.1
в пути к классам. Данный JAR содержит класс org.apache.hadoop.fs.s3a.S3AFileSystem
, который нам необходим.
Теперь код для чтения файлов с S3 готов. Далее результат чтения подается на вход алгоритмам append, replace или scd2, которые были реализованы ранее для других типов источников.
Первый запуск данной реализации оказался неудачным. Появилась следующая ошибка: Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
. Такая проблема возникает, когда ваш сервер имеет самоподписанный сертификат. Есть два способа решить это:
Отключить проверку ssl-сертификатов при запуске;
Добавить этот сертификат в список доверенных сертификатов JVM.
Для удобства разработки и тестирования программы мы использовали первый вариант решения. Отключить проверку сертификатов в spark-приложении можно, указав следующие команды в конфигурацию spark-submit
:
spark-submit
--conf spark.driver.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true
--conf spark.executor.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true …
Такой вариант решения допустим только при разработке, но в продакшн такое решение выводить недопустимо. Поэтому в продуктивном контуре необходимо было настроить добавление сертификата в список доверенных. Для этого мы сделали следующее:
Экспортировали SSL-сертификат с помощью браузера. Имя файла сертификата выглядит примерно так
cert_name.crt
;Создали хранилище доверенных сертификатов и добавили в него сертификат:
keytool -import -alias <any alias>
-file cert_name.crt -keystore <the truststore file name> -storepass <password>;
В команду
spark-submit
добавили путь к созданному хранилищу доверенных сертификатов:
spark‑submit
‑-conf
'spark.executor.extraJavaOptions=‑Djavax.net.ssl.trustStore=<the truststore filename>
‑Djavax.net.ssl.trustStorePassword=<password>'
‑-conf
'spark.driver.extraJavaOptions=‑Djavax.net.ssl.trustStore=<the truststore filename>
‑Djavax.net.ssl.trustStorePassword=<password>' …
После установки сертификатов удалось запустить загрузку. Но в процессе анализа и тестирования описанной реализации оказалось, что она имеет ряд проблем и ограничений:
-
Не учитываются прочитанные файлы.
При данном варианте чтения мы каждый раз берем все файлы по определенной маске в папке и никак не помечаем то, что мы их уже прочитали и в следующий раз их обрабатывать не нужно.
-
inferSchema работает долго.
На больших файлах (более 1 Гб) наблюдается значительное проседание по скорости чтения файла при использовании параметра inferSchema. Кроме того, не все типы данных были определены верно.
-
Проблемы пустых файлов.
Требуется явное понимание причин присутствия пустых файлов, т.к. непонятно, как их обрабатывать: считать ошибкой или на источнике просто нет данных? Этот вопрос также необходимо было решить, поскольку отсутствие файла фреймворком понимается как пустая таблица на источнике. При этом алгоритм scd2 чувствителен к такой ситуации, поскольку он посчитает, что все данные неактуальны, а на следующий день снова эти данные переоткроет.
-
Алгоритм SCD2 работает некорректно.
Алгоритм SCD2 является одним из типов алгоритма SCD (медленно меняющиеся измерения от англ. Slowly Changing Dimensions). SCD — механизм отслеживания изменений в данных измерения в терминах хранилища данных.
SCD2 использует добавление новой строки и дополнительных столбцов. Такой подход позволяет сохранить историчность. Дополнительно можно добавить служебные столбцы, которые могут отвечать за версионирование, статус, временной интервал, в течение которого данные строки можно считать актуальными. Про алгоритм SCD2 можно подробнее прочитать здесь.
Алгоритм SCD2 предполагает, что на вход приходит полный срез данных с источника, далее он сравнивает эти данные с приемником на предмет обновления каких-либо столбцов. В случае с файлами у нас может прийти сразу несколько файлов по одной и той же таблице, а значит в этом случае мы имеем несколько срезов по таблице в разные промежутки времени, что порождает большое количество дублей. Из-за этого алгоритм не может понять, как строить историчность, какая строка была раньше, а какая позже.
Учёт прочитанных файлов
В начале разработки у нас не было никаких требований по удалению прочитанных файлов, поэтому удалять мы их не стали. Соответственно перед нами встала задача: как тогда понимать, какие файлы мы уже прочитали и обработали при прошлом запуске, а какие – нет и их необходимо обрабатывать сейчас. Мы стали искать решение этой проблемы при помощи средств spark, но такого варианта нам найти не удалось. Поэтому было решено не читать файлы сразу по маске непосредственно через spark, а сначала отдельным механизмом найти имена файлов, которые нам надо обрабатывать, и список этих файлов уже подать на вход spark.
Для этих целей мы решили использовать класс файловой системы org.apache.hadoop.fs.FileSystem
. Это абстрактный базовый класс для довольно общей файловой системы. Она может быть реализована как распределенная файловая система или как «локальная», отражающая локально подключенный диск. С помощью него можно легко работать с файлами: просматривать содержимое папок, искать файлы по маске, удалять и перемещать отдельные файлы.
Для того, чтобы создать реализацию S3, необходимо в качестве аргумента передать URI нашего S3-хранилища и его конфигурацию. Описание этих параметров уже есть в пункте «Начало разработки». Аналогичные параметры мы указываем и для FileSystem.
val conf = new Configuration()
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.endpoint", "<endpoint>")
conf.set("fs.s3a.access.key", "<accessKey>")
conf.set("fs.s3a.secret.key", "<secretKey>")
После перечисленных настроек мы можем создать класс для манипуляций с файловой системой. Для этого сначала составим URI для подключения, который состоит из префикса s3a://
и имени бакета в S3:
val bucketPath = s"s3a://${bucketName}/"
val s3Path = new URI(bucketPath)
val s3FileSystem = FileSystem.get(s3Path, conf)
Теперь мы можем извлечь имена файлов по заданной маске:
s3FileSystem.globStatus(new Path(s"$directory/$fileMask")).filter(_.isFile)
Однако это только половина решения. Как нам теперь понять, какие из полученных файлов уже были обработаны, а какие – нет. Первым вариантом было читать имена файлов прямо из таблицы Hive, в которую мы и записываем содержимое этих файлов, но этот вариант имеет две проблемы:
Скорость получения списка прочитанных файлов. Чтобы получить прочитанные файлы, нам нужно обратиться к таблице и через distinct получить этот список. Но у нас в таблицах хранится большое количество данных, что может сильно замедлить работу программы;
При работе некоторых алгоритмов (например, scd2) могут отсутствовать новые данные, из-за чего имя файла не запишется в таблицу, а значит мы будем этот файл обрабатывать несколько раз, что некорректно.
Поэтому было решено писать в отдельный файл в hadoop список прочитанных файлов после загрузки данных с помощью spark. Код этой записи выглядит следующим образом:
val dirPath = "<путь к папке со списком прочитанных файлов>"
val tempPath = s"${dirPath}_tmp"
val dfNewReadFiles = listFiles.toDF("filename")
val dfAllReadFiles =
if (dirExists(dirPath)) {
spark
.read
.parquet(dirPath)
.write
.mode(SaveMode.Overwrite)
.parquet(tempPath)
dfNewReadFiles union spark.read.parquet(tempPath)
}
else dfNewReadFiles
dfAllReadFiles.repartition(1).write.mode(SaveMode.Overwrite).parquet(dirPath)
fs.delete(new Path(tempPath), true)
Здесь мы записываем и ранее прочтенные файлы, и те, что прочитали и обработали на текущей итерации.
Теперь мы можем легко получить список необработанных файлов с s3. Доработаем код чтения файлов с s3, который приведен выше, добавив в него фильтр по уже прочитанным файлам:
val readedFiles =
app.spark.read
.parquet(readedFilesDirPath)
.collect
.map(_.getString(0))
.toList
s3FileSystem.globStatus(new Path(s"$s3DirectoryPath/$fileMask"))
.filter(_.isFile)
.map(file => s"$s3DirectoryPath/${file.getPath.getName}")
.diff(readedFilesDirPath)
.toList
.sorted
Отказ от inferSchema в пользу указания типов колонок
Как уже говорилось выше, inferSchema замедляет работу программы и не всегда верно проставляет типы. Это связано с тем, что для определения типа колонки при указании этого параметра spark читает входной файл как минимум один раз целиком для определения типов, а после этого уже с полученной схемой читает файл для дальнейшей обработки. Кроме того, в колонке может не быть ни одного значения. В этом случае программе вообще непонятно, какой тип проставлять, и она выбирает самый безопасный и широкий тип – строку.
По этим причинам лучшим решением было отказаться вообще от использования параметра inferSchema, а вместо него непосредственно указывать колонки и их типы. Для этого в аргументах нашей программы было решено принудительно указать параметр columns, который будет обязателен при s3-загрузках. Этот параметр представляет собой список пар: колонка и её тип.
Далее исправим код чтения файлов с указанием колонок:
val delimiter = ";"
val header = "true"
val path = "/folder1/table1_*.csv"
val columns = List(("name", StringType), ("age", IntegerType), ...)
val customSchema = StructType(
columns
.map(column => StructField(column._1, column._2, true)))
spark.read
.format("csv")
.option("header", header)
.option("delimiter", delimiter)
.option("inferSchema", "true")
.schema(customSchema)
.withColumn("filename", input_file_name)
.load(path)
Обработка пустых файлов
Разберём сначала, насколько для наших алгоритмов критично то, что на вход приходит пустой файл.
Для алгоритма replace, который выполняют полную перезапись данных на приёмнике, пустой файл просто удалит все данные с приёмника и получится пустая таблица. Для алгоритма append пустой файл не сделает ровным счётом ничего и данные останутся в неизменном виде. А вот для SCD2 это означает, что данных на источнике больше нет и все имеющиеся актуальные строки необходимо пометить как удалённые.
Было решено пустые файлы вообще не обрабатывать фреймворком и считать их ошибочными, поскольку вероятность того, что таблица действительно на источнике пуста – минимальна, а пустой файл с большей долей вероятности является ошибкой чтения данных с источника. Кроме того, такой вывод был сделан также на основе запросов пользователей по следующим причинам:
При работе алгоритма replace лучше, чтобы оставались хотя бы какие-то данные, пусть и не совсем актуальные. При этом у каждой строки отмечается дата и время их загрузки, что позволяет безошибочно определить их актуальность;
При работе алгоритма SCD2 необходимо поддерживать актуальность данных. Если пришел пустой файл, то все актуальные строки пометятся, как удалённые. Затем, когда придёт новый непустой файл, появятся новые актуальные строки с теми же данными, из-за чего у нас получится «дырка», в которой все данные были неактуальны. Такая ситуация критична для пользователей данных.
По этим причинам перед чтением самих файлов мы добавили проверку на пустоту каждого нового файла. Если файл пуст, то мы его не обрабатываем, а просто добавляем к списку прочитанных файлов, чтобы далее их не учитывать.
// filePaths - список путей к файлам в S3
// read - функция для чтения данных с файла в DataFrame при помощи spark
val filesFromSource =
filePaths
.map(filePath =>
(
read(filePath).withColumn("filename", input_file_name),
filePath
)
)
filesNamesForLoad =
filesFromSource
.filterNot(x => x._1.isEmpty)
.map(_._2)
Теперь у нас остались только файлы с данными, которые далее отправляются на обработку алгоритмом.
Алгоритм SCD2 для файлов
В пункте «Начало разработки» мы описали проблему работы алгоритма SCD2 с несколькими файлами, поэтому классическим алгоритмом SCD2 мы не сможем обработать данные из файлов. Нам необходимо модифицировать наш алгоритм следующим образом:
Файлы будем читать последовательно в хронологическом порядке. Этот порядок будем определять по дате и времени, которые указаны в имени файла (в пункте «Постановка задачи» мы говорили об этом). Можно было взять время модификации файла из метаинформации, но этот подход может сломать хронологию, если по какой-либо причине файл будет перезагружен позднее, чем файл с более новым срезом таблицы источника;
Для данных из каждого файла будем применять классический алгоритм SCD2. В этом случае он будет работать корректно, поскольку файлы приходят в хронологическом порядке.
Рассмотрим код нового алгоритма. Метод read()
теперь выглядит следующим образом:
def read(filesList: List[String]): List[DataFrame] = {
filesList.map(filename =>
spark
.read
.format("csv")
.option("header", header)
.option("delimiter", delimiter)
.load(filename)
.withColumn("filename", input_file_name)
)
}
Мы получаем на вход список имен файлов, читаем их по отдельности, добавляем имя файла в качестве дополнительной колонки и возвращаем список DataFrame. Каждый из DataFrame содержит данные из одного файла. Далее этот список передается на вход алгоритму SCD2:
val fileNamesIterator = filesList.iterator
val sourceDataframes: List[DataFrame] = read(filesList)
val countFiles = sourceDataframes.length
sourceDataframes.map { data =>
fileName = fileNamesIterator.next()
val transformedData = scd2run(data)
val status = load(transformedData)
status
}
В данном фрагменте кода мы читаем файлы в sourceDataframes
, обрабатываем список полученных DataFrame по очереди и каждый отправляем на обработку методу scd2run
, а затем записываем данные на приёмник.
Заключение
Мы рассмотрели вариант настройки взаимодействия Apache Spark и S3 для обработки файлов, который у нас получилось реализовать на своем проекте. Данная статья может помочь вам разобраться с подобными проблемами и быстрее настроить работу с S3.