Рассмотрим, как реализован SortMergeJoin в Apache Spark, и заодно заглянем в исходный код на GitHub. Spark написан на языке Scala, и вся логика работы оператора доступна в открытом репозитории проекта. Вот здесь :)

Первое, что рассмотрим - это конструктор кейс-класса

1. Конструктор SortMergeJoinExec

case class SortMergeJoinExec(
    leftKeys: Seq[Expression],    // Ключи соединения (например, col("id"))
    rightKeys: Seq[Expression],
    joinType: JoinType,           // Тип соединения: Inner, LeftOuter и т.д.
    condition: Option[Expression],// Доп. условия (col("flag") === "yes")
    left: SparkPlan,              // Левый дочерний план
    right: SparkPlan,             // Правый дочерний план
    isSkewJoin: Boolean = false   // Обработка перекосов данных
) extends ShuffledJoin
  • leftKeys, rightKeys: выражения (колонки), по которым происходит соединение.

  • joinType: тип соединения (inner, left outer, right outer, full outer и т.д.).

  • condition: дополнительное условие соединения (например, a.id = b.id AND a.flag = 'yes').

  • left, right: левые и правые под-планы (другие SparkPlan'ы, поддеревья выполнения).

  • isSkewJoin: флаг, указывающий, нужно ли учитывать перекос (skew) в данных (редкая, но важная оптимизация).

extends ShuffledJoin означает, что соединение происходит с предварительным перемешиванием данных (shuffle), чтобы обеспечить одинаковые ключи на соответствующих партициях.

2. Ключевые методы

a) getKeyOrdering

  • Он вызывает requiredOrders для ключей левого и правого входов (leftKeys, rightKeys).

  • Возвращает список списков сортировок — один для левой стороны, один для правой.

Иными словами: перед выполнением операции, Spark попросит левые и правые данные быть отсортированными по их ключам.

b) requiredChildOrdering

def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq(
  leftKeys.map(SortOrder(_, Ascending)),  // Сортировка левой таблицы
  rightKeys.map(SortOrder(_, Ascending))  // Сортировка правой таблицы
)

Определяет порядок, необходимый для ключей соединения в левом и правом отношениях

c) getSpillThreshold

Возвращает порог спиливания данных на диск из конфигурации.

Назначение:
Определяет порог, при превышении которого данные начинают сбрасываться (spill) на диск для избежания OutOfMemoryError.

Технические детали:

  • Конфигурируется через параметр spark.sql.sortMergeJoinExec.buffer.spill.threshold (по умолчанию 4096).

  • Значение представляет собой максимальное количество строк в буфере перед сбросом на диск.

  • Важность: Балансирует между использованием оперативной памяти и диска, позволяя обрабатывать большие датасеты без перегрузки памяти.

private val spillThreshold: Int = 
  sqlContext.conf.sparkSqlSortMergeJoinExecBufferSpillThreshold

d) onlyBufferFirstMatchedRow

Контекст: Оптимизация для LEFT SEMI JOIN и ANTI JOIN.

Механика:

  • Для LEFT SEMI JOIN достаточно найти хотя бы одно совпадение в правой таблице.

  • Для ANTI JOIN (например, LEFT ANTI) нужно убедиться в отсутствии совпадений.

Реализация:

private val onlyBufferFirstMatchedRow: Boolean =
  joinType.isInstanceOf[LeftExistence] && condition.isEmpty

Эффект: вместо буферизации всех совпадающих строк сохраняется только первая, что:

  • Сокращает использование памяти.

  • Ускоряет обработку, так как не требуется перебор всех совпадений.

3. Генерация кода (Codegen)

a) Создание ключей

val leftKey = BindReferences.bindReference(leftKeys.head, left.output)
val rightKey = BindReferences.bindReference(rightKeys.head, right.output)

Использует BindReferences для статического определения смещений столбцов.

b) Сравнение ключей

Генерируется код для сравнения ключей:

// Пример сгенерированного кода для ключей (a, b):
int comp = 0;
comp = left_a - right_a;
if (comp == 0) {
  comp = left_b - right_b;
}
return comp;

c) SortMergeJoinScanner

Обрабатывает основные сценарии соединения:

val scanner = new SortMergeJoinScanner(
  leftKeyGenerator, 
  rightKeyGenerator, 
  keyOrdering, 
  leftIter, 
  rightIter, 
  inMemoryThreshold, 
  spillThreshold
)
  • Компоненты:

    • leftKeyGenerator/rightKeyGenerator: генераторы ключей для обеих таблиц.

    • keyOrdering: порядок сортировки ключей (ASC/DESC).

    • leftIter/rightIter: итераторы отсортированных данных.

    • Пороги (inMemoryThreshold, spillThreshold): управление памятью.

  • Логика: Сканирует обе таблицы, находит совпадения ключей и управляет продвижением указателей.

4. Алгоритм работы

Этап 1: Сортировка

Каждая партиция сортируется по ключам соединения:

val leftSorted = left.execute().asInstanceOf[RDD[InternalRow]]
val rightSorted = right.execute().asInstanceOf[RDD[InternalRow]]

Используется Tungsten Sort (внекучная сортировка для эффективности).

Этап 2: Слияние

Двухсторонний алгоритм "двух указателей":

while (currentLeftRow != null && currentRightRow != null) {
  compareKeys() match {
    case 0 => 
      bufferMatches()  // Буферизация совпадений
      generateJoinRow() // Генерация результата
    case -1 => advanceLeft()  // Левый ключ меньше
    case 1 => advanceRight()  // Правый ключ меньше
  }
}

