Проблема "статического планирования" в Spark

Представим ситуацию: мы спланировали маршрут до точки назначения, предположили, по какой дороге будет быстрее добраться, даже вспомнили, где обычно бывают пробки, чтобы их объехать. Но, неожиданно, на самом свободном участке образовался затор из-за аварии в правом ряду. В этот момент понимаем, что лучше бы мы ехали по навигатору, и какая-нибудь “Анфиса” предупреждала о дорожной ситуации, чтобы в определенный момент можно было изменить траекторию движения. Именно так годами чувствовали себя пользователи Spark, когда их красиво оптимизированные запросы наталкивались на суровую реальность распределенных данных.

Ограничения до Spark 3.0

До версии 3.0 оптимизатор Spark работал как навигатор, строящий маршрут без знания реальной ситуации на дорогах. Решения принимались на этапе компиляции запроса на основе:

  • Статистики таблиц (если она была собрана)

  • Эвристических правил

  • Предположений о распределении данных

Но в мире Big Data предположения часто оказываются ошибочными.

Например, кажется, что следующий запрос оптимален:

SELECT users.country, COUNT(orders.order_id)
FROM users JOIN orders ON users.id = orders.user_id
GROUP BY users.country

Но на практике выясняется:

  • 80% заказов принадлежат только 5% пользователей

  • Одна партиция обрабатывается 10 минут, остальные — по 10 секунд

  • Ресурсы простаивают, время выполнения зашкаливает

Революция: Adaptive Query Execution

Spark 3.0 принес решение — Adaptive Query Execution (AQE). Это не просто очередная оптимизация, а смена парадигмы: AQE превращает Spark из "слепого штурмана" в "опытного таксиста", который перестраивает маршрут, видя реальные пробки.

Три ключевые оптимизации AQE

  1. Skew Join Optimization (Оптимизация скошенных соединений)

  2. Coalesce Shuffle Partitions (Объединение партиций)

  3. Join Strategy Switch (Смена стратегии соединений)

Рассмотрим механизм оптимизации скошенных соединений на примере исходного кода: OptimizeSkewedJoin.scala

1. Сбор и анализ статистики

Первое и ключевое — это сбор реальной статистики во время выполнения.

В отличие от статического оптимизатора AQE собирает актуальную статистику прямо во время выполнения запросов:

case class MapOutputStatistics(
  shuffleId: Int,
  bytesByPartitionId: Array[Long]  // Размеры каждой партиции в байтах
)

Получаем реальные размеры партиций после выполнения Stage 0

val leftActualSizes = left.partitionsWithSizes.map(_._2)  // [45MB, 52MB, 48MB, 980MB, 50MB]
val rightActualSizes = right.partitionsWithSizes.map(_._2) // [42MB, 38MB, 45MB, 920MB, 40MB]

Вычисляем медианные размеры

val leftMedSize = medianSize(leftActualSizes)  // ~48MB
val rightMedSize = medianSize(rightActualSizes) // ~42MB

2. Обнаружение скошенных партиций

private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
	size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}

Практический пример:

val sizes = Array(45, 52, 48, 980, 50) // в MB

Вычисляем медиану

val medianSize = medianSize(sizes) // = 48MB

Применяем функцию к каждой партиции, учитывая, что SKEW_JOIN_SKEWED_PARTITION_THRESHOLD (максимальный размер раздела) по умолчанию 256MB, а SKEW_JOIN_SKEWED_PARTITION_FACTOR вычисляем по формуле: 48MB × 5.0 = 240MB

Partition 0 (45MB): 45 > 240 && 45 > 256 => false
Partition 1 (52MB): 52 > 240 && 52 > 256 => false 
Partition 2 (48MB): 48 > 240 && 48 > 256 => false
Partition 3 (980MB): 980 > 240 && 980 > 256 => true => скошенная
Partition 4 (50MB): 50 > 240 && 50 > 256 => false

Двойное условие защищает от ложных срабатываний

// Для равномерно распределенных больших партиций
val uniformLarge = Array(280L, 290L, 270L, 300L, 310L) // в MB
val median = medianSize(uniformLarge) // 290MB
// Проверяем Partition 4 (310MB):
// 310 > 290 × 5.0 = 1450 => false
// 310 > 256 => true
// Результат: false => не скошенная

Процесс оптимизации скошенной партиции

1. Определение стратегии разбиения

Spark вычисляет, на какое количество подпартиций нужно разбить скошенную партицию:

val splitPoints = {
  // Вычисляем целевой размер подпартиции (медианный размер)
  val targetSize = Math.max(medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR),
                           conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD))
 
  // Определяем количество splits
  val numSplits = (size / targetSize).ceil.toInt
 
  // Генерируем равномерные точки разбиения
  (0 until numSplits).map { i =>
	start + (i * (end - start)) / numSplits
  }
}

