Статья посвящена такому интересному и полезному механизму (совокупностям механизмов и библиотек), как Fork/Join Framework. Он позволяет многократно ускорить вычисления, добиться максимальных результатов при обработке, используя все доступные возможности системы (процессоры).

В рамках данной статьи будет созданы классы, использующие Fork/Join Framework. В коде показан один из возможных вариантов применения параллельного программирования. Итак, начнем.

Создавая приложения, следует максимально разделять части, отвечающие за запуск, настройку и обработку данных. И данный вариант работы с Fork/Join — не исключение. В примерах будут использованы классы Start, Stream, Calc соответственно.

Часть первая — запуск


Для тестирования создадим класс Start, он будет служить «точкой» запуска. Значение timebetweenStartEnd покажет нам интервал времени между началом и окончанием расчетов. Под расчетами подразумевается возведение в степень чисел от 0 до 1000000 в двух вариантах в однопоточном и многопоточном режиме.

В классе Start определен пул потоков ForkJoinPool(). С помощью метода invoke() был достигнут результат запуска задачи и ожидания ее выполнения. Значение componentValue определено равное 1000000. Во вновь созданном экземпляре класса Stream определены исходные данные. С помощью invoke() мы «переводим» данную задачу на выполнение.

import java.util.concurrent.ForkJoinPool;

public class Start {
    public static void main(String[] args) {

     final int componentValue = 1000000;
        Long beginT = System.nanoTime();
        ForkJoinPool fjp = new ForkJoinPool();
        Stream test = new Stream(componentValue,0,componentValue);
        fjp.invoke(test);
        Long endT = System.nanoTime();
        Long timebetweenStartEnd = endT - beginT;
        System.out.println("=====time========" +timebetweenStartEnd);

    }
}

Часть вторая. Настройка. Класс Stream


Вторая часть механизма представляет класс (Stream), отвечающий за настройку многопоточности. Сейчас у нас всего два таких варианта: первый — по количеству обрабатываемых значений в одном потоке (далее — «отсечка)», второй — по количеству процессоров (получаем с помощью метода availableProcessors()). Прошу обратить внимание читателей, что в данной статье не будет прорабатываться механизм динамического создания потоков в зависимости от количества процессоров и/или других условий. Это тема следующей статьи.

В классе использован абстрактный метод compute(), отвечающий за запуск вычислений, в нашем случае это выбор варианта расчета и запуск расчетов в методе go класса Calc. С помощью метода invokeAll() произведем запуск подзадач.

Из алгоритма видно, что в случае, если у нас больше одного процессора, или значение отсечки (500000) больше/равно полученным частям, то происходит расчет. В примере, мы делим forSplit на несколько частей (две) и запускаем две подзадачи. Изменив значение переменной countLimit или выставив значение countProcessors равное единице произойдет запуск только одной задачи по обработке данных.

import java.util.concurrent.RecursiveAction;

public class Stream extends RecursiveAction {

    final int countProcessors = Runtime.getRuntime().availableProcessors();
    final int countLimit = 500000;
    int start;
    int end;
    int forSplit;

    Stream(int componentValue,int startNumber, int endNumber) {
        forSplit = componentValue;
        start = startNumber;
        end = endNumber;
    }
    protected void compute() {
        if (countProcessors == 1 || end - start <= countLimit) {
            System.out.println("=run=");
            for(int i = start; i <= end; i++) {
                new Calc().go(i);
            }
        } else {
            int middle = (start + end)/ 2;
            invokeAll(new Stream(forSplit, 0, middle),
                    new Stream(forSplit, middle+1, end));
        }
    }
}

Часть третья. Выполнение расчета. Класс Calc


Данный класс отвечает за возведение числа в степень. Нижеуказанная часть предназначена для демонстрации и может содержать любые вычисления от перебора коллекций до записи данных в хранилище.

public class Calc {
    public void go(int numberForCalc) {
        for(int i = 0; i <= numberForCalc; i++) {
            double pow = Math.pow(numberForCalc,100);
        }
    }
}

Вместо концовки


Данный материал будет полезен тем, кто только начал изучать параллельное программирование. В нем показаны основы работы с небольшой частью функционала. Обращаю внимание читателей, что для небольших вычислений время, затраченное на создание второй подзадачи может быть больше времени выполнения расчета. В следующих статьях приблизимся к созданию гибкого функционала для запуска и определения максимально возможных параллельных потоков, а также затронем тему ограничений, связанных с одновременно исполняемыми командами.

Комментарии (12)


  1. schroeder
    16.11.2015 13:30
    +8

    пардон, ничего не понял


    1. Terran37
      16.11.2015 15:22

      Нет понимания зачем нужны потоки? Или просто плохой пример?


      1. schroeder
        16.11.2015 15:28

        Я знаю что такое потоки. Я не понял что происходит в вашем примере. Зачем там countLimit? Что за класс Stream( это родной явовский класс из 8-й явы или ваше собственное изобретение)? В общем вообще ничего не понял. Чем вам не нравится например ExecutorService?


        1. Terran37
          16.11.2015 16:39

          countLimit, как и countProcessors используются для демонстрации возможных ограничений при расчетах. Показал, что можно ввести «ключи». Вы можете оперировать и мощностями(процессорами) и ограничителями в виде констант.
          Stream — придуманное название класса.


          1. Terran37
            16.11.2015 16:41

            Все три класса придуманные. Они созданы для демонстрации части возможностей Framework-а.


        1. Terran37
          16.11.2015 16:44

          «C ExecutorService у нас была гарантия, что одна задача от начала и до конца выполняется одним потоком.
          В Fork/Join работа с потоками претерпела сильные изменения. Задачи (ForkJoinTask’s) имеют уже другую семантику нежели потоки: один поток может выполнять несколько задач пересекающихся по времени.» из статьи http://habrahabr.ru/post/134183/. Посмотрите.


          1. Terran37
            16.11.2015 16:55

            Ссылку поправил. Ссылка.


  1. AterCattus
    16.11.2015 13:48

    Fork/Join… Оно стартует новый процесс на каждый поток выполнения?


    1. Terran37
      16.11.2015 15:22

      Данный пример фактически демонстрирует разбиение задачи на части и параллельный запуск этих подзадач. Т.е. обработка выполняется отдельно.


    1. Insomnium
      16.11.2015 15:25

      Оно выделяет свободный поток на задачу, которую успел ухватить из внутренней очереди (очень неточное определение на самом деле. Вообще, лучше почитать документацию и посмотреть пару лекций, скажем, Шипилева). Сама фраза «стартует новый процесс на каждый поток» звучит страшно.


      1. AterCattus
        16.11.2015 15:53

        Вот и мне страшно, если оно стартует процессы. Если там просто пул потоков-воркеров, то норм.


        1. Terran37
          16.11.2015 22:01

          Выполнение задач происходит из пула потоков ForkJoinPool.
          Тут вы вольны поступать тремя способами. Задать его самостоятельно, поставить по умолчанию(будет зависеть о количества процессоров), и использовать общий пул.