Spark можно определить как вычислительный движок с открытым исходным кодом, функциональный подход к параллельной обработке данных на компьютерных кластерах, а также как набор библиотек и выполняемых файлов.

или как фреймворк для распределённой пакетной и потоковой обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop.

Архитектура Apache Spark

Экосистема

Компоненты

Spark содержит 5 основных компонентов. Это Spark Core, Spark SQL, Spark Streaming, Spark MLlib и GraphX.

  1. Spark Core включает функции управления памятью, а также восстановлению в случае аварий, планирования задач в кластере и взаимодействие с хранилищем.

  2. Spark SQL — механизм запросов SQL, который поддерживает различные источники данных и использует такую структуру данных, как DataFrame.

  3. Spark Streaming — обработка потоковых данных в режиме реального времени.

  4. MLlib — библиотека для машинного обучения.

  5. GraphX — библиотека для работы с графами.

Cluster Manager

Cluster Manager занимается управлением реальными машинами кластера и контролирует выделение ресурсов для Spark-приложений. Cluster Manager могут быть Standalone cluster, Apache Mesos и YARN.

YARN есть в Hadoop 2.0. Существует два режима, которые доступны при запуске приложений на YARN:

  • yarn-client режим. Запуск драйвера используются только для запроса ресурсов из YARN.

  • yarn-cluster режим, драйвер Spark запускается в ведущем процессе приложения, который управляем YARN в кластере, а клиент может быть удаленным.

Catalyst Optimizer

Catalyst Optimizer: ввиду того что Spark SQL — это наиболее частый и удобный в использовании компонент Spark, который поддерживает как SQL‑запросы, так и DataFrame API, оптимизатор Catalyst использует расширенные функции языка программирования для построения более оптимальных запросов. Он позволяет добавлять новые методы и функций оптимизации, предоставляет возможности расширять функционал оптимизатора. При этом Catalyst Optimizer используется в том числе и для увеличения скорости выполнения задач, а также с целью оптимизации использования ресурсов.


Процесс вычислений

В процесс вычислений вовлечены несколько основных его частей

  • DRIVER: исполнитель программы

    • исполняет код программы (преобразование пользовательской программы в задачи)

    • планирует и запускает работу executors

    Каждое приложение Spark содержит драйвер, который инициирует различные операции в кластере. Процесс драйвера запускает пользовательский код, который создает SparkContext, создает RDDs и выполняет преобразования и действия. Если драйвер завершает работу, приложение закончено.

  • EXECUTOR: выполняет вычисления

    • исполняет код от driver

    • передает driver'у информацию о процессе вычислений

  • CLUSTER MANAGER: занимается управлением реальными машинами кластера и контролирует выделение ресурсов для Spark-приложений

    • standalone, YARN, Mesos

Обработка в памяти (in-memory processing). Spark хранит и обрабатывает данные в оперативной памяти. Такой подход обеспечивает гораздо большую скорость, чем загрузка и чтение данных с диска, как в MapReduce.

Параллельная обработка и комбинирование операций. Spark распределяет данные и вычисления по нескольким узлам в кластере, выполняя разные операции обработки параллельно в режиме реального времени. Это отличает его от MapReduce, при использовании которого каждый следующий этап работы с датасетом требует завершения предыдущего. Благодаря этим особенностям Spark в десятки раз быстрее, чем MapReduce.

Workflow в Apache Spark:

  • У распределённой системы есть менеджер кластеров (cluster manager), контролирующий распределение ресурсов между исполнителями.

  • При запуске Spark-приложения драйвер запрашивает у менеджера кластеров ресурсы для запуска исполнителей.

  • Если ресурсы есть, менеджер кластеров запускает исполнители.

  • Драйвер отправляет доступным исполнителям свои задачи.

  • Исполнители проводят необходимые вычисления и отправляют результат драйверу.

  • После завершения всех вычислений исполнители закрываются, их ресурсы возвращаются в общий кластер. После этого распределённая система может работать над другими задачами.


Структуры данных

Хронология появления структур данных по версиям Spark:

  • RDD — начиная с Spark 0 (Low level API)

    • Resilient Distributed Dataset — это определенный набор объектов, разбитых на блоки partitions. RDD может быть представлен как в виде структурированных наборов данных так и неструктурированных. Partitions могут храниться на разных узлах кластера. RDD отказоустойчивы и могут быть восстановлены в случае сбоя.

  • DataFrame — начиная с Spark 1.3 (Structured API)

    • это набор типизированных записей, разбитых на блоки. Иными словами — таблица, состоящая из строк и столбцов. Блоки могут обрабатываться на разных нодах кластера. DataFrame может быть представлен только в виде структурированных или полуструктурированных данных. Данные представлены именованным набором столбцов, напоминая таблицы в реляционных БД.

  • DataSet — начиная с Spark 1.6 (Structured API)

    • это набор записей типа Row, разбитых на блоки

