Как следует из названия, здесь будет только про параллельное выполнение (не путать с конкурентностью). Так же я не буду затрагивать проект Loom, который «должен убить» все остальные подходы написания параллельного кода. Надеюсь изложенное ниже позволит начинающим java программистам разобраться с разными подходами и структурировать имеющиеся знания.

Начинающие java программисты, разрабатывая новые сервисы, сталкиваются с задачей выбора технологий, фреймворков и т. д. При написании параллельного кода так же есть множество различных вариантов API. Вряд ли кто-то будет создавать треды вручную, но можно использовать старый добрый ExecutorService. Можно выбрать Stream API или Reactor с его реализацией параллельной обработки. Есть ещё Akka и куча других экзотических фреймворков. Некоторые из них разработаны под набор конкретных задач, другие же вроде подходят для любых. И как оценить производительность того же Reactor против ExecutorService, или же, например, используя Stream API. Допустим вы разрабатываете простенький сервис. Нужно забрать данные из одного места, смапить их параллельно в другом месте, и положить результат в третье. Классическая задача. Давайте напишем простенький бенчмарк, чтобы понять, как себя поведут разные фреймворки на одной и той же задаче. Конечно же использовать будем JMH по заветам «Светил» отрасли. Бенчмарк будет крайне простой — есть массив данных, на нём параллельно имитируем какую-то работу, и результаты куда-то сливаем. Код выглядит примерно так:

var result = Flux.fromIterable(list)
                 .parallel(PARALLELISM, PARALLELISM)
                 .runOn(scheduler)
                 .map(Worker::call)
                 .sequential()
                 .toIterable();

Код естественно упрощён. Такой же примерно код был написан с использованием Stream API и ExecutorService с пулом фиксированной длины. Worker здесь делает всю «полезную» работу. А именно, для теста, просто жжёт циклы через стандартный интерфейс JMH – Blackhole::consumeCPU. Ну что, давайте запустим этот пример и посмотрим, как справились разные API. Измерять будем throughput (больше = лучше)

Получаем достаточно интересные результаты на первый взгляд и сразу бежим переписывать всё на Stream API (конечно же нет), обещая менеджеру прирост производительности в 5-10 раз. Но давайте сначала разберёмся с этим. Reactor оказался для этой задачи достаточно медленный. Но почему мы имеем разницу в 2 раза в сравнении ExecutorService и Stream API. Код для исполнения на Stream API выглядит следующим образом:

var result = data.parallelStream()
                 .map(Worker::call)
                 .toList();

Внимательные сразу скажут: «тут же не настраивается пул потоков, поэтому тест нерелевантный». Да, ParallelStream использует внутри себя common ForkJoinPool который создаётся самой JVM при старте нашего приложения. И количество тредов в нём пропорционально количеству ядер вашего компьютера. Но разработчики JVM подумали за нас и дали возможность изменить это значение флагом «java.util.concurrent.ForkJoinPool.common.parallelism». И в данном примере, выставляя количество тредов равное с нашим ExecutorService, мы не получим существенное изменения в throughput.

И здесь следует вспомнить о том, что ForkJoinPool – это не просто ExecutorService. На нём конечно можно исполнять обычные Runnable и Callable, но создан он был для исполнения специальных задач, которые наследуются от ForkJoinTask. Эти задачи должны логически представлять собой задачи «разделяй и властвуй». Основная идея в том, что у вас есть большая задача. Вы её делите до тех пор, пока кусок этой задачи не станет на столько маленьким, чтобы его выполнить в текущем потоке. Таким образом, каждый поток в ForkJoinPool получает себе достаточно большую подзадачу и форкает её в свою же очередь, на которой почти не нужно блокироваться (в отличие от ExecutorService). И второй важный момент. Задача делится не до атомарного состояния, а до состояния, когда её будет оптимальнее выполнить на одном треде, чем разделять далее. Что это значит? Можно привести утрированный пример для идеальных условий. Допустим у вас есть 1000 «одинаковых» задач, которые можно выполнять параллельно и 10 потоков для них (выносим за скобки ядра процессора и т.п.). Очевидно, что наиболее оптимальным вариант — это выполнить на каждом потоке по 100 задач. И здесь у нас два варианта: сабмитить каждому потоку задачи по одной или же засабмитить сразу всю пачку из 100 задач. Естественно второй вариант будет предпочтительнее по многим причинам (создание отдельного Future на каждую задачу, вычитывание каждой задачи с BlockingQueue и т.д.). Чтобы убедиться давайте тоже напишем коротенький бенчмарк.

