FYI: Первая часть.

Бенчмарк пакетного конвейера


Пакетный конвейер обрабатывает конечный объём сохранённых данных. Здесь нет потока результатов обработки, выходные данные агрегирующей функции нужно применить ко всему набору данных. Это меняет требования к производительности: задержка — ключевой фактор при потоковой обработке — здесь отсутствует, потому что мы обрабатываем данные не в реальном времени. Единственная важная метрика — общее время работы конвейера.

Поэтому мы выбрали Parallel. На первом этапе тестирования, при работе на одной ноде, этот сборщик действительно показал лучшую пропускную способность (но только после настройки). Однако это было получено ценой длительных пауз. Если одна из нод кластера останавливается на сборку мусора, это стопорит весь конвейер. А поскольку ноды собирают мусор в разное время, общее время сборки увеличивается с добавлением каждой ноды к кластеру. Мы проанализировали этот эффект, сравнив результаты тестирования на одной ноде и на кластере из трёх нод.

Кроме того, на этом этапе мы не рассматривали экспериментальные сборщики с низкой задержкой. Их очень короткие паузы не влияют на результаты тестирования, к тому же это достигается за счёт пропускной способности.

Тестирование на одной ноде: конвейер


Для пакетного бенчмарка на одной ноде мы использовали простой конвейер. Полный код здесь.

p.readFrom(longSource)
 .rebalance() // Introduced in Jet 4.2
 .groupingKey(n -> n % NUM_KEYS)
 .aggregate(summingLong(n -> n))
 .filter(e -> (e.getKey() & 0xFF_FFFFL) == 0)
 .writeTo(Sinks.logger())

Источником является заглушка, которая генерирует последовательность чисел типа long. Ключевая функция определена так, что groupingKey циклически проходит по пространству ключей: 0, 1, 2, ..., NUM_KEYS, 0, 1, 2,… Это означает, что в течение первого цикла конвейер видит все ключи и создаёт фиксированную структуру данных для хранения результатов агрегирования. А в течение следующих циклов система лишь обновляет имеющиеся данные. Это полностью соответствует гипотезе о сборке мусора с учётом разных поколений: объекты либо проходят через все вычисления, либо существуют недолго и становятся мусором вскоре после своего создания.

Наш источник сгенерировал 400 млн элементов, и мы создали 100 млн отдельных ключей, то есть прошли четыре раза по всем ключам.

Оператор .rebalance() заменяет используемое по умолчанию в Jet двухэтапное агрегирование на одноэтапное. Это сделало поведение движка в наших бенчмарках более предсказуемым.

Также мы протестировали вариант, при котором операция агрегирования использует в качестве состояния экземпляр объекта типа Long и генерирует мусор при каждом обновлении текущего счёта. В этом случае многие объекты умирают после того, как просуществуют приличное время в старом поколении. В этой ситуации нам пришлось уменьшить количество ключей до 70 млн, потому что при 100 млн нагрузка на сборщик была слишком высокой.

Мы не ориентировались на сборщики с низкой задержкой, потому что они ничего не могут предложить в случае с пакетным конвейером. Поскольку мы уже видели, что JDK 14 работает почти как JDK 11, то прогнали один тест для подтверждения этого. А затем сосредоточились на JDK 8 и JDK 11. И также сравнили с G1 используемый в JDK 8 по умолчанию сборщик Parallel.

Тестирование на одной ноде: результаты


Мы прогнали бенчмарк на ноутбуке с 16 Гб ОЗУ и 6-ядерным Intel Core i7. Размер кучи был 10 Гб.

Сначала из-за Parallel производительность была очень плохой, и нам пришлось настраивать сборку мусора. Поэтому мы очень рекомендуем использовать VisualVM и плагин Visual GC. Если задать максимальную частоту кадров (10 к/с.), то можно насладиться очень подробной визуализацией взаимосвязи между выделением памяти вашего приложения и работой сборщика мусора. Понаблюдав за анимацией, мы поняли, что главной проблемой были слишком большие фрагменты памяти, выделяемые для нового поколения. По умолчанию отношение старого поколения к новому равно всего 2:1, и в течение исполнения оно не меняется динамически. Поэтому мы решили применить настройку -XX:NewRatio=8, и это изменило всю картину. Теперь Parallel работал лучше всего. Также мы применили -XX:MaxTenuringThreshold=2 для уменьшения копирования данных между пространствами survivor space’ами, потому что временные объекты быстро умирают в конвейере.

Теперь о результатах. Единственной подходящей метрикой пакетного конвейера является длительность выполнения задачи. Чтобы визуализировать результаты, мы взяли их обратные величины, поэтому на графике показана пропускная способность в элементах в секунду. Для одной ноды:



