Из нашей повседневной практики доподлинно известно, что массивно(массово?)-параллельные вычисления это круто. Но что именно означает этот термин, и как "массивность" и "параллельность" реализованы в конкретной системе? В данной статье мы ответим на оба вопроса, проанализировав внутреннюю архитектуру популярного MPP-движка для больших данных Trino.

Что такое MPP?

Популярные термины зачастую скрывают за собой вязкую смесь технологий и маркетинга, за которыми не всегда легко уловить суть. Напишем в поисковике запрос "massively parallel processing" и прочитаем несколько технических статей от известных вендоров.

Компания IBM пишет:

Parallel processing environments are categorized as symmetric multiprocessing (SMP) or massively parallel processing (MPP) systems.

In a symmetric multiprocessing (SMP) environment, multiple processors share other hardware resources.

In a massively parallel processing (MPP) system, many computers are physically housed in the same chassis.

Все неплохо, но причем тут "chassis"? Далее в этой же статье:

In an MPP environment, performance is improved because no resources must be shared among physical computers.

In an MPP system, a file system is commonly shared across the network.

Стоп, стоп! Так shared или не shared?

Ладно, посмотрим на документацию Greenplum:

MPP (also known as a shared nothing architecture) refers to systems with two or more processors that cooperate to carry out an operation, each processor with its own memory, operating system and disks

Здесь коллеги из Pivotal скромно ставят знак равенства между "massively parallel processing" и "shared nothing", что конечно же некорректно в 2023 году. Например, как нам быть со Snowflake, который тоже заявляет о своей "массивности", но не использует shared-nothing архитектуру?

Snowflake’s architecture is a hybrid of traditional shared-disk and shared-nothing database architectures. Similar to shared-disk architectures, Snowflake uses a central data repository for persisted data that is accessible from all compute nodes in the platform. But similar to shared-nothing architectures, Snowflake processes queries using MPP (massively parallel processing) compute clusters where each node in the cluster stores a portion of the entire data set locally. 

На самом деле, за всем многообразием MPP-систем скрывается компактный общий знаменатель, хорошо описанный в Википедии:

Массово-параллельная архитектура — класс архитектур параллельных вычислительных систем. Особенность архитектуры состоит в том, что память физически разделена.

Еще одно емкое определение от Microsoft:

Massively Parallel Processing (MPP) is the coordinated processing of a single task by multiple processors, each processor using its own OS and memory and communicating with each other using some form of messaging interface.

Иными словами, это MPP — это несколько машин, скоординированно выполняющих одну и ту же задачу. Очевидно, что под это определение подходит большое количество продуктов порой с совершенно разными архитектурами и характеристиками.

Рассмотрим реализацию идей MPP на примере Trino.

Что такое Trino?

Trino — это распределенный аналитический SQL-движок для выполнения федеративных запросов. Технически Trino представляет собой набор вычислительных узлов, которые подключаются ко внешним источникам данных. При получении SQL-запроса, Trino определяет, какие источники данных задействованы, подтягивает необходимые данные из источников, после чего осуществляет финальную обработку в кластере и отдает результат пользователю.

Таким образом, Trino можно отнести к классу MPP-движков (много машин) с shared storage (данные находятся удаленно) архитектурой. Ну а наша команда занимается разработкой продукта CedrusData — форк Trino на стероидах для российского рынка, где нашим основным фокусом на текущий год является улучшение производительности.

Реализация MPP в Trino

Одна из важных "фишек" Trino — возможность запускать SQL-запросы к озерам данных (напр., S3-совместимому хранилищу). Рассмотрим реализацию MPP в Trino на примере SQL-запроса к набору файлов Parquet в озере данных.

Apache Parquet - это популярный колоночный формат хранения данных. Отдельный файл Parquet хранит набор записей. Большие датасеты обычно организованы в несколько файлов Parquet. Аналитические движки вроде Trino или Apache Spark рассматривают совокупность таких файлов как единую логическую "таблицу".

Рассмотрим выполнение следующего SQL-запроса к озеру данных (схема TPC-DS):

SELECT d_year, d_moy, SUM(ss_net_paid)
FROM store_sales JOIN date_dim ON ss_sold_date_sk = d_date_sk
GROUP BY d_year, d_moy
WHERE ss_customer_sk = <...>