Рассмотрим поведение указанных структур данных в контексте Immutability (неизменяемости) и Interoperability (совместимости)**:

RDD состоит из набора данных, которые могут быть разбиты на блоки. Блоком, или партицией, можно считать цельную логическую неизменяемую часть данных, создать которую можно в том числе посредством преобразования (transformations) уже существующих блоков.

DataFrame можно создать из RDD. После подобного transformation вернуться к RDD уже не получится. То есть исходный RDD не подлежит восстановлению после трансформации в DataFrame.

DataSet: функционал Spark позволяет преобразовывать как RDD так и DataFrame в сам DataSet. dataset - это типичированный Dataframe.

Если в Dataframe можно обращаться к колонке, в то время как в Dataset колонка - это отдельный объект. dataframe — это dataset типа raw.

Источники для структур данных

Data Sources API

  • RDD — Data source API позволяет RDD формироваться из любых источников включая текстовые файлы, при этом необязательно даже структурированных.

  • DataFrame — Data source API позволяет обрабатывать разные форматы файлов (AVRO, CSV, JSON, а также из систем хранения HDFS, HIVE** таблиц, MySQL).

  • DataSet — Dataset API также поддерживает различные форматы данных.

Spark DataFrame может быть создан из различных источников:

Примеры формирования Spark DataFrame


Spark и Hadoop

Отношение между Spark и Hadoop играет роль, только если вы хотите запустить Spark в кластере, на котором установили Hadoop. В противном случае вам не нужен Hadoop, чтобы запустить Spark.

Чтобы запустить Spark в кластере, вам нужна совместно используемая файловая система. Кластер Hadoop обеспечивает доступ к распределенной файловой системе через HDFS и кластерного менеджера в форме YARN. Spark может использовать YARN в качестве кластерного менеджера по распределительной работе и использовать HDFS, чтобы получить доступ к данным. Кроме того, некоторые приложения Spark могут использовать модель программирования MapReduce, но MapReduce не является базовой моделью для вычислений в Spark.

Ленивые вычисления. 

Lazy evaluation: Spark использует концепцию отложенного выполнения вычислений. Это означает, что операции над данными проводятся только перед непосредственным использованием результатов этих операций. Благодаря этому вычислительные мощности не тратятся на вычисления, которые понадобятся когда-то в будущем.

Transformations

Transformations (преобразования) является операциями на существующем RDD, которые возвращают новый RDD.

Actions

Actions (Действия) вычисляет конечный результат на основе RDD и или возвращает тот результат к программе драйвера или сохраняет его во внешнюю систему хранения, такую как HDFS. Преобразования возвращают RDDs, тогда как действия возвращают другие типы данных.


Spark Application

Приложения Spark включают в себя программу драйвера и исполнителей и запускают различные параллельные операции в кластере. Существует два типа приложений Spark: приложения Spark notebook и приложения Spark batch.

Driver ~ application
Driver ~ application

Spark Application разделяется на Job'ы. 1 Job = 1 Action (неленивая трансформация).

Job разделяется на Stages (suffle порождает stages).

Внутри приложения Spark (экземпляра SparkContext) несколько параллельных job'ов могут выполняться одновременно, если они были отправлены из отдельных потоков. Под «job'ом» в этом подразумевается действие Spark (например, savecollect) и любые задачи, которые необходимо выполнить для оценки этого действия. Планировщик Spark полностью потокобезопасен и поддерживает этот вариант использования, чтобы включить приложения, которые обслуживают несколько запросов (например, запросы для нескольких пользователей).

По умолчанию планировщик Spark запускает задания в режиме FIFO. Каждое задание делится на этапы «stages» (например, фазы map и reduce), и первый job получает приоритет на всех доступных ресурсах, пока на его этапах есть задачи для запуска, затем второй job получает приоритет и т. д. Если job в начале очереди не нужно использовать весь кластер, более поздние job могут начать выполняться сразу, но если job'ы в начале очереди большие, то более поздние job'ы могут значительно задерживаться.

SparkConf

SparkConf хранит параметры конфигурации приложения, развертываемого на Spark. Каждое приложение должно быть сконфигурировано до того, чтобы быть развернутым на кластере Spark. Некоторые параметры конфигурации задают свойства приложения, и некоторые используются Spark, чтобы выделить ресурсы в кластере. Параметры конфигурации передаются на кластер Spark через SparkContext.

SparkContext

 SparkContext представляет связь с кластером Spark. Это - точка входа к Spark и настраивает внутренние сервисы, необходимые, чтобы установить связь со средой выполнения Spark

Spark UI

Руководство для начинающих по Spark UI: Как отслеживать и анализировать задания Spark

Spark Jobs optimization

Динамическое распределение ресурсов

