Всем привет!
В этой статье возьмем за основу пару таблиц и пройдемся по планам запросов по нарастающей: от обычного селекта до джойнов, оконок и репартиционирования. Посмотрим, чем отличаются виды планов друг от друга, что в них изменяется от запроса к запросу и разберем каждую строчку на примере партиционированной и непартиционированной таблицы.
Исходные данные
Будем работать с 3 таблицами:
campaigns – партиционирована по полю loading_id
campaigns_not_partitioned – точно такая же таблица, но не партиционирована
stats – таблица, с которой будем тестить джойны
![Таблица campaigns / campaigns_not_partitioned Таблица campaigns / campaigns_not_partitioned](https://habrastorage.org/getpro/habr/upload_files/c85/7b1/2a1/c857b12a10d36c000026597c9a065ac6.jpg)
![Схема таблицы campaigns / campaigns_not_partitioned Схема таблицы campaigns / campaigns_not_partitioned](https://habrastorage.org/getpro/habr/upload_files/f82/6a7/5d2/f826a75d25761c75d32959a84bb33c33.png)
![Схема таблицы campaigns / campaigns_not_partitioned Схема таблицы campaigns / campaigns_not_partitioned](https://habrastorage.org/getpro/habr/upload_files/f02/e29/f4e/f02e29f4e9f75220729f82b9787324a8.png)
![Схема таблицы stats Схема таблицы stats](https://habrastorage.org/getpro/habr/upload_files/836/33e/139/83633e1394c5bf82a46daf540549d367.png)
Коротко про виды планов
Всего существует 4 плана:
Parsed Logical Plan – план после парсинга ячейки с кодом, отлавливает синтаксические ошибки
Analyzed Logical Plan – план после семантического анализа, подтягиваются конкретные таблички и столбцы с типами данных
Optimized Logical Plan – оптимизации к предыдущему плану, например, упрощает лишние операции для повышения производительности
Physical Plan – как физически будет исполнен запрос на кластере, конкретные файлики, пути, форматы, партиции и т. д.
![Планы запросов при чтении таблицы campaigns Планы запросов при чтении таблицы campaigns](https://habrastorage.org/getpro/habr/upload_files/b0c/92a/01f/b0c92a01fa76035571d4882bf0dd130b.png)
Далее мы будем смотреть только на физический план, т.к. он отражает фактический алгоритм обработки данных. Чтобы чтение было комфортным, есть оглавление с быстрыми ссылками на сами запросы.
Погнали!
Оглавление
1. Самый обычный селект
![Выборка всех полей из таблицы campaigns Выборка всех полей из таблицы campaigns](https://habrastorage.org/getpro/habr/upload_files/cd6/ffd/cd9/cd6ffdcd9390db4bd24c956b3c356326.png)
Что ж, пойдемте изучать план:
1) FileScan parquet
Наша таблица читается из схемы, перечисляются все поля, формат данных – паркет, который хранится на hdfs по указанному пути, партиций нет (в данном случае берем все), фильтров нет, указывается схема данных.
2) ColumnarToRow
Исходные данные хранятся в паркет-файлах поколоночно, но в спарке датафрейм по сути содержит множество строк, поэтому нам нужно преобразование колонок в строки.
Напомню, что первая таблица была партиционированной по полю loading_id. Теперь посмотрим на непартиционированную таблицу:
![Выборка всех полей из таблицы campaigns_not_partitioned Выборка всех полей из таблицы campaigns_not_partitioned](https://habrastorage.org/getpro/habr/upload_files/d5b/04e/914/d5b04e9149f2809822b32f993cf7a68e.png)
Что поменялось?
Location вместо CatalogFileIndex стал InMemoryFileIndex.
CatalogFileIndex используется, когда мы читаем партиционированную таблицу полностью.
InMemoryFileIndex используется, когда мы читаем непартиционированную таблицу или отдельные партиции.
2. Самый обычный фильтр
![Выборка всех полей из таблицы campaigns с фильтром по loading_id Выборка всех полей из таблицы campaigns с фильтром по loading_id](https://habrastorage.org/getpro/habr/upload_files/5f8/4b0/af4/5f84b0af458e0a02426560aa1e672073.png)
Как мы видим, заполнилось поле PartitionFilters – мы отобрали конкретные партиции. Вообще оптимизатор Catalyst в фильтрах всегда добавляет еще один – isnotnull, т.к., чтобы отфильтровать по конкретным значениям, поле точно должно быть не пустым.
В непартиционированной таблице:
![Выборка всех полей из таблицы campaigns_not_partitioned с фильтром по loading_id Выборка всех полей из таблицы campaigns_not_partitioned с фильтром по loading_id](https://habrastorage.org/getpro/habr/upload_files/ca7/1a6/f39/ca71a6f397867329837e561bfc06cfc3.png)
PartitionFilters переместились на уровень DataFilters и PushedFilters.
DataFilters – это фильтры на непартиционированных столбцах.
PushedFilters – фильтры, которые мы можем пробросить на уровень источника данных и применить прямо при чтении файла.
При этом эти два параметра могут не совпадать. Здесь я добавила чисто технический фильтр:
![Выборка всех полей из таблицы campaigns_not_partitioned с фильтром по loading_id и кастомному полю Выборка всех полей из таблицы campaigns_not_partitioned с фильтром по loading_id и кастомному полю](https://habrastorage.org/getpro/habr/upload_files/ff8/710/aac/ff8710aac94b10ecc391e9e3a8bb3f04.png)
В DataFilters фильтр с concat() есть, а в PushedFilters уже нет, потому что мы не можем применить эту сложную конструкцию на источнике.
Также в план запроса добавляется еще один степ – Filter. Он нужен, чтобы окончательно отфильтровать данные по указанным условиям. Потому что на этапе PushedFilters мы берем не нужные строки, а мы берем файлы, содержащие нужные строки. Соответственно, в них запросто может попасть что-то лишнее.
3. Селект одного столбца
![Выборка поля account_id из таблиц campaigns / campaigns_not_partitioned Выборка поля account_id из таблиц campaigns / campaigns_not_partitioned](https://habrastorage.org/getpro/habr/upload_files/50b/488/e33/50b488e333e54d16ea5296cf53779384.png)
В случае партиционированной таблицы мы всегда будем таскать за собой поле партиционирования, а потом выполнять Project – это и есть select.
4. Селект одного столбца + фильтр
![Выборка поля account_id из таблиц campaigns / campaigns_not_partitioned с фильтром по loading_id Выборка поля account_id из таблиц campaigns / campaigns_not_partitioned с фильтром по loading_id](https://habrastorage.org/getpro/habr/upload_files/fe4/138/ade/fe4138adeb3a9b2545f894b11b0339a7.png)
Помимо разницы между PartitionFilters и PushedFilters, которые мы уже обсудили в п. 2, еще добавилась разница в параметре ReadSchema. Если таблица партиционирована, то поле партиционирования не хранится в виде столбца в данных, оно выносится на уровень файловой системы: поэтому в первом пути есть папка /loading_id=40678148. Во втором случае loading_id хранится прямо в файле, поэтому нам нужно сначала его достать.
5. Кэширование
![Выборка из закэшированной таблицы Выборка из закэшированной таблицы](https://habrastorage.org/getpro/habr/upload_files/325/12b/c28/32512bc28bdbdc19a236c576f8f5dca3.png)
Здесь добавились 2 операции: InMemoryRelation, InMemoryTableScan, которые всегда будут сопровождать любые трансформации с закэшированной таблицей.
Итак, самые подкапотные штуки мы посмотрели, различий между таблицами больше не будет. Далее я буду вставлять только сами действия, чтобы не забивать экран однотипной информацией, а набор колонок, схемы и прочее оставлю за скобками.
6. Переименование, добавление нового столбца
Обе пары запросов приводят к одному результату, операции выполняются на этапе Project:
![Переименование столбца таблицы Переименование столбца таблицы](https://habrastorage.org/getpro/habr/upload_files/af9/05c/c02/af905cc02cc5417039e11fcf949c2e3f.png)
![Добавление нового столбца в таблицу Добавление нового столбца в таблицу](https://habrastorage.org/getpro/habr/upload_files/d22/0ab/1c6/d220ab1c6740759ee8d62fc169237807.png)
7. Селект с функциями, case when
![Применение функций к полям таблицы Применение функций к полям таблицы](https://habrastorage.org/getpro/habr/upload_files/bfa/c61/824/bfac618242db7cd666cc7d24709012a4.png)
![Применение конструкции case when Применение конструкции case when](https://habrastorage.org/getpro/habr/upload_files/421/57c/c93/42157cc939eac5efb24c6893799ce32b.png)
Применение функций, case when так же, как и изменение нейминга полей, происходит на этапе Project.
8. Группировка
![Подсчет количества строк в рамках account_id Подсчет количества строк в рамках account_id](https://habrastorage.org/getpro/habr/upload_files/efb/56a/4a2/efb56a4a27f91c03def740af6d50354b.png)
![Подсчет количества account_id в рамках account_id :) Подсчет количества account_id в рамках account_id :)](https://habrastorage.org/getpro/habr/upload_files/1b2/353/ef2/1b2353ef2f2d1c0438d99e03f31d0046.png)
Так как для группировки нам нужно только одно поле account_id, а спарк на каждый последующий этап по возможности хочет передать как можно меньше данных, то в FileScan мы берем только нужные поля. Операция Project появилась из-за того, что мы еще тянем за собой поле партиционирования.
Теперь посмотрим на новые операции:
HashAggregate – агрегация, keys – поля группировки, functions – агрегирующая функция. Здесь используется partial_count, потому что спарк старается делать агрегацию в 2 подхода:
1 – агрегация в рамках каждой партиции.
Exchange hashpartitioning – это шафл, 200 – количество партиций после шафла. Все одинаковые ключи собираются в рамках одной партиции на основе вычисления хеш-кода, и происходит обмен парами ключ-partial_count.
2 – вторая итерация агрегации, суммируются все partial_count.
9. Distinct
![Выборка уникальных account_id Выборка уникальных account_id](https://habrastorage.org/getpro/habr/upload_files/543/e5d/077/543e5d077aea3a80b90abc05a01dde51.png)
Единственное отличие distinct от groupBy – это отсутствие агрегирующей функции.
10. Sort
![Сортировка по полю account_id Сортировка по полю account_id](https://habrastorage.org/getpro/habr/upload_files/b78/86a/9ed/b7886a9eda8a1a7d7123475baedd927d.png)
Exchange rangepartitioning – на этом этапе происходит распределение данных на основе диапазона. Например, если account_id от 1 до 100 и мы хотим поделить на 3 партиции, то account_id с 1 по 33 попадут в первую партицию, с 34 по 66 – во вторую и далее по аналогии.
11. Агрегирующие функции
![Расчет минимального start_time для каждого account_id Расчет минимального start_time для каждого account_id](https://habrastorage.org/getpro/habr/upload_files/e7d/bfa/9c0/e7dbfa9c0f75f00658531932056fcaac.png)
Здесь мы видим новый стейдж – SortAggregate. Он используется, когда HashAggregate невозможен из-за ограничений по памяти или когда он не поддерживает агрегирующие функции или ключи (например, неизменяемые типы данных, а у нас в примере оба поля типа string). Этот метод включает предварительную сортировку, поэтому работает медленнее. SortAggregate так же, как и HashAggregate, выполняется в 2 подхода: до шафла локально на каждом маппере и после шафла.
И так как мы считаем минимальное значение, то используются соответствующие функции partial_min и min.
Попробуем убрать группировку:
![Вычисление минимального account_id в датафрейме Вычисление минимального account_id в датафрейме](https://habrastorage.org/getpro/habr/upload_files/e2e/6e7/0f1/e2e6e70f1edb407e8944d4620d31ef6e.png)
У нас изменился один из этапов: появился Exchange SinglePartition. Это означает, что все данные перемещаются в одну партицию и будут обработаны на одном ядре. Он используется при вычислении, например, min, max, avg или с оконкой без ключа партиционирования (плохо!).
12. DropDuplicates
![Удаление дубликатов по полю account_id Удаление дубликатов по полю account_id](https://habrastorage.org/getpro/habr/upload_files/e04/31b/610/e0431b610ba237390f51558f2aff288e.png)
Функция dropDuplicates при наличии дубликатов по умолчанию оставляет первый элемент, поэтому сначала считается partial_first в рамках каждой партиции, а после шафла first для каждого ключа. Все как всегда.
13. Window functions
![Применение оконной функции rank Применение оконной функции rank](https://habrastorage.org/getpro/habr/upload_files/530/d31/3fe/530d313feed12578d440934f5102ed07.png)
Кажется, что с учетом предыдущих пунктов тут уже все довольно просто: нам не нужно предварительно агрегировать или сортировать, поэтому мы начинаем сразу с этапа шафла. Затем выполняется часть с .orderBy(), рассчитывается оконная функция, и берется указанная выборка полей. Все остальные оконки аналогичны, меняется только этап Window.
14. Union
Оба запроса приводят к одному результату, считываются две таблицы, которые затем объединяются на этапе Union:
![Объединение двух датафреймов Объединение двух датафреймов](https://habrastorage.org/getpro/habr/upload_files/45a/5b8/4de/45a5b84de7c44c7abd24299bdb6096ba.png)
15. Join
SortMergeJoin
![SortMergeJoin SortMergeJoin](https://habrastorage.org/getpro/habr/upload_files/422/10d/1e7/42210d1e728ce8abbf818a3abdf89a74.png)
SMJ работает, когда есть условие на равенство и когда ключи сортируемы.
Что происходит?
Filter isnotnull: так как у нас inner join, то ключи содержать null не могут, поэтому спарк фильтрует как можно раньше, чтобы обрабатывать меньшее количество данных.
Небольшая табличка по типам джойнов:
Тип джойна |
Наличие левого фильтра |
Наличие правого фильтра |
Комментарий |
inner |
+ |
+ |
оба ключа не null |
left |
- |
+ |
левая таблица может содержать null |
right |
+ |
- |
правая таблица может содержать null |
full |
- |
- |
обе таблицы могут содержать null |
Exchange hashpartitioning – оба датафрейма репатиционируются в 200 партиций по ключам джойна.
Sort – сортировка внутри партиции по ключам джойна.
SortMergeJoin – в цикле обходится каждая пара партиций, и с помощью сравнения левого и правого ключей соединяются строки с одинаковыми ключами.
ShuffledHashJoin
![ShuffledHashJoin ShuffledHashJoin](https://habrastorage.org/getpro/habr/upload_files/242/6aa/1c7/2426aa1c718c4147e9bd01daa16a3c83.png)
SHJ работает только при наличии хинта, т.к. по умолчанию используется SMJ. В данном случае у нас отсутствует этап Sort и изменяется вид джойна.
Exchange hashpartitioning – датафреймы с одинаковым ключом джойна перемещаются на один экзекьютор.
ShuffledHashJoin – на экзекьюторе создается хеш-таблица для меньшего датафрейма, где ключ – это кортеж из полей джойна (в нашем примере id). Затем происходит итерация по большему датафрейму внутри каждой партиции, и проверяется наличие ключей в хеш-таблице.
BroadcastHashJoin
![BroadcastHashJoin BroadcastHashJoin](https://habrastorage.org/getpro/habr/upload_files/e13/676/9c3/e136769c3409b14373e238e400a20b07.png)
BHJ работает, когда есть условие на равенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastExchange – это копирование правого датафрейма на каждый экзекьютор.
Hash join происходит аналогично, основное различие – в использовании стратегии обмена данными: шафл для SHJ и броадкаст для BHJ.
BroadcastNestedLoopJoin
![BroadcastNestedLoopJoin BroadcastNestedLoopJoin](https://habrastorage.org/getpro/habr/upload_files/676/1e6/575/6761e65752b7c6137ea7d2927ac95ece.png)
BNLJ работает, когда есть условие на неравенство и когда один из датафреймов небольшой и полностью помещается в память экзекьютора.
BroadcastNestedLoopJoin – во вложенном цикле проходимся по элементам каждой партиции левого датафрейма и копии правого датафрейма и проверяем условие.
CartesianProduct
![CartesianProduct CartesianProduct](https://habrastorage.org/getpro/habr/upload_files/64a/d08/e47/64ad08e474bb2fff4313a75bee6633d7.png)
CPJ работает, когда есть условие на неравенство, но BNLJ не может быть применен.
CartesianProduct – спарк создает пары из каждой партиции левого датафрейма с каждой партицией правого датафрейма. Затем перемещает каждую пару на один экзекьютор и проверяет условие джойна.
16. Repartition
![Репартиционирование Репартиционирование](https://habrastorage.org/getpro/habr/upload_files/8fa/537/553/8fa537553dc08c16c92c5f392ce06887.png)
Напоследок посмотрим на еще один вид шафла – Exchange RoundRobinPartitioning. Именно этот алгоритм позволяет получить партиции примерно одного размера: он равномерно распределяет данные и предотвращает перекосы (data skew).
17. Комплексные условия
Совмещаем все!
(ну, почти)
Надеюсь, здесь вам все понятно)
![Использование where, groupBy, countDistinct, having и sort в одном запросе Использование where, groupBy, countDistinct, having и sort в одном запросе](https://habrastorage.org/getpro/habr/upload_files/a6d/863/2a3/a6d8632a3155955e706e1959ff89e754.png)
![Использование where, groupBy, countDistinct, having и sort в одном запросе Использование where, groupBy, countDistinct, having и sort в одном запросе](https://habrastorage.org/getpro/habr/upload_files/499/d7b/bd7/499d7bbd7d9ec1c2b3b043300a012837.png)
На этом все, спасибо за прочтение!
Контакты: дата инженеретта
Комментарии (4)
miksoft
16.04.2024 10:16BroadcastExchange – это копирование правого датафрейма на каждый экзекьютор.
Дополню, что предварительно этот датафрейм собирается на драйвере. Соответственно, driver.memory (емнип) должен быть достаточен.
Sanek_new
Вот почему я не понимаю планы... И не знаю что сделать чтобы начать понимать.
val6789 Автор
понять и принять
miksoft
Можно начать с планов в картинках. SparkUI довольно сносно их отображает, если, конечно, запрос не слишком сложный.