Аналитические системы должны эффективно обрабатывать сложные пользовательские запросы к десяткам и сотням терабайт данных (пета-?). Продвинутый оптимизатор запросов является важнейшим компонентом любого big data движка. В данной статье мы рассмотрим, как устроен оптимизатор запросов в массивно-параллельном аналитическом SQL-движке Trino.
Это умеренно сложная статья, в которой мы опишем основные принципы оптимизации запросов в Trino, без путешествия в "машинный зал". Схожие идеи реализованы во многих других аналитических системах, поэтому статья будет полезна широкому кругу читателей, занимающихся анализом данных и эксплуатацией аналитических систем.
Что такое Trino?
Trino — это массивно-параллельный аналитический SQL-движок, который позволяет обрабатывать большие объемы данных из разных источников.
Изначально называемый Presto, продукт был разработан более десяти лет назад компанией Facebook как более производительная альтернатива Hive для решения задач интерактивного анализа больших данных. Hive использует парадигму map-reduce с материализациями промежуточных результатов на диске, что существенно замедляет выполнение запросов. Trino реализует массивно-параллельную архитектуру, а промежуточные результаты в большинстве случаев "стримятся" между узлами без сохранения на диск, что делает движок существенно более производительным, чем Hive.
За десятилетие своего развития Trino прошел большой путь от "замены Hive" до полноценного федеративного движка общего назначения, который позволяет пользователям легко интегрировать данные из различных систем без болезненного ETL, и переносить аналитическую нагрузку из дорогих в обслуживании и трудно масштабируемых корпоративных хранилищ (напр. Greenplum) в более дешевые озера данных. Таким образом, Trino позволяет организациям быстрее реализовывать новые сценарии анализа данных, одновременно сокращая расходы на инфраструктуру. Если для анализа данных вы используете Hive или корпоративные хранилища, Trino может стать отличной современной альтернативой: быстро работает, снижает потребность в ETL, уменьшает дублирование данных, подходит для облака, эластично масштабируется, прост в администрировании.
Наша команда создает CedrusData — коммерческий форк Trino для российского рынка, в который мы добавляем критические улучшения производительности и администрирования, что бы сделать продукт еще более удобным и эффективным. Мы делимся опытом разработки Trino в нашем блоге, а так же канале и чате в Telegram.
Теоретические основы оптимизации запросов
Практически любой SQL-запрос можно выполнить несколькими способами. Аналитические SQL-запросы обычно содержат большое количество сложных операций, таких как Join, агрегации и window-функции. В подобных запросах количество альтернативных путей выполнения, называемых планами, может измеряться миллионами.
В реальных движках (особенно распределенных) количество теоретически возможных планов растет гораздо быстрее факториала от количества таблиц в запросе. Поэтому "миллионы" — это достаточно оптимистичная оценка.
Разница в производительности самого быстро и самого медленного планов может составлять многие порядки. Представьте, что один и тот же запрос можно выполнить за секунду или за час.
Задача оптимизатора состоит в нахождении оптимального плана выполнения запроса за разумное время. Это конфликтующие задачи: быстро перебрать планов в общем случае невозможно в силу физических ограничений железа. Промышленные оптимизаторы ищут не столько оптимальные, сколько "адекватные" планы, перебирая только часть возможных альтернатив.
Современные движки используют одну или несколько следующих техник оптимизации запросов.
Наша команда так же занимается международным консалтингом по вопросам разработки оптимизаторов SQL-запросов. В списке ниже мы привели ссылки на наши англоязычные статьи по некоторым вопросам оптимизации.
Паттерн visitor — ходим по плану сверху вниз и/или снизу вверх и меняем отдельные его части. Применяется для оптимизаций, которым нужен весь контекст запроса. Пример: удаление неиспользуемых полей, алгоритм Cascades, и т.п.
Rule-based оптимизация — пишем трансформации для индивидуальных паттернов операторов, после чего "натравливаем" их на план (например, с помощью уже упомянутого visitor-паттерна). Пример: "если встретили Filter по ключу группировки над Aggregate, то переставим Filter под Aggregate".
Эвристическая оптимизация — применяем ту или иную трансформацию без оценки стоимости. Например, считаем, что Filter под Aggregate всегда лучше, чем Filter над Aggregate, хотя в реальности это не всегда так. Например, если фильтр не отбрасывает ни одной записи, то его перемещение под Aggregate приведет скорее к замедлению запроса.
Cost-based оптимизация — одновременное рассмотрение нескольких планов запросов, и выбор наиболее дешевого на основе стоимости. Обычно реализуется для наиболее критичных оптимизаций с большим количеством альтернатив. Самые распространенные примеры: планирование порядка Join, расстановка операторов Exchange в распределенных движках. Cost-based оптимизация обычно требует доступ к статистикам о данных для расчета стоимости планов, а так же механизм отсечения повторяющихся задач на основе динамического программирования или мемоизации.
Многофазная оптимизация — комбинирование нескольких оптимизационных шагов, описанных выше в фиксированную последовательность. Например, (1) запушим фильтры вниз, (2) выберем оптимальный порядок Join, (3) попробуем заменить доступ к таблицам материализованными представлениями, (4) расставим Exchange. Многофазная оптимизация решает проблему экспоненциального роста количества планов. Например, задача выбора порядка Join является NP-полной, как и задача выбора материализованных представлений. Если мы попытаемся объединить обе эти задачи в монолитную фазу, то мы получим "перемножение" их сложностей, и время планирования улетит в космос даже для относительно простых запросов. Многофазная оптимизация позволяет нам удерживать время планирования под контролем ценой отсечения значительного количества (предположительно не очень хороших) планов. Все без исключения промышленные оптимизаторы являются многофазными.
Теперь поговорим о Trino.
Задачи оптимизатора Trino
Trino — это массивно-параллельный распределенный колоночный SQL-движок для обработки больших данных из удаленных источников, прежде всего озер данных и КХД. Фокус на big data и аналитику определяет ключевые задачи оптимизатора Trino:
Эффективно работать с источниками данных. Например, стремиться по возможности уменьшить количество данных, передаваемых из источника.
Разумно планировать Join-ы.
Расставлять операторы Exchange для корректного и эффективного выполнения распределенных операций (Join, Aggregation, Window, и т.п.).
В данной статье мы рассмотрим первые две задачи. Вопрос планирования операторов Exchange будет рассмотрен в отдельной статье.
Реляционные операторы
Оптимизатор Trino является многофазным, и содержит более 80 (прописью: восьмидесяти) шагов. Каждый шаг представлен интерфейсом PlanOptimizer, который получает на вход один план, и выдает другой:
PlanNode optimize(
PlanNode plan,
Session session,
TypeProvider types,
SymbolAllocator symbolAllocator,
PlanNodeIdAllocator idAllocator,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector,
TableStatsProvider tableStatsProvider);
PlanNode — это дерево реляционных операторов. Примеры часто используемых операторов:
TableScanNode — просканировать данные из таблицы источника. Так как Trino работает с различными источниками данных, за логическим понятием "таблица" могут скрываться разные физические реализации. Например, "таблицей" озера данных являвется некоторое количество файлов и их метаданные, "таблицей" Kafka является topic, а "таблицей" Postgres является ... таблица Postgres.
ProjectNode — преобразовать атрибуты дочернего оператора. Например,
SELECT a, b + 10
.FilterNode — применить предикат. Например,
WHERE a > 10
.JoinNode — объединить данные из двух дочерних операторов по условию. Например,
a JOIN b ON c1 = c2
.AggregationNode — применить агрегацию. Например,
SUM(a) GROUP BY b
.WindowNode — применить одну или несколько оконных функций. Например,
SUM(a) OVER(PARTITION BY b)
.
Для получения реляционного дерева, Trino сначала получает AST из строки запроса с помощью ANTLR (см. грамматику Trino). Далее Trino транслирует AST в реляционное дерево с помощью класса RelationPlanner.
Преобразование AST в реляционное дерево является критически важным шагом, так как структура AST слишком сложна для реализации многих важных оптимизаций. Практически все современные движки используют реляционное представление для оптимизации запросов. Продукты, которые это не делают (например, Postgres), существенно ограничивают потенциал своего оптимизатора.
После получения реляционного дерева Trino последовательно применяет один шаг оптимизации за другим. Фраза "более 80 шагов" звучит угрожающе. Однако, большая часть данных оптимизаций являются достаточно интуитивно понятными упрощениями отдельных частей запроса с помощью визиторов или правил. Мы остановимся на наиболее важных оптимизациях Trino.
Rule-based оптимизации
Большая часть оптимизаций Trino реализована с помощью правил. Правило (см. Rule) представляет собой паттерн, который мы ищем в реляционном дереве, и логику трансформации.
Паттерн — это условие, при выполнении которого произойдет запуск правила. Например, правило PushLimitThroughProject перемещает оператор LimitNode
под ProjectNode
, что бы уменьшить количество передаваемых записей как можно раньше. Паттерн данного правила выглядит следующим образом:
private static final Pattern<LimitNode> PATTERN =
// Ищем LimitNode, ...
limit()
// ... входом которого является ProjectNode, ...
.with(source().matching(
project()
// ... который соответствует некоторому дополнительному условию
.matching(projectNode -> !projectNode.isIdentity())
.capturedAs(CHILD)));
Что бы преобразовать план с помощью правил, необходимо определить список интересующих правил, и передать их в драйвер IterativeOptimizer. Драйвер рекурсивно проходит дерево сверху вниз, применяя подходящие правила. Процесс оптимизации останавливается когда больше нет доступных трансформаций, либо когда превышен фиксированный таймаут.
Выход по таймауту является защитой от ошибок программирования: если вы передадите в драйвер правила, которые взаимно отменяют друг друга (например, одно правило переносит
FilterNode
подJoinNode
, а другое правилоJoinNode
подFilterNode
), то без таймаута процесс оптимизации никогда не остановится. Таймаут позволяет выбросить ошибку. Это лучше, чем молчаливое зависание.
Алгоритм IterativeOptimizer
является эвристическим, так как он применяет правила без учета стоимости планов до и после трансформации. Таким образом, предполагается, что применяемые правила делают план безусловно лучше, хотя на практике это не всегда так. Это нормальная практика: почти все промышленные оптимизаторы делают те или иные трансформации без учета стоимости.
Перед началом работы IterativeOptimizer
заменяет все входы операторов на специальный оператор GroupReference, который представляет собой ссылку на оригинальный вход. Такой подход позволяет эффективно трансформировать дерево, заменяя лишь ссылки. Без этого каждая трансформация приводила бы к необходимости пересоздавать все вышестоящие операторы, затрачивая и без того дефицитные циклы CPU.
Схожим образом работает итеративный планировщик Apache Calcite, называемый HepPlanner. Аналогом
GroupReference
в Apache Calcite является класс HepRelVertex.
Trino имеет более 150 правил оптимизации. Основные группы оптимизаций:
Упрощение операторов. Например: constant folding; удаление неиспользуемых аттрибутов; выявление операторов, которые не производят записей; замена OUTER JOIN на INNER JOIN.
Перестановка операторов местами. Например: Filter под Join; Aggregation под Join; Limit под Project.
Декорреляция и упрощение подзапросов.
Pushdown вычислений в источники.
Планирование порядка Join.
Рассмотрим две последние группы оптимизаций более подробно.
Pushdown вычислений в источник
Trino выполняет SQL-запросы к данным, которые хранятся в других системах. На практике источниками чаще всего являются озера данных (т.е., файлы) и другие реляционные СУБД (например, Postgres, Greenplum). В процессе оптимизации запроса важно спланировать доступ к источнику таким образом, что бы минимизировать ненужную работу.
Рассмотрим запрос к таблице sales
в озере данных, которая партиционирована по атрибуту s_date
. Мы хотим узнать, что было с продажами в один особенно нервный день.
SELECT s_date, SUM(s_amount)
FROM sales
WHERE s_date = DATE '2023-06-24'
Мы можем доставить все колонки и записи таблицы sales из озера в Trino только для того, что бы через мгновение отбросить все колонки, кроме s_date
и s_amount
, и записи всех партишенов, кроме одного. Очевидно, это неэффективно.
Trino содержит набор правил, которые позволяют передать часть вычислений непосредственно в источник данных. Такие правила ищут паттерн "оператор над TableScanNode", и нотифицируют соответствующий коннектор. Далее коннектор принимает решение, может ли он использовать вышестоящий оператор для ускорения запроса или нет. Если да, он сообщает обратно ядру Trino, какую часть вычислений он взял на себя, а какую все же должен выполнить Trino.
Trino поддерживает pushdown множества операторов, включая предикаты, проекции, агрегации, Join и Limit. Вы можете изучить методы applyXXX
в классе ConnectorMetadata, которые реализуют pushdown в разных коннекторах, а так же соответствующие правила PushXXXIntoTableScan
NB:
XXX
в имени метода и соответствующего правила иногда могут не совпадать :-)
Например, за pushdown предиката в коннектор отвечает правило PushPredicateIntoTableScan. При работе с озером данных предикат может быть использован для реализации техник partition pruning и row group pruning, которые позволяют избегать чтения целых файлов или их частей, если те заведомо не содержат удовлетворяющих предикату записей. Соответствующая логика находится в методе HiveMetadata.applyFilter.
Вернемся к примеру. Благодаря способности Trino переносить вычисления на сторону источника, мы можем передать в коннектор информацию о задействованных колонках и предикате по колонке партиционирования. Полученный план запроса сканирует только несколько колонок из одного партишена вместо сканирования всех файлов таблицы целиком.
Планирование порядка операторов Join
Планирование порядка операторов Join — важнейшая оптимизация для любого аналитического движка. Неспособность выбрать оптимальный (или хотя бы относительно адекватный) порядок Join может увеличивать время выполнения запросов на порядки.
В Trino планирование порядка Join реализовано через правило ReorderJoins, которое "выстреливает" на операторах JoinNode
. Оптимизация происходит в три этапа:
Правило преобразует текущий
JoinNode
, а так же находящиеся под ним операторыJoinNode
,ProjectNode
иFilterNode
в специальный оператор MultiJoinNode. Таким образом, информация о соседних Join-ах, их фильтрах и проецируемых атрибутах оказывается собранной в одном месте.Происходит выбора оптимального порядка Join, принцип работы которого описан ниже.
Правило преобразует
MultiJoinNode
обратно в дерево, состоящее из операторовJoinNode
,ProjectNode
иFilterNode
.
За выбор оптимального порядка Join отвечает класс JoinEnumerator. Он реализует прямолинейный cost-based алгоритм top-down перебора порядка Join с мемоизацией:
"Cost-based" означает, что мы выбираем оптимальный план на основе сравнения его стоимости со стоимостями альтернативных планов.
"Top-down" означает, что control flow алгоритма начинает с корня дерева. Для сравнения, классический алгоритм System R использует bottom-up подход, при котором обход начинается с листьев дерева (т.е. индивидуальных сканов).
Предположим, мы хотим выбрать оптимальный порядок Join для запроса а JOIN b JOIN c
. Мы начинаем оптимизацию с группы эквивалентности [abc]
, которая представляет собой все доступные порядки Join таблиц a
, b
и c
. Мы хотим найти для это группы порядок Join, который соответствует наименьшей стоимости. Для этого мы находим все возможные разбиения текущей группы на пару других непустых групп. Для группы [abc]
это будут пары [ab][c]
, [ac][b]
и [bc][a]
. Процесс повторяется рекурсивно до тех пор, пока мы не доходим до групп, состоящих из одной таблицы: [a]
, [b]
, [c]
. Для каждой такой группы мы рассчитываем стоимость, которая равна стоимости сканирования соответствующей таблицы.
После этого контроль возвращается в вышестоящие группы, где происходит выбор оптимального порядка Join. Например, для группы [ab]
мы оцениваем стоимость a JOIN b
и b JOIN a
. Оценка стоимости происходит на основе статистик и различных эвристических формул. В итоге для каждой промежуточной группы мы определяем наиболее дешевый порядок Join.
Наконец, мы попадаем в корневую группу, в которой находим наиболее дешевый порядок Join на основе лучших планов из подлежащих групп.
Обратите внимание, что в процессе оптимизации мы можем попадать в одни и те же группы несколько раз. Что бы не обрабатывать одни и те же группы многократно, Trino использует мемоизацию. Фактически, это обычный словарь, в котором ключом является группа, а значением — наиболее дешевый порядок Join этой группы.
Таким образом, Trino использует cost-based оптимизацию для выбора оптимального порядка Join.
Алгоритм
ReorderJoins
имеет ряд недостатков, про которые мы поговорим в отдельной статье.
Динамические фильтры
Динамические фильтры — это оптимизация, которая позволяет сформировать дополнительный предикат на одной из сторон Join
во время выполнения запроса, и передать его другой стороне Join
для более оптимального сканирования задействованных таблиц.
Мы подробно рассказали о данной оптимизации в нашем прошлом блоге. Поэтому ограничимся лишь упоминанием того, что основная часть соответствующей логики реализована с помощью паттерна visitor (см. PredicatePushDown). Это хороший пример оптимизации, которую затруднительно делать через правила, так как нам нужен доступ ко всему плану, а не отдельным его частям.
Заключение
Trino — это массивно-параллельный движок для анализа больших данных из различных источников.
Оптимизатор Trino использует дерево реляционных операторов в качестве промежуточного представления плана запроса.
Сама оптимизация разделена на множество последовательных фаз, которые постепенно трансформируют начальный план к более оптимальной форме. Большинство фаз представляют собой специализированные визиторы, либо итеративную rule-based оптимизацию. Некоторые правила, такие как ReorderJoins
дополнительно используют cost-based оптимизацию для выбора наиболее дешевого плана из множества альтернатив.
Особое внимание Trino уделяет переносу (pushdown) вычислений в источники данных, что существенно снижает время выполнения запросов. Trino использует набор правил для осуществления pushdown в процессе планирования, а так же динамические фильтры для осуществления pushdown дополнительных предикатов во время выполнения запросов.
В данной статье мы не рассмотрели еще одну важную оптимизацию — расстановку операторов
Exchange
, потому что это удвоило бы количество текста. Мы обязательно позанудствуем на эту тему в отдельной статье.
Не забудьте попробовать CedrusData, а так же подписаться на наши канал и чат про Trino и CedrusData в Telegram.
mentin
Правильно ли я понял, что всё планирование перед исполнением, и в процессе исполнения план не меняется (динамические фильтры передают фильтры, но топология похоже статически решается оптимизатором)?
devozerov Автор
Сложный вопрос :-)
Оптимизатор формирует базовую структуру плана. После этого план разбивается на так называемые фрагменты по границам Exchange. Фрагмент - это набор операторов, которые могут быть выполнены независимо.
Далее фрагменты бьются на так называемые пайплайны по границам блокирующих операторов. Например, Join. Каждый пайплайн имеет не более одного входа (таблица или корень другого фрагмента).
Описания пайплайнов рассылаются по узлам, что бы каждый узел знал, какие операции принадлежат какому пайплайну. В терминах Trino эта метаинформация называется Task.
А далее становится интересно. Trino бьет входные данные таблиц на независимые куски, называемые Split. Каждый сплит отправляется на тот или иной узел в зависимости от загрузки узла или требований плана (например, для выполнения колоцированного джоина может потребоваться группировать сплиты по узлам особым образом). Аналогично перераспределяются и сплиты, произведенные промежуточными фрагментами.
Мы в некоторой степени описали этот процесс в другом блоге https://www.cedrusdata.ru/blog/trino-massively-parallel-processing, но вообще это заслуживет отдельной статьи.
В итоге — сам план статичен, но на каких узлах выполняются отдельные его части определяется в рантайме, в зависимости от загрузки узлов и требований операторов.
Кроме того, недавно в Trino был добавлен так называемый fault-tolerant execution (aka "project Tardigrade" https://trino.io/blog/2022/05/05/tardigrade-launch.html). Он позволяет перезапускать определенные части плана в случае ошибок. Благодаря этой фиче появляются некоторые точки в коде, где можно прямо в рантайме перехватить кусочек плана и как-то его поменять. Это дает возможность делать реоптимизацию планов (по некоторым заранее определенным границам) в рантайме. Однако в том самом месте, где происходит реоптимизация сейчас стоит заглушка, мол, "вставляйте сюда код runtime-реоптимизации, если хотите" :-)