Агрегирование без сборки мусора работает примерно на 30-35 % быстрее, несмотря на более крупный набор ключей. Хуже всего отработала связка G1 и JDK 8, а лучше всего — настроенный Parallel и JDK 11. Не сильно уступила связка G1 и JDK 11. Обратите внимание, что мы не трогали конфигурацию G1, это важное замечание. Настройка сборки мусора сильно зависит от ситуации. Результаты могут сильно меняться, например, при увеличении количества данных. И настраивать нужно для всего кластера, под конкретный вид нагрузки.

Вот производительность используемого по умолчанию сборщика Parallel по сравнению с настроенной версией, которую мы применяли при тестировании:


При куче размером 10 Гб результаты очень плохие. Система застревала на полных паузах на сборку, каждая из которых занимала около 7 с. При увеличении кучи ситуация улучшилась, но всё равно циклы полной сборки происходили очень часто. Обратите внимание, что эти результаты получены для самого частого случая — агрегирования без сборки.

Тестирование на трёх нодах: конвейер


Для правильного тестирования кластера пришлось использовать более сложный конвейер:

p.readFrom(longSource())
 .rebalance()
 .flatMap(n -> {
     Long[] items = new Long[SOURCE_STEP];
     Arrays.setAll(items, i -> n + i);
     return traverseArray(items);
 })
 .rebalance()
 .groupingKey(n -> n % NUM_KEYS)
 .aggregate(AggregateOperations.summingLong(n -> n))
 .filter(e -> e.getKey() % 1_000_000 == 0)
 .writeTo(Sinks.logger())
;

Поскольку источник работает не параллельно, мы сделали некоторые оптимизации, чтобы он не превратился в узкое место системы. Источник генерирует числа 0, 10, 20,…, а мы распараллелили этап flatMap, на котором интерполируются недостающие числа. Также между источником и flatMap мы использовали rebalance(), распределяя данные по кластеру. А перед началом основного этапа (агрегирования по ключу) мы снова сделали перебалансировку. После агрегирования мы сначала оставляем только каждую миллионную пару ключ-значение, а затем передаём их в логгер. Применялся миллиард элементов данных и набор из 500 млн ключей.

Как и в случае с одной нодой, мы протестировали конвейер с агрегированием без мусора и с ним.

Тестирование на трёх нодах: результат


Мы прогнали этот бенчмарк на кластере AWS из трёх экземпляров c5d.4xlarge. У них было 16 виртуализированных процессорных ядер и 32 Гб памяти. Пропускная способность канала 10 Гбит/с. Результат:



Вскользь отметим примерно трёхкратное общее увеличение пропускной способности по сравнению с одной нодой. Это последствие распределённой обработки. А что касается сборщиков, то победителем обоих тестов стала связка G1 и JDK 11. Другой поразительный результат — почти неработоспособная связка G1 и JDK 8. Однако у этого есть более глубокие причины, которые влияют и на другие измерения. Например, очевидное преимущество Parallel на JDK 8 и JDK 11. Это связано с эффектом, который мы отметили в самом начале: как только любая нода встаёт на паузу для сборки мусора, останавливается обработка на всём кластере. А G1 на JDK 8 встаёт на очень длинные паузы, больше минуты. Этого достаточно, чтобы детектор сбоев в кластере сработал и решил, что нода умерла. Задача сбоит, кластер переформирует себя, и задача запускается заново уже на двух нодах. Это ещё быстрее приводит к новому сбою, потому что каждая нода теперь обрабатывает больше данных. В то же время выброшенная нода присоединяется снова, и задача снова начинается на двух нодах, но уже других. Возникает бесконечная петля перезапуска задач.

Паузы Parallel не такие длинные, чтобы развалить кластер, но он сработал гораздо хуже на в тестах на одной ноде. На трёх нодах он уступил связке G1 и JDK 11 на 30 %. А на более крупных кластерах ситуация будет ещё хуже.

Если посмотреть на все тесты, то удивительно, что Parallel на JDK 8 работает быстрее, чем на JDK 11. Однако это связано с очень удачным совпадением: в этих тестовых прогонах полные паузы начинались синхронно на всех узлах, что распараллелило работу сборщика. Очевидно, что на этот эффект нельзя полагаться.

Даже не смотря на то, что на нашей тестовой конфигурации при использовании Parallel мы не наблюдали катастрофического влияния длинных пауз на стабильность кластера, это очень вероятный сценарий. В других тестах, когда мы увеличивали кучу и количество данных, либо оставляли такую же кучу, но уменьшали операционный запас, Parallel действовал столь же разрушительно. Но даже когда он не приводил к сбоям, на графиках исчезало его преимущество при работе на одной ноде. Можно ожидать, что ситуация будет ухудшаться с добавлением в кластер новых нод.

С другой стороны, связка G1 и JDK 11 работала с достаточно короткими паузами, поэтому конвейер не останавливался. В нём есть механизм, который смягчает короткие простои, и пока паузы не превышают 150 мс, сборка мусора оказывает только локальное влияние.