В данном запросе мы считаем объем продаж по месяцам для определенного клиента, соединяя таблицу фактов store_sales со справочником date_dim. Таблица store_sales представляет собой большое количество Parquet файлов, партиционированных по дате. Таблица date_dim представлена единственным файлом Parquet.

├── date_dim
    └── part1.parquet
├── store_sales
│   ...
│   ├── ss_sold_date_sk=2450836
│   │   ├── part1.parquet
│   │   └── part2.parquet
│   ├── ss_sold_date_sk=2450837
│   │   ├── part1.parquet
│   │   ├── part2.parquet
│   │   └── part3.parquet
│   ...   

Параллелизм фрагментов

Распределенные SQL-движки выполняют запросы, задействуя несколько узлов. Для выполнения некоторых операций узлы должны обмениваться друг с другом промежуточными результатами. Например, для выполнения операции агрегации необходимо убедиться, что все записи с одними и теми же значениями атрибутов GROUP BY оказались на одном узле.

Для организации выполнения распределенного запроса планировщик движка анализирует, какое распределение данных необходимо конкретным операторам, и вставляет специальные операторы Exchange (Shuffle), которые перераспределяют данные в кластере по мере необходимости. Далее план запроса "нарезается" на фрагменты по границам Exchange таким образом, что каждый фрагмент имеет строго один "выход" и один или несколько "входов" (таблица или другой фрагмент). Подробнее про операторы Exchange/Shuffle можно почитать в нашем блоге.

В случае Trino план до оптимизации будет выглядеть следующим образом:

Aggregation
  Join
    TableScan[store_sales]
    TableScan[date_dim]

За расстановку операторов Exchange в Trino отвечает правило AddExchanges, которое определяет требования операторов к распределению входящего потока данных, и вставляет Exchange там, где требуемое и текущее распределения не совпадают.

К слову, в общем случае существует несколько допустимых расстановок Exchange для конкретного SQL-запроса (подумайте, какие есть альтернативные расстановки в рассматриваемом запросе, и напишите их в комментариях). Стоимость выполнения планов с разными расстановками Exchange может сильно варьировать, а значит нам нужна cost-based оптимизация. К сожалению, Trino расставляет Exchange эвристически. Нам это не нравится, поэтому в CedrusData мы делаем новый cost-based оптимизатор на основе алгоритма Cascades (пользуясь случаем, передаем привет Apache Calcite и gporca).

Вернемся к основной теме. План Trino после работы правила AddExchanges:

Aggregation[FINAL]              // Финальная агрегация
  Exchange[HASH[d_year, d_moy]] // Перераспределение по [d_year, d_moy]
    Aggregation[PARTIAL]        // Предварительная агрегация по [d_year, d_moy]
      Join
        Filter[ss_customer_sk]
          TableScan[store_sales]
        Exchange[BROADCAST]     // Доставить данные из date_dim к store_sales
          TableScan[date_dim]

План после разбиения на фрагменты (см. PlanFragmenter):

-- Fragment 1: 
Exchange[BROADCAST]
  TableScan[date_dim]

-- Fragment 2:
Exchange[HASH[d_year, d_moy]]
  Aggregation[PARTIAL]
    Join
      Filter[ss_customer_sk]
        TableScan[store_sales]
      RemoteFragment[id=1]

-- Fragment 3:
Aggregation[FINAL]
  RemoteFragment[id=2]

На данном этапе мы впервые задумываемся о параллелизме. В простейшем случае мы можем выполнять фрагменты один за другим согласно графу зависимостей: 1->2->3 . Однако, если фильтр по ss_customer_sk является высокоселективным, более выгодной стратегией может оказаться одновременное выполнение первого и второго фрагментов.

В Trino за выбор стратегии вертикального параллелизма фрагментов отвечает компонент ExecutionPolicy.

Исторически Trino использовал наивный алгоритм, который безусловно запускал все фрагменты параллельно. Подобная стратегия плохо работает в ряде случаев, так как множество фрагментов начинают конкурировать за ограниченные ресурсы кластера (сеть, CPU, RAM). Например, это особенно актуально для динамических фильтров, про которые мы подробно поговорим в другой раз.

