Фреймворк Spark позволяет выполнять множество различных операций с распределенными наборами данных. При этом, объем обрабатываемых данных может быть достаточно большим. Конечно, можно сохранять обрабатываемую информацию в файлы, но что делать, если набор данных не умещается на одном компьютере или на одном дисковом хранилище.

Для решения данной проблемы фреймворк поддерживает широкий диапазон различных механизмов ввода/вывода. Это можно объяснить в том числе и тем, что Apache Spark создавался в экосистеме Hadoop, предназначенной для работы с большими данными. Так, для доступа к данным Spark использует интерфейсы InputFormat и OutputFormat из Hadoop MapReduce, программной платформы, предназначенной для создания заданий, обрабатывающих большие объемы данных. А данные интерфейсы, в свою очередь поддерживают множество форматов файлов и систем хранения (SЗ, HDFS, Cassandra, HBаsе и т. д.)

Форматы файлов

Файлы с данными традиционно могут иметь различные форматы от неструктурированных и полуструктурированных, таких как текст и JSON, до полностью структурированных, таких как SequenceFiles, используемых в Hadoop. Кроме того, для всех допустимых форматов Spark прозрачно поддерживает сжатие, опираясь на расширения в именах файлов.

Дополнительным средством, позволяющим работать с механизмами вывода является Hadoop API, работающий с файлами, хранящими данные в виде пар ключ/значение. Однако эти программные интерфейсы можно использовать только для работы с данными в виде пар ключ/значение из-за требований Hadoop, даже при том, что в некоторых форматах ключи игнорируются. В случаях, когда формат игнорирует ключ, обычно используется ложный ключ (например, null).

Для начала рассмотрим самый простой формат – текстовый. Если нам необходимо загрузить данные из единственного текстового файла, то необходимо вызвать функцию textFile () объекта SparkContext, как показано в примере ниже для Python.

 input = sc.textFile("file:///home/holden/repos/spark/README.md")

Соответственно, сохранение в текстовый файл на Python будет выполнено следующей командой:

result.saveAsTextFile(outputFile)

Если мы имеем дело с множеством файлов, находящихся в каталоге, из которых нужно загрузить данные, то можно просто вызвать тот же самый метод textFile () и передать ему путь к каталогу, и в этом случае он загрузит содержимое всех файлов в набор RDD (Resilient Distributed Dataset, неизменяемая коллекция объектов данных.).

Но в некоторых случаях нам важно знать, какой файл соответствует какой части данных (например, данные за период времени с ключом в файле), или требуется обработать весь файл целиком. Для небольших файлов можно воспользоваться методом SparkContext. wholeTextFiles () и получить набор RDD с парами ключ/значение, роль ключей в котором будут играть имена файлов. Этот метод может быть очень полезен, когда каждый файл представляет данные за определенный период времени. Например, если нам нужно вычислить среднее за каждый период. Так, код ниже берет из файла данные о продажах и вычисляет среднее за каждый период.

val input = sc.wholeTextFiles("file://home/holden/salesFiles")

val result = input.mapValues { y =>

   val nums = y.split(" ") .map(x => x.toDouble)

   nums.sum / nums.size.toDouble

}

Отдельно хотелось бы отметить возможности по работе с форматом JSON. Проще всего, прочитать их как текстовый файл и затем отобразить в значения с помощью парсера JSON. Аналогично с помощью предпочтительной библиотеки поддержки формата JSON можно осуществлять преобразование значений в строки и выводить их в файл.

Для работы с форматом JSON существует множество различных библиотек, но в простейшем случае, для Python можно воспользоваться встроенной библиотекой json.

import json

data = input.map(lambda х: json.loads(x))

При этом, важно понимать, что обработка неправильно сформированных записей может представлять большую проблему, особенно в полуструктурированных форматах, таких как JSON. Так как набор данных, которых мы обрабатываем может быть очень большим, вероятность столкнуться с неправильно сформированной записью там является ненулевой, и нам важно корректно обрабатывать подобные ситуации.