Получаем разницу почти в два раза. В реальном мире конечно нельзя просто разделить весь объём на количество тредов, так как наверняка не все задачи будут «одинаковыми». Поэтому у ForkJoinPool есть дополнительный коэффициент разбиения.

Давайте попробуем написать свою ForkJoinTask и сравнить производительность с ParallelStream. Код будет выглядеть примерно так:

@Override
protected Long compute() {
    if (data.size() <= SEQUENTIAL_THRESHOLD) {
        return computeSequentially();
    } else {
        int mid = data.size() / 2;
        var firstSubtask =
                new WorkerTask(data.subList(0, mid));
        var secondSubtask =
                new WorkerTask(data.subList(mid, data.size()));

        firstSubtask.fork();

        return secondSubtask.compute() + firstSubtask.join();
    }
}

Запускаем и наслаждаемся результатом

Но давайте теперь вернёмся к нашему реактивному коду. Так почему же он выполняется так медленно для нашей задачи? Для этого достаточно просто взглянуть на стэктрейс из нашего метода.

Половина здесь — это внутренности реактора, которые в итоге вызывают тот самый ExecutorService. И более того, реактор не гарантирует, что каждый шаг будет выполняться в том же потоке. И тут не будет оптимизаций такого уровня как в ForkJoinPool. Так что же теперь выкинуть реактор? Конечно же нет. Посмотрим это на таком примере. В оригинальном бенчмарке наша основная работа имитировалась через Blackhole::cosnumeCPU и занимала микросекунды. Давайте попробуем увеличить её до миллисекунд. Запускаем бенчмарк.

Все результаты в пределах погрешности. Получается, когда размер нашей работы вычисляется уже в миллисекундах, то оптимизации Stream API и накладные расходы на Reactor уже не играют столь значимой роли. Такие шаги можно сравнить с сетевым вызовом.

Давайте подводить итоги. Всякий раз, когда вы пишете параллельно выполняющийся код, стоит думать о той самой «атомарной» задаче. И как мы увидели по бенчмарку, если она действительно мала, то можно попробовать воспользоваться ParallelStream API (ForkJoinPool для тех, кто хочет управлять своей мечтой исполнением сам). Вероятно, вам стоит взглянуть по-новому на старый код, написанный через ExecutorService. Стоит подумать над использованием Reactor, там, где нет сетевых взаимодействий и вы не используете все его «плюшки» вроде backpressure, retry ну и сама парадигма сервиса не реактивная. И не стоит бояться использовать Reactor, считая что он слишком медленный для параллельного исполнения. И совсем точно стоит подумать над тем, чтобы комбинировать все эти подходы там, где они действительно нужны и могут ускорить определённый этап вашей программы.

Комментарии (5)


  1. dididididi
    05.09.2023 17:03
    -3

    Фигасе простой... Нихрена непонятно((


  1. brake
    05.09.2023 17:03

    Первое адекватное описание работы форк-джоин пула, в котором я не потерялся.


    1. AstarothAst
      05.09.2023 17:03

      Это не "описание работы форк-джоин пула", это по сути Рабинович напел — кое-как описана fork-фаза, и ни слова о join-фазе. Порассуждали о дроблении задач, но ни слова о том, что итоговый результат тоже должен быть собран из множества результатов подзадач. Про детали реализации, вроде того же "воровства работы", я молчу.


      Могу порекомендовать вот это видео: https://www.youtube.com/watch?v=t0dGLFtRR9c
      Шипилёву тут потребовалось более полутора часов, что бы сказать все, что он хотел о fjp.


  1. vooft
    05.09.2023 17:03

    Тут много вопросов, на самом деле. К примеру, что за scheduler вы используете? Или Чему равен PARALLELISM?

    Стандартные реактивные шедулеры используют внутри пул ExecutorService с одним тредом, чтобы как раз избежать переключения контекста.

    Плюс, в реакторе есть ForkJoinPoolScheduler.


    1. Jhumper Автор
      05.09.2023 17:03

      Использую ParallelScheduler. PARALLELISM - 10