Всем привет. В качестве введения, хочется рассказать, как я дошел до жизни такой.
До того как встретиться с Big Data и Spark, в частности, мне довелось много и часто оптимизировать SQL запросы, сначала для MSSQL, потом для Oracle, и вот теперь я столкнулся со SparkSQL.
И если для СУБД уже существует множество хороших книг, описывающих методологию и «ручки», которые можно покрутить для получения оптимального плана запроса, то для Spark такого рода книг я не встречал. На глаза попадались больше статьи и наборы практик, причем больше относящиеся к работе через RDD/Dataset API, а не чистому SQL. Для меня одной из эталонных книг на тему оптимизации SQL является книга Дж. Льюис «Oracle. Основы стоимостной оптимизации». Что-то подобное по глубине проработки я и искал. Почему предметом исследования стал именно SparkSQL, а не API, лежащий в основе? Тут интерес был вызван особенностями проекта, над которым я работаю.
Для одного из заказчиков наша компания разрабатывает хранилище данных, детальный слой которого и часть витрин находится в кластере Hadoop, а итоговые витрины — в Oracle. Данный проект подразумевает обширный слой преобразования данных, который реализован на Spark. Для ускорения разработки и возможности подключения разработчиков ETL, не знакомых с тонкостями технологий Big Data, но хорошо знакомых с SQL и ETL средствами, был разработан инструмент, напоминающий по идеологии прочие ETL средства, например, Informatica, и позволяющий визуально конструировать ETL процессы с последующей генерацией кода для Spark. В силу сложности алгоритмов и большого количества преобразований разработчики в основном используют SparkSQL запросы.
Вот с этого места и начинается история, так как пришлось отвечать на большое количество вопросов вида «А почему запрос не работает/работает медленно/работает не так как в Oracle?». Наиболее интересной частью для меня оказался именно этот: «Почему медленно работает?». Тем более что в отличие от СУБД, с которыми я работал до этого, можно залезть в исходный код и получить ответ на интересующие вопросы.
Ограничения и допущения
Для запуска примеров и анализа исходного кода используется Spark 2.3.0.
Предполагается, что читатель знаком с архитектурой Spark, и общими принципами действия оптимизатора запросов для одной из СУБД. Как минимум, фраза «план запроса» точно не должна вызывать удивления.
Также данная статья пытается не стать переводом кода оптимизатора Spark на русский язык, поэтому для вещей, весьма интересных с точки зрения работы оптимизатора, но которые можно прочитать в исходном коде, тут будут просто кратко упомянуты со ссылками на соответствующие классы.
Переходим к изучению
Начнем с небольшого запроса, чтобы изучить основные стадии, через которые он проходит от синтаксического разбора до выполнения.
scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal")
scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust")
scala> val df = spark.sql("""
| select bal.account_rk, cust.full_name
| from bal
| join cust
| on bal.party_rk = cust.party_rk
| and bal.actual_date = cust.actual_date
| where bal.actual_date = cast('2017-12-31' as date)
| """)
df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string]
scala> df.explain(true)
Основной модуль, отвечающий за разбор SQL, и оптимизацию плана выполнения запроса — Spark Catalyst.
Расширенный вывод при описании плана запроса (df.explain(true)) позволяет отследить все стадии, которые проходит запрос:
- Parsed Logical Plan — получаем после синтаксического разбора SQL. На этом этапе проверяется только синтаксическая корректность запроса.
== Parsed Logical Plan ==
'Project ['bal.account_rk, 'cust.full_name]
+- 'Filter ('bal.actual_date = cast(2017-12-31 as date))
+- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date))
:- 'UnresolvedRelation `bal`
+- 'UnresolvedRelation `cust`
- Analyzed Logical Plan — на этом этапе добавляется информация о структуре используемых сущностей, проверяется соответствие структуры и запрашиваемых атрибутов.
== Analyzed Logical Plan ==
account_rk: decimal(38,18), full_name: string
Project [account_rk#1, full_name#59]
+- Filter (actual_date#27 = cast(2017-12-31 as date))
+- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
:- SubqueryAlias bal
: +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc
+- SubqueryAlias cust
+- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc
- Optimized Logical Plan — самое интересное для нас. На данном этапе происходит преобразование получившегося дерева запроса на основании доступных правил оптимизации.
== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
:- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
: +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18))
: +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc
+- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
+- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531))
+- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc
- Physical Plan — начинают учитываться особенности доступа к исходным данным, включая оптимизации по фильтрации партиций и данных для минимизации получаемого набора данных. Выбирается стратегия выполнения join (более подробно о доступных вариантах – ниже).
== Physical Plan ==
*(2) Project [account_rk#1, full_name#59]
+- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight
:- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
: +- *(2) Filter isnotnull(party_rk#18)
: +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true]))
+- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
+- *(1) Filter isnotnull(party_rk#57)
+- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string>
Следующие стадии оптимизации и выполнения (например, WholeStageCodegen) выходят за рамки данной статьи, но весьма подробно (равно как и описанные выше стадии) описаны в Mastering Spark Sql.
Чтение плана выполнения запроса обычно происходит «изнутри» и «снизу вверх», то есть первыми выполняются наиболее вложенные части, и постепенно продвигаются к итоговой проекции, расположенной на самом верху.
Виды оптимизаторов запросов
Можно выделить два вида оптимизаторов запросов:
- Оптимизаторы, основанные на фиксированных правилах (Rule-based optimizator, RBO).
- Оптимизаторы, основанные на оценке стоимости выполнения запроса (Cost-based optimizator, CBO).
Первые заточены на применении набора фиксированных правил, например, применение условий фильтраций из where на более ранних этапах, если это возможно, предвычисление констант и т.д.
CBO оптимизатор для оценки качества полученного плана используют стоимостную функцию, которая обычно зависит от объема обрабатываемых данных, количества строк, попадающих под фильтры, стоимости выполнения тех или иных операций.
Ознакомиться детально с дизайн-спецификацией на CBO для Apache Spark можно по ссылкам: спецификация и основная JIRA задача для реализации.
Отправной точкой для изучения полного набора существующих оптимизаций может послужить код Optimizer.scala.
Вот небольшая выдержка из длинного списка доступных оптимизаций:
def batches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
CollapseWindow,
CombineFilters,
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullPropagation,
ConstantPropagation,
........
Следует отметить, что список данных оптимизаций включает в себя как оптимизации, основанные на правилах, так и оптимизации, основанные на оценки стоимости запроса, о которых будет сказано ниже.
Особенностью CBO является то, что для корректной работы ему необходимо знать и хранить информацию по статистике данных, используемых в запросе — количество записей, размер записи, гистограммы распределения данных в столбцах таблиц.
Для сбора статистики используется набор SQL команд ANALYZE TABLE… COMPUTE STATISTICS, кроме того, необходим набор таблиц для хранения информации, API предоставляется через ExternalCatalog, точнее через HiveExternalCatalog.
Так как в настоящий момент CBO по умолчанию отключен, то основной упор будет сделан на исследовании доступных оптимизация и нюансов RBO.
Виды и выбор стратегии join
На стадии формирования физического плана выполнения запроса производится выбор стратегии join. На настоящий момент в Spark доступны следующие варианты (изучение кода можно начать с кода в SparkStrategies.scala).
Broadcast hash join
Наилучший вариант в случае если одна из сторон join достаточно мала (критерий достаточности задается параметром spark.sql.autoBroadcastJoinThreshold в SQLConf). В этом случае данная сторона целиком копируется на все executor'ы, где и происходит hash join с основной таблицей. Кроме размера следует учитывать, что в случае outer join можно копировать только outer сторону, поэтому по возможности в качестве ведущей таблицы в случае outer join необходимо использовать таблицу с наибольшим объемом данных.
Помимо всего прочего, это единственный вариант оптимизации, для которого в коде SQL можно указать хинт по образцу Oracle, в виде /*+ broadcast(t1, t2) */
Sort merge join
С включенной по умолчанию настройкой spark.sql.join.preferSortMergeJoin данный способ применяется по умолчанию, если ключи для join можно отсортировать.
Из особенностей можно отметить, что в отличие от предыдущего способа, оптимизация по кодогенерации для выполнения операции доступна только для inner join.
Shuffle hash join
В случае если ключи не поддаются сортировке, либо отключена настройка выбора sort merge join по умолчанию, Catalyst пытается применить shuffle hash join. Помимо проверки на настройки, проверяется также, что Spark хватит памяти, чтобы построить локальный hash map для одной партиции (общее количество партиций задается настройкой spark.sql.shuffle.partitions)
BroadcastNestedLoopJoin и CartesianProduct
В случае, когда отсутствует возможность прямого сравнения по ключу (например, условие по like) или отсутствуют ключи для соединения таблиц, в зависимости от размера таблиц, выбирается либо данный тип, либо CartesianProduct.
Порядок указания таблиц в join'ах
В любом случае для работы join необходим shuffle таблиц по ключу. Поэтому в настоящий момент порядок указания таблиц, особенно в случае выполнению подряд нескольких join важен (если быть занудой, то в том случае, если не включен CBO, и не включена настройка JOIN_REORDER_ENABLED).
По возможности порядок соединения таблиц должен минимизировать количество shuffle операций для больших таблиц, для чего соединения по одному и тому же ключу должны идти последовательно. Также не стоит забывать про минимизацию данных для join, для возможности включения Broadcast Hash Join.
Транзитивное применение условий фильтрации
Рассмотрим следующий запрос:
select bal.account_rk, cust.full_name
from balance bal
join customer cust
on bal.party_rk = cust.party_rk
and bal.actual_date = cust.actual_date
where bal.actual_date = cast('2017-12-31' as date)
Здесь мы соединяем две таблицы, которые партицированы одинаково, по полю actual_date и накладываем явный фильтр только на партицию по таблице balance.
Как видно из оптимизированного плана запроса, фильтр по дате применяется также и к customer, причем на моменте чтения данных с диска определяется, что необходима ровно одна партиция.
== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
:- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
: +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18))
: +- Relation[,... 4 more fields] orc
+- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
+- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57))
+- Relation[,... 9 more fields] orc
Но стоит только заменить в запросе inner join на left outer, как тут же отваливается push predicate для таблицы customer, и происходит полное сканирование, что является нежелательным эффектом.
== Optimized Logical Plan ==
Project [account_rk#1, full_name#59]
+- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88))
:- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27]
: +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531))
: +- Relation[,... 4 more fields] orc
+- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88]
+- Relation[,... 9 more fields] orc
Конвертация типов
Рассмотрим простой пример отбора из таблицы с фильтрацией по типу клиента, в схеме тип поля party_type — string.
select party_rk, full_name
from cust
where actual_date = cast('2017-12-31' as date)
and party_type = 101 -- некорректный тип
-- and party_type = '101' -- тип соответствует схеме данных
И сравним два получившихся плана, первый — когда обращаемся по некорректному типу (будет неявное приведение int to string), второй — когда по типу, соответствующему схеме.
PushedFilters: [IsNotNull(PARTY_TYPE)] // В первом случае фильтрация данных будет после вычитки из файла в память.
PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] // В данном случае фильтрация данных будет осуществлена непосредственно в момент чтения из файла.
Аналогичная проблема наблюдается и для случая сравнения даты со строкой, будет фильтр на сравнение строк. Пример:
where OPER_DATE = '2017-12-31'
Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31)
PushedFilters: [IsNotNull(OPER_DATE)]
where OPER_DATE = cast('2017-12-31' as date)
PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)]
Для случая, когда возможно неявное приведение типа, например, int -> decimal, оптимизатор справляется самостоятельно.
Направление дальнейших исследований
Много любопытной информации о «ручках», которые можно использовать для тонкой настройки Catalyst, а также о возможностях (настоящих и будущих) оптимизатора можно почерпнуть из SQLConf.scala.
В частности, как можно увидеть по умолчанию стоимостной оптимизатор на текущий момент все еще выключен.
val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
.booleanConf
.createWithDefault(false)
Равно как и зависящие от него оптимизации, связанные с переупорядочиванием join'ов.
val JOIN_REORDER_ENABLED =
buildConf("spark.sql.cbo.joinReorder.enabled")
.doc("Enables join reorder in CBO.")
.booleanConf
.createWithDefault(false)
или
val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
.doc("When true, it enables join reordering based on star schema detection. ")
.booleanConf
.createWithDefault(false)
Краткие итоги
Удалось затронуть только небольшую часть существующих оптимизаций, впереди ждут эксперименты со стоимостной оптимизацией, которая может дать намного больше простора для преобразования запросов. Также отдельный интересный вопрос — сопоставление набора оптимизаций при чтении из Parquet и Orc файлов, судя по jira проекта дело идет к паритету, но так ли это на самом деле?
Кроме того:
- Анализ и оптимизация запросов — это интересно и увлекательно, особенно с учетом доступности исходников.
- Включение CBO даст простор для дальнейших оптимизаций и исследований.
- Необходимо следить за применимостью основных правил, позволяющих отфильтровать как можно больше «лишних» данных на как можно более ранних этапах.
- Join — это необходимое зло, но по возможности стоит их минимизировать и следить за тем, какая реализация используется под капотом.
Комментарии (4)
sshikov
13.07.2018 19:31>Join — это необходимое зло, но по возможности стоит их минимизировать и следить за тем, какая реализация используется под капотом.
Именно за этом мы и любим API :) Сделать на нем можно практически все тоже самое, и при этом сочетать гибкость и контроль над реализацией.
andr1983
Спасибо, хорошее интро. Но немного коробит от постоянного сравнивания Spark с РСУБД. Стоило бы ещё рассказать в каких случаях происходит pushdown предикатов, а когда нет. Хотя бы для основных источников, как orc, parquet, hive.
kpavn Автор
Спасибо большое за отзыв. В принципе, со всем согласен :)
Насчет уклона — это скорее уже моя проф деформация восприятия технологий.
Насчет pushdown предикатов — возможно, стоило сделать отдельный раздел, информация получается размазалась по статье, например, в «Конвертации типов».
Но согласен, можно написать много больше, постараюсь дать развернутый комментарий на эту тему.
andr1983
Было бы классно, а то в своё время пришлось потратить много времени на разбор данного вопроса. А так готовая статья на Хабре — красота )