Сохранение записей в этом плане является гораздо более простой операцией, так как нам не нужно беспокоиться об ошибках форматирования и, к тому же, мы точно знаем тип записываемых данных.

Разделители

Еще один широко распространенный формат данных это CSV (Comma-Separated Values). Данные файлы, как предполагается, содержат фиксированное число полей в каждой строке, и эти поля, как правило, разделены запятыми. При этом, каждая запись должна храниться в отдельной строке, хотя иногда можно встретить записи, занимающие по нескольку строк. Файлы CSV иногда могут быть непоследовательными, особенно в отношении символов перевода строк, экранирования и отображения нe-ASCII символов или нецелых чисел. Формат CSV не предусматривает поддержку вложенных полей, поэтому упаковывать и распаковывать такие поля приходится вручную. Стоит также отметить, что в отличие от полей JSON, записи в формате CSV не имеют имен полей. Обычно, в файле CSV отводят первую строку под запись, поля которой содержат имена соответствующих полей.

Для загрузки CSV в Python проще всего воспользоваться одноименной библиотекой, как показано в примере ниже.

import csv

import StringIO

def loadRecord(line):

   input = StringIO.StringIO(line)

   reader = csv.DictReader(input,

      fieldnames= [ "name", "favouriteAnimal"])

   return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

 Однако, здесь не все так просто. Мы можем столкнуться с ситуацией, когда в полях будут присутствовать символы перевода строки, и это может стать дополнительной проблемой, требующей решения. В таком случае нам придется каждый файл загружать целиком и выполнять его парсинг. И здесь кроется существенный риск, потому что, если каждый файл будет иметь большой размер, загрузка и парсинг могут потребовать большого объема ресурсов и это может стать узким местом в приложении.

Ниже пример загрузки всех записей.

def loadRecords(fileNameContents):

   input = StringIO.StringIO(fileNameContents[1])

   reader = csv.DictReader(input,

      fieldnames = ["name", "favoriteAnimal"])

   return reader

fullFileData; sc.wholeTextFiles(inputFile) .flatMap(loadRecords)

Запись данных в формате CSV в Python

def writeRecords(records):

   output = StringIO.StringIO()

   writer = csv.DictWriter(output,

      fieldnames= ["name", "favoriteAnimal"])

   for record in records:

      writer.writerow(record)

    return [output.getvalue()]

panda.mapPartitions(writeRecords).saveAsTextFile(outputFile)

SequenceFiles

Еще один популярный формат файлов, используемый в Hadoop и состоящих из пар ключ/значение это SequenceFiles. Данный формат имеет метки синхронизации, что позволяет фреймворку Spark находить нужную точку в файле и повторно синхронизировать с границами записей. Таким образом, мы можем обеспечить высокую эффективность параллельного чтения файлов в формате SequenceFiles сразу несколькими узлами. Поддержка SequenceFiles состоит из элементов, реализующих интерфейс Hadoop Writable, так как Hadoop использует собственную инфраструктуру сериализации.

Фреймворк Spark имеет специализированный API интерфейс для чтения данных в формате SequenceFiles: мы можем использовать метод sequenceFile(path, keyClass, valueClass, minPartitions) объекта SparkContext. Как отмечалось выше, формат SequenceFiles поддерживается классами, реализующими интерфейс WritaЬle, поэтому оба аргумента — keyClass и valueClass — должны быть классами, реализующими интерфейс WritaЬle. Давайте рассмотрим пример загрузки информации о людях и числе панд из файла в формате SequenceFile.

Загрузка данных в формате SequenceFile в Python

val data = sc.sequenceFile(inFile,

   *org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritаblе")

Заключение

В этой статье мы рассмотрели несколько примеров работы с данными в Apche Spark. Стоит отметить, что это не исчерпывающий список, так как за рамками статьи остались объектные файлы и другие форматы Hadoop.


Приглашаем на бесплатные открытые занятия, которые пройдут в рамках курса «Spark Developer»:

Комментарии (0)