Как работает алгоритм?
Исходные данные:

1) Два отсортированных набора данных: left (левый) и right (правый).

2) Указатели: currentLeftRow (текущая строка слева), currentRightRow (текущая строка справа).

Сравнение ключей:

compareKeys() сравнивает ключи текущих строк:

1) 0: ключи равны → совпадение.

2) -1: ключ слева меньше → двигаем левый указатель.

3) 1: ключ справа меньше → двигаем правый указатель.

Простой схематичный пример:

Left:   [1, 3, 5, 7]
Right:  [2, 3, 6, 7]

Сравнения:
1 vs 2 → 1 < 2 → двигаем left
3 vs 2 → 3 > 2 → двигаем right
3 vs 3 → совпадение! → объединяем строки
5 vs 6 → 5 < 6 → двигаем left
7 vs 6 → 7 > 6 → двигаем right
7 vs 7 → совпадение! → объединяем строки

------------------------------

Подытожим, как работает Sort Merge Join в Apache Spark.

Sort Merge Join — это алгоритм соединения данных, оптимизированный для работы с большими наборами. Он сочетает сортировку и слияние данных, обеспечивая стабильность и поддержку всех типов соединений. Рассмотрим его работу поэтапно.

1) Перетасовка

Данные перераспределяются (перетасовываются) между узлами, чтобы строки с одинаковым ключом находились на одном и том же исполнителе.

2) Сортировка

Каждый DataFrame сортируется по ключу (ключам) объединения, обеспечивая выравнивание строк с одинаковыми ключами. Сортировка распределяется по кластеру с использованием параллельной обработки Spark.

Перед соединением Spark выполняет:

  • Сортировку левой таблицы по ключам соединения (например, user_id).

  • Сортировку правой таблицы по тем же ключам.

Пример плана выполнения:

// Левый поток
Sort [user_id#10 ASC]
+- Exchange hashpartitioning(user_id#10, 200)
   +- Scan table A

// Правый поток
Sort [user_id#20 ASC]
+- Exchange hashpartitioning(user_id#20, 200)
   +- Scan table B

3) Слияние

После сортировки начинается этап слияния, реализованный в SortMergeJoinExec.

a) Алгоритм «двух указателей»:

  • Левый итератор читает отсортированные данные из левой таблицы.

  • Правый итератор читает отсортированные данные из правой таблицы.

  • Ключи сравниваются последовательно:

while (leftRow != null && rightRow != null) {
  val comp = compare(leftKey, rightKey)
  comp match {
    case 0 =>  // Ключи совпали
      generateJoinedRows()  // Создать все комбинации совпадений
    case -1 => // Левый ключ меньше: продвинуть левый итератор
      leftRow = leftIterator.next()
    case 1 =>  // Правый ключ меньше: продвинуть правый итератор
      rightRow = rightIterator.next()
  }
}

b) Обработка совпадений

  • При совпадении ключей (comp == 0) все строки с одинаковыми ключами буферизуются.

  • Для Inner Join генерируются все комбинации строк.

  • Для Left Outer Join сохраняются все строки левой таблицы, даже без совпадений.

Поддержка разных типов соединений

Тип соединения

Логика работы

Inner Join

Только строки с совпадающими ключами.

Left Outer

Все строки левой таблицы + null для правой части при отсутствии совпадений.

Full Outer

Все строки обеих таблиц + null для отсутствующих совпадений.

Semi Join

Возвращает строки левой таблицы, где есть хотя бы одно совпадение в правой.

Anti Join

Возвращает строки левой таблицы, где нет совпадений в правой.

Оптимизация AQE при SortMergeJoin в Spark

Адаптивное выполнение запросов (AQE) в Spark представляет собой механизм оптимизации, который действует во время выполнения запросов. В отличие от классического подхода, где план выполнения формируется заранее на основе приблизительных оценок, AQE реагирует на реальные данные по мере их обработки. Это позволяет динамически менять стратегию выполнения запроса, чтобы лучше учитывать особенности данных — такие как уменьшившийся объём, перекос распределения или сложные типы объединений. Используя точки перераспределения данных (shuffle), система может в нужный момент пересмотреть и усовершенствовать текущий план, что позволяет значительно повысить производительность и снизить затраты на вычисления в распределённой среде Spark.

Сравнение с другими алгоритмами

Критерий

Sort Merge Join

Broadcast Join

Hash Join

Размер данных

Большие (TB+)

Малая правая таблица (~МБ)

Средние (ГБ)

Память

Низкий расход

Высокий (кэширование)

Зависит от хэш-таблицы

Сортировка

Обязательна

Не требуется

Не требуется

Типы соединений

Все

Inner, Left

Inner, Left

Когда использовать Sort Merge Join?

  • Большие таблицы, которые не помещаются в память.

  • Full Outer Join или соединения со сложными условиями.

Итог

Sort Merge Join — это надежный алгоритм для обработки больших данных в Spark. Он гарантирует стабильную работу даже на кластерах с ограниченной памятью, поддерживает все типы соединений и интегрирован с ключевыми оптимизациями Spark (Tungsten, AQE). Однако из-за накладных расходов на сортировку он уступает в скорости BroadcastJoin для малых таблиц.

Пример построенного плана выполнения в Spark UI:

План SortMergeJoin из Spark UI
План SortMergeJoin из Spark UI

Упрощенно получаем такую схему работы SortMergeJoin:

Схема SortMergeJoin
Схема соединения SortMergeJoin

Комментарии (0)