На примере партиции 2100MB из коллекции [128MB, 135MB, 2100MB, 140MB, 150MB]

  • Медиана = 140MB

  • Целевой размер = max(140 * 5, 256) = max(700, 256) = 700MB

  • Количество splits = ceil(2100 / 700) = 3 подпартиции

2. Разбиение данных скошенной партиции

Spark физически разделяет данные большой партиции на несколько меньших:

// Было: одна задача обрабатывает 2100MB
Partition 2: [все данные 2100MB] → Медленная задача
 
// Стало: три задачи обрабатывают по ~700MB каждая
Partition 2.1: [~700MB]
Partition 2.2: [~700MB]
Partition 2.3: [~700MB]

Получаем более равномерное распределение по партициям => равномерное распределение ресурсов.

3. Репликация соответствующих данных с другой стороны

Это самый важный аспект - для сохранения корректности JOIN:

// Для левой стороны (скошенная партиция)
val leftPartitions = Array(
  Partition 0: 128MB, 	// без изменений
  Partition 1: 135MB, 	// без изменений 
  Partition 2.1: 700MB,   // разбита
  Partition 2.2: 700MB,   // разбита
  Partition 2.3: 700MB,   // разбита
  Partition 3: 140MB, 	// без изменений
  Partition 4: 155MB  	// без изменений
)
 
// Для правой стороны (репликация соответствующих данных)
val rightPartitions = Array(
  Partition 0: 120MB, 	// без изменений
  Partition 1: 130MB, 	// без изменений
  Partition 2: 125MB, 	// РЕПЛИЦИРОВАНА 3 РАЗА!
  Partition 2_copy1: 125MB,
  Partition 2_copy2: 125MB,
  Partition 2_copy3: 125MB,
  Partition 3: 150MB,	// без изменений
  Partition 4: 145MB  	// без изменений
)

4. Перепланирование выполнения JOIN

Теперь каждая подпартиция соединяется с соответствующей реплицированной партицией:

-- Изначальный план выполнения
Left Partition 2 (2100MB) JOIN Right Partition 2 (125MB)
-- Одна медленная задача
 
-- Новый план выполнения 
Left Partition 2.1 (700MB) JOIN Right Partition 2_copy1 (125MB)
Left Partition 2.2 (700MB) JOIN Right Partition 2_copy2 (125MB) 
Left Partition 2.3 (700MB) JOIN Right Partition 2_copy3 (125MB)
-- Три задачи, выполняемых параллельно

5. Визуализация процесса

Решение проблемы "магического числа" партиций.

Проблема статического подхода

В традиционном подходе Spark сталкивается с фундаментальной проблемой: система определяет оптимальное количество партиций для начального этапа, но для последующих этапов использует эмпирическое значение по умолчанию — 200 партиций (которое в целом можно изменить, но только в статическом варианте).

Это создает три серьезных ограничения:

1.    Производительность: 200 редко бывает идеальным числом, хотя количество партиций критически влияет на эффективность обработки.

2.    Эффективность хранения: при сохранении результатов образуется 200 отдельных файлов, создавая избыточную нагрузку.

3.    Потерянный потенциал: неоптимальная конфигурация ограничивает возможности дальнейших оптимизаций.

Ограничения ручной настройки

Хотя разработчик может скорректировать параметры:

spark.conf.set("spark.sql.shuffle.partitions", "2000")

Этот подход создает новые проблемы:

  • Операционные издержки: постоянная ручная настройка непрактична

  • Устаревание конфигураций: оптимальные значения меняются с эволюцией данных

  • Глобальное воздействие: изменения затрагивают все этапы перемешивания в запросе

Фундаментальная проблема — асимметрия информации: перед первым этапом Spark знает о распределении данных, но для последующих этапов эта информация становится доступной только после выполнения предыдущих шагов.

Динамическая адаптация стратегий соединения

AQE автоматически преобразует соединения sort-merge в broadcast hash-соединения, когда статистика выполнения показывает, что размер одной из сторон ниже установленного порога. Это было и раньше, но теперь Spark собирает статистику в процессе выполнения, в рамках текущей spark-сессии.

Преимущества:

  • Автоматическое определение оптимальной стратегии соединения

  • Избегание дорогостоящих shuffle-операций, когда это возможно

  • Динамическая адаптация к реальному распределению данных

Заключение

Adaptive Query Execution представляет собой фундаментальный сдвиг в архитектуре Spark — от статического планирования до динамической адаптации. Благодаря AQE Spark теперь может:

  • Интеллектуально обнаруживать и оптимизировать скошенные соединения

  • Динамически настраивать количество партиций на основе реальных данных

  • Автоматически выбирать оптимальные стратегии соединений

P.S. Конечно, Spark не идеален, но мне нравятся подходы его разработчиков, зачастую внедряющих простые, понятные и эффективные алгоритмы.

Комментарии (0)