Сейчас Trino использует так называемый phased алгоритм. Данный алгорит старается отложить выполнение тех фрагментов, которые с большой вероятностью будут заблокированы ожиданием результатов от других фрагментов. В большинстве случаев это приводит к ускорению запросов: "нижние" фрагменты сталкиваются с меньшей конкуренцией за ресурсы, быстрее завершают свою работу, а следовательно быстрее разблокируют "родителей".

В нашем примере, если проигнорировать нюансы работы динамических фильтров (про которые, как было сказано выше, мы подробно поговорим в другой раз), Trino посчитает, что первый и второй фрагменты можно выполнять параллельно. Таким образом, сканирование обеих таблиц будет происходить одновременно. Выполнение третьего фрагмента же начнется по мере появления результатов промежуточных агрегаций из второго фрагмента.

Параллелизм чтения данных

Второй важнейший источник "массивности" параллелизма в Trino — параллельное чтение данных. Ключевой абстракцией является Split — часть данных таблицы источника, которая может быть отсканирована независимо. В процессе анализа запроса Trino обращается через специальный SPI к метаданным источника, что бы получить список сплитов для конкретной таблицы. Чем больше сплитов можно выделить для таблицы, тем выше максимально возможный уровень параллелизма чтения.

Логика разбиения таблицы на сплиты зависит от типа источника. Рассмотрим второй фрагмент, в котором происходит чтение большой таблицы store_sales, состоящей из нескольких файлов Parquet. Сначала Trino обращается к Hive Metastore и файловой системе озера (HDFS, S3, и т.п.) для получения списка файлов таблицы. В простейшем случае одному сплиту будет соответствовать один файл. Если конкретный файл является достаточно большим, имеет смысл разбить его на несколько непересекающихся диапазонов. В таком случае, для одного файла может быть создано несколько сплитов. В Trino за выбор размера сплита в озере данных отвечают параметры hive.max-initial-split-size и hive.max-split-size . По умолчанию, Trino будет стремиться "нарезать" файл на сплиты размером от 32 до 64 мегабайт. Например:

Split 1: /stores_sales/ss_sold_date_sk=2450836/part1.parquet
Split 2: /stores_sales/ss_sold_date_sk=2450836/part2.parquet [offset=0, length=32Mb]
Split 3: /stores_sales/ss_sold_date_sk=2450836/part2.parquet [offset=32Mb, length=64Mb]

Важно отметить, что такое грубое разбиение файлов на части может не соответствовать их внутренней структуре. Например, поставленное фактически наобум смещение в 32Mb почти никогда не будет совпадать с границей row group Parquet. Это не является проблемой: в процессе обработки сплита Trino корректно скорректирует границы чтения файла таким образом, что бы ничего не пропустить, и не сделать дубликаты.

В общем случае таблица может состоять из тысяч файлов, поэтому получение списка сплитов является асинхронным процессом.

По мере появления информации о новых сплитах Trino начинает рассылать запросы на обработку сплитов по узлам. При этом Trino отслеживает текущее состояние узлов, и пытается сбалансировать распределение сплитов таким образом, что бы нагрузка распределялась по узлам равномерно. Это позволяет Trino избегать ситуаций "hot partition", характерных для shared-nothing систем вроде Greenplum, когда неравномерность распределения нагрузки по партишенам (например, разные размеры партишенов или особый интерес пользователей к данным из конкретного партишена) приводит к перегрузке отдельных узлов.

Пример распределения сплитов по узлам.
Пример распределения сплитов по узлам.

Стоит отметить, что в Trino так же присутствует возможность задать более строгие правила распределения сплитов по узлам, но об этом мы подробнее поговорим в другой "другой" раз.

Локальный параллелизм

Нам осталось рассмотреть, как Trino выполняет фрагменты. Фрагмент по своей сути является набором операторов, через которые необходимо "прогнать" данные, поступающие из таблиц или других фрагментов (сплиты).