Spark предоставляет механизм для динамической корректировки ресурсов, которые занимает приложение, в зависимости от рабочей нагрузки. Это означает, что приложение может возвращать ресурсы кластеру, если они больше не используются и запрашивать их снова позже, когда возникнет потребность. Эта функция особенно полезна, если несколько приложений совместно используют ресурсы в кластере Spark.


Оптимизация работы Spark Jobs

  • Выбор правильных структур данных
    Хотя RDD является базовой структурой данных Spark, это API более низкого уровня, который требует более подробного синтаксиса и не имеет оптимизаций, предоставляемых структурами данных более высокого уровня. Spark перешел к более удобному и оптимизированному API с введением DataFrames — абстракций более высокого уровня, построенных поверх RDD. Данные в DataFrame организованы в именованные столбцы, что делает их более похожими на данные в реляционной базе данных. Операции DataFrame также выигрывают от Catalyst, оптимизированного механизма выполнения Spark SQL, который может повысить вычислительную эффективность, потенциально улучшая производительность. Преобразования и действия могут выполняться в DataFrames так же, как в RDD.

  • Кеширование
    При вычислении Actions происходит дополнительное чтение (несмотря на то что датафрейм уже предварительно сформирован = считан). Чтобы избежать лишнего чтения можно сделать кеширование.

    Кэширование — это важный метод, который может привести к значительному повышению вычислительной эффективности. Часто используемые данные и промежуточные вычисления можно кэшировать или сохранять в области памяти, что позволяет ускорить их извлечение. Spark предоставляет встроенную функцию кэширования, которая может быть особенно полезна для алгоритмов машинного обучения, обработки графов и любых других приложений, в которых к одним и тем же данным необходимо обращаться многократно. Без кэширования Spark пересчитывал бы RDD или DataFrame и все его зависимости каждый раз при вызове действия.

    Следующий блок кода Python использует PySpark, API Python Spark, для кэширования DataFrame с именем df:

    df.cache()

    Важно помнить, что кэширование требует тщательного планирования, поскольку оно использует ресурсы памяти рабочих узлов Spark, которые выполняют такие задачи, как выполнение вычислений и хранение данных. Если набор данных значительно превышает доступную память или вы кэшируете RDD или DataFrames без их повторного использования на последующих этапах, потенциальное переполнение и другие проблемы управления памятью могут привести к снижению производительности.

  • Data Partitioning
    Архитектура Spark построена на основе партиционирования, разделения больших объемов данных на более мелкие, более управляемые единицы, называемые партициями. Разделение позволяет Spark обрабатывать большие объемы данных параллельно, распределяя вычисления по нескольким узлам, каждый из которых обрабатывает подмножество общих данных.

    Хотя Spark предоставляет стратегию партиционирования по умолчанию, как правило, основанную на количестве доступных ядер ЦП, он также предоставляет возможности для настраиваемого партиционирования. Вместо этого пользователи могут указать настраиваемую функцию партиционирования, например, разделение данных по определенному ключу.

  • Количество партиций
    Одним из наиболее важных факторов, влияющих на эффективность параллельной обработки, является количество партиций. Если партиций недостаточно, доступная память и ресурсы могут быть недоиспользованы. С другой стороны, слишком большое количество партиций может привести к увеличению расходов на производительность из-за планирования и координации задач. Оптимальное количество партиций обычно устанавливается как фактор общего количества доступных ядер в кластере.

    Разделы можно задать с помощью repartition() и coalesce(). В этом примере DataFrame переразбивается на 200 партиций:

    df = df.repartition(200) # метод переразбиения
    df = df.coalesce(200) # метод объединения

    Метод repartition() увеличивает или уменьшает количество разделов в RDD или DataFrame и выполняет полную перетасовку данных по всему кластеру, что может быть затратным с точки зрения обработки и сетевой задержки. Метод coalesce() уменьшает количество разделов в RDD или DataFrame и, в отличие от repartition(), не выполняет полную перетасовку, а вместо этого объединяет смежные разделы для уменьшения общего количества.

  • Broadcasting
    Join() — это распространенная операция, в которой два набора данных объединяются на основе одного или нескольких общих ключей. Строки из двух разных наборов данных можно объединить в один набор данных, сопоставив значения в указанных столбцах. Поскольку требуется перетасовка данных между несколькими узлами, join() может быть дорогостоящей операцией с точки зрения сетевой задержки.

  • Фильтрация неиспользуемых данных
    При работе с данными высокой размерности минимизация вычислительных затрат имеет важное значение. Любые строки или столбцы, которые не являются абсолютно необходимыми, должны быть удалены. Два ключевых метода, которые снижают вычислительную сложность и использование памяти — это ранняя фильтрация и обрезка столбцов.

  • Минимизация использования пользовательских функций Python (UDF)
    Один из самых эффективных методов оптимизации PySpark — это использование встроенных функций PySpark, когда это возможно. PySpark поставляется с богатой библиотекой функций, все из которых оптимизированы

Комментарии (0)