Всем привет!
В этой статье возьмем за основу пару таблиц и пройдемся по планам запросов по нарастающей: от обычного селекта до джойнов, оконок и репартиционирования. Посмотрим, чем отличаются виды планов друг от друга, что в них изменяется от запроса к запросу и разберем каждую строчку на примере партиционированной и непартиционированной таблицы.
Исходные данные
Будем работать с 3 таблицами:
campaigns – партиционирована по полю loading_id
campaigns_not_partitioned – точно такая же таблица, но не партиционирована
stats – таблица, с которой будем тестить джойны
Коротко про виды планов
Всего существует 4 плана:
Parsed Logical Plan – план после парсинга ячейки с кодом, отлавливает синтаксические ошибки
Analyzed Logical Plan – план после семантического анализа, подтягиваются конкретные таблички и столбцы с типами данных
Optimized Logical Plan – оптимизации к предыдущему плану, например, упрощает лишние операции для повышения производительности
Physical Plan – как физически будет исполнен запрос на кластере, конкретные файлики, пути, форматы, партиции и т. д.
Далее мы будем смотреть только на физический план, т.к. он отражает фактический алгоритм обработки данных. Чтобы чтение было комфортным, есть оглавление с быстрыми ссылками на сами запросы.
Погнали!
Оглавление
1. Самый обычный селект
Что ж, пойдемте изучать план:
1) FileScan parquet
Наша таблица читается из схемы, перечисляются все поля, формат данных – паркет, который хранится на hdfs по указанному пути, партиций нет (в данном случае берем все), фильтров нет, указывается схема данных.
2) ColumnarToRow
Исходные данные хранятся в паркет-файлах поколоночно, но в спарке датафрейм по сути содержит множество строк, поэтому нам нужно преобразование колонок в строки.
Напомню, что первая таблица была партиционированной по полю loading_id. Теперь посмотрим на непартиционированную таблицу:
Что поменялось?
Location вместо CatalogFileIndex стал InMemoryFileIndex.
CatalogFileIndex используется, когда мы читаем партиционированную таблицу полностью.
InMemoryFileIndex используется, когда мы читаем непартиционированную таблицу или отдельные партиции.
2. Самый обычный фильтр
Как мы видим, заполнилось поле PartitionFilters – мы отобрали конкретные партиции. Вообще оптимизатор Catalyst в фильтрах всегда добавляет еще один – isnotnull, т.к., чтобы отфильтровать по конкретным значениям, поле точно должно быть не пустым.
В непартиционированной таблице:
PartitionFilters переместились на уровень DataFilters и PushedFilters.
DataFilters – это фильтры на непартиционированных столбцах.
PushedFilters – фильтры, которые мы можем пробросить на уровень источника данных и применить прямо при чтении файла.
При этом эти два параметра могут не совпадать. Здесь я добавила чисто технический фильтр:
В DataFilters фильтр с concat() есть, а в PushedFilters уже нет, потому что мы не можем применить эту сложную конструкцию на источнике.
Также в план запроса добавляется еще один степ – Filter. Он нужен, чтобы окончательно отфильтровать данные по указанным условиям. Потому что на этапе PushedFilters мы берем не нужные строки, а мы берем файлы, содержащие нужные строки. Соответственно, в них запросто может попасть что-то лишнее.
3. Селект одного столбца
В случае партиционированной таблицы мы всегда будем таскать за собой поле партиционирования, а потом выполнять Project – это и есть select.
4. Селект одного столбца + фильтр
Помимо разницы между PartitionFilters и PushedFilters, которые мы уже обсудили в п. 2, еще добавилась разница в параметре ReadSchema. Если таблица партиционирована, то поле партиционирования не хранится в виде столбца в данных, оно выносится на уровень файловой системы: поэтому в первом пути есть папка /loading_id=40678148. Во втором случае loading_id хранится прямо в файле, поэтому нам нужно сначала его достать.
5. Кэширование
Здесь добавились 2 операции: InMemoryRelation, InMemoryTableScan, которые всегда будут сопровождать любые трансформации с закэшированной таблицей.
Итак, самые подкапотные штуки мы посмотрели, различий между таблицами больше не будет. Далее я буду вставлять только сами действия, чтобы не забивать экран однотипной информацией, а набор колонок, схемы и прочее оставлю за скобками.
6. Переименование, добавление нового столбца
Обе пары запросов приводят к одному результату, операции выполняются на этапе Project:
7. Селект с функциями, case when
Применение функций, case when так же, как и изменение нейминга полей, происходит на этапе Project.
8. Группировка
Так как для группировки нам нужно только одно поле account_id, а спарк на каждый последующий этап по возможности хочет передать как можно меньше данных, то в FileScan мы берем только нужные поля. Операция Project появилась из-за того, что мы еще тянем за собой поле партиционирования.
Теперь посмотрим на новые операции:
HashAggregate – агрегация, keys – поля группировки, functions – агрегирующая функция. Здесь используется partial_count, потому что спарк старается делать агрегацию в 2 подхода:
1 – агрегация в рамках каждой партиции.
Exchange hashpartitioning – это шафл, 200 – количество партиций после шафла. Все одинаковые ключи собираются в рамках одной партиции на основе вычисления хеш-кода, и происходит обмен парами ключ-partial_count.
2 – вторая итерация агрегации, суммируются все partial_count.
9. Distinct
Единственное отличие distinct от groupBy – это отсутствие агрегирующей функции.
10. Sort
Exchange rangepartitioning – на этом этапе происходит распределение данных на основе диапазона. Например, если account_id от 1 до 100 и мы хотим поделить на 3 партиции, то account_id с 1 по 33 попадут в первую партицию, с 34 по 66 – во вторую и далее по аналогии.
11. Агрегирующие функции
Здесь мы видим новый стейдж – SortAggregate. Он используется, когда HashAggregate невозможен из-за ограничений по памяти или когда он не поддерживает агрегирующие функции или ключи (например, неизменяемые типы данных, а у нас в примере оба поля типа string). Этот метод включает предварительную сортировку, поэтому работает медленнее. SortAggregate так же, как и HashAggregate, выполняется в 2 подхода: до шафла локально на каждом маппере и после шафла.
И так как мы считаем минимальное значение, то используются соответствующие функции partial_min и min.
Попробуем убрать группировку:
У нас изменился один из этапов: появился Exchange SinglePartition. Это означает, что все данные перемещаются в одну партицию и будут обработаны на одном ядре. Он используется при вычислении, например, min, max, avg или с оконкой без ключа партиционирования (плохо!).
12. DropDuplicates
Функция dropDuplicates при наличии дубликатов по умолчанию оставляет первый элемент, поэтому сначала считается partial_first в рамках каждой партиции, а после шафла first для каждого ключа. Все как всегда.
13. Window functions
Кажется, что с учетом предыдущих пунктов тут уже все довольно просто: нам не нужно предварительно агрегировать или сортировать, поэтому мы начинаем сразу с этапа шафла. Затем выполняется часть с .orderBy(), рассчитывается оконная функция, и берется указанная выборка полей. Все остальные оконки аналогичны, меняется только этап Window.
14. Union
Оба запроса приводят к одному результату, считываются две таблицы, которые затем объединяются на этапе Union:
15. Join
SortMergeJoin
SMJ работает, когда есть условие на равенство и когда ключи сортируемы.
Что происходит?
Filter isnotnull: так как у нас inner join, то ключи содержать null не могут, поэтому спарк фильтрует как можно раньше, чтобы обрабатывать меньшее количество данных.
Небольшая табличка по типам джойнов:
Тип джойна |
Наличие левого фильтра |
Наличие правого фильтра |
Комментарий |
inner |
+ |
+ |
оба ключа не null |
left |
- |
+ |
левая таблица может содержать null |
right |
+ |
- |
правая таблица может содержать null |
full |
- |
- |
обе таблицы могут содержать null |
Exchange hashpartitioning – оба датафрейма репатиционируются в 200 партиций по ключам джойна.
Sort – сортировка внутри партиции по ключам джойна.
SortMergeJoin – в цикле обходится каждая пара партиций, и с помощью сравнения левого и правого ключей соединяются строки с одинаковыми ключами.
ShuffledHashJoin
SHJ работает только при наличии хинта, т.к. по умолчанию используется SMJ. В данном случае у нас отсутствует этап Sort и изменяется вид джойна.
Exchange hashpartitioning – датафреймы с одинаковым ключом джойна перемещаются на один экзекьютор.
ShuffledHashJoin – на экзекьюторе создается хеш-таблица для меньшего датафрейма, где ключ – это кортеж из полей джойна (в нашем примере id). Затем происходит итерация по большему датафрейму внутри каждой партиции, и проверяется наличие ключей в хеш-таблице.
BroadcastHashJoin
BHJ работает, когда есть условие на равенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastExchange – это копирование правого датафрейма на каждый экзекьютор.
Hash join происходит аналогично, основное различие – в использовании стратегии обмена данными: шафл для SHJ и броадкаст для BHJ.
BroadcastNestedLoopJoin
BNLJ работает, когда есть условие на неравенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastNestedLoopJoin – во вложенном цикле проходимся по элементам каждой партиции левого датафрейма и копии правого датафрейма и проверяем условие.
CartesianProduct
CPJ работает, когда есть условие на неравенство, но BNLJ не может быть применен.
CartesianProduct – спарк создает пары из каждой партиции левого датафрейма с каждой партицией правого датафрейма. Затем перемещает каждую пару на один экзекьютор и проверяет условие джойна.
16. Repartition
Напоследок посмотрим на еще один вид шафла – Exchange RoundRobinPartitioning. Именно этот алгоритм позволяет получить партиции примерно одного размера: он равномерно распределяет данные и предотвращает перекосы (data skew).
17. Комплексные условия
Совмещаем все!
(ну, почти)
Надеюсь, здесь вам все понятно)
На этом все, спасибо за прочтение!
Контакты: дата инженеретта
Комментарии (4)
miksoft
16.04.2024 10:16BroadcastExchange – это копирование правого датафрейма на каждый экзекьютор.
Дополню, что предварительно этот датафрейм собирается на драйвере. Соответственно, driver.memory (емнип) должен быть достаточен.
Sanek_new
Вот почему я не понимаю планы... И не знаю что сделать чтобы начать понимать.
val6789 Автор
понять и принять
miksoft
Можно начать с планов в картинках. SparkUI довольно сносно их отображает, если, конечно, запрос не слишком сложный.