Некоторые операторы фрагмента могут быть блокирующими. Оператор называется блокирующим, если он может произвести первый результат только после получения всех входных данных от дочернего оператора. Примером такого оператора является построение хэш-таблицы для выполнения hash join. Trino предварительно разбивает фрагмент по границам блокирующих операторов на отдельные пайплайны (pipeline). Например, второй фрагмент в нашем запросе может быть разбит на два пайплайна:

Exchange[HASH[d_year, d_moy]]
  Aggregation[PARTIAL]
    Join
      Filter[ss_customer_sk]
        TableScan[store_sales]
      RemoteFragment[id=1]
Пример разбиения фрагмента на пайплайны.
Пример разбиения фрагмента на пайплайны.

После получения сплита на конкретном узле, Trino определяет, какому пайплайну он принадлежит. Далее Trino создает исполняемый экземпляр пайплайна, называемый драйвер (Driver), и ставит его в очередь на исполнение. Таким образом, в рамках одного узла Trino может одновременно выполнять несколько драйверов, которые представляют разные пайплайны, разные фрагменты и, разумеется, разные запросы. Поистине массивный параллелизм!

Драйвер представляет собой исполняемое дерево физических операторов, и сплит, который необходимо обработать. Сплиты могут иметь различное количество ассоциированных с ними данных (например, большой или маленький файлы). Кроме того, некоторые драйвера могут требовать значительно больше времени для обработки данных, чем другие. Таким образом, Trino необходимо управлять порядком и временем работы драйверов.

Для определения порядка запуска драйверов Trino использует многоуровневую priority queue (MultilevelSplitQueue). Для каждого драйвера идет подсчет суммарного времени выполнения. По мере того как драйвер накапливает время, он последовательно перемещается между уровнями очереди: от нулевого к четвертому. Планировщик стремится распределить время выполнения таким образом, что бы драйвера на нулевом уровне получали в среднем в два раза больше времени, чем драйвера на первом уровне, в четыре раза больше времени, чем драйвера на втором уровне, и т.д. Мультипликатор можно изменить с помощью параметра конфигурации task.level-time-multiplier. Таким образом, Trino отдает предпочтение более "легким" запросам, но в то же время и длительные запросы гарантированно рано или поздно получат свой квант времени.

Целевое распределение времени по уровням.
Целевое распределение времени по уровням.

Выбор очередного драйвера для исполнения состоит из двух шагов:

  1. Определить уровень, который должен получить следующий квант процессорного времени.

  2. Взять из выбранного уровня драйвер, который до сих пор получил наименьшее количество процессорного времени.

После того как драйвер выбран, начинается его выполнение. Планировщик отводит драйверу квант времени, равный одной секунде. Если квант времени истек (см. DriverYeildSignal), или драйвер не может продолжать свое выполнение (например, закончились входные данные или заполнен output buffer), выполнение драйвера останавливается, и он снова попадает в очередь на исполнение. Таким образом, блокировка драйвера не приводит к снижению общей пропускной способности узла.

Процесс повторяется до тех пор, пока драйвер не завершит обработку сплита. Квантование времени позволяет Trino гарантировать (с поправкой на непредсказуемые блокирующие операции чтения из источника), что один или несколько тяжелых запросов не смогут надолго заблокировать прогресс конкурентных задач.

Выводы

Массивно(массово?)-параллельная архитектура, в которой несколько вычислительных узлов занимаются выполнением общей задачи, является де-факто стандартом обработки больших объемов данных.

В Trino за "массивность" отвечают следующие технические решения:

  1. Вертикальный параллелизм выполнения запроса за счет разбиения на фрагменты.

  2. Вертикальный параллелизм выполнения фрагмента на узле за счет разбиения на пайплайны.

  3. Горизонтальный параллелизм выполнения пайплайнов за счет разбиения данных таблицы на сплиты, обрабатываемые драйверами.

  4. Локальный scheduler, который параллельно выполняет множество драйверов.

Все вместе это позволяет Trino быстро обрабатывать большие объемы данных. Используйте Trino для бигдаты, и бигдата скажет вам спасибо.

В следующей статье мы подробно рассмотрим реализацию одной из важнейших оптимизаций в Trino, без которой никакой MPP не сможет обеспечить нам приемлемую производительность — динамические фильтры. До встречи!

Комментарии (0)