В рамках данной статьи будет созданы классы, использующие Fork/Join Framework. В коде показан один из возможных вариантов применения параллельного программирования. Итак, начнем.
Создавая приложения, следует максимально разделять части, отвечающие за запуск, настройку и обработку данных. И данный вариант работы с Fork/Join — не исключение. В примерах будут использованы классы Start, Stream, Calc соответственно.
Часть первая — запуск
Для тестирования создадим класс Start, он будет служить «точкой» запуска. Значение timebetweenStartEnd покажет нам интервал времени между началом и окончанием расчетов. Под расчетами подразумевается возведение в степень чисел от 0 до 1000000 в двух вариантах в однопоточном и многопоточном режиме.
В классе Start определен пул потоков ForkJoinPool(). С помощью метода invoke() был достигнут результат запуска задачи и ожидания ее выполнения. Значение componentValue определено равное 1000000. Во вновь созданном экземпляре класса Stream определены исходные данные. С помощью invoke() мы «переводим» данную задачу на выполнение.
import java.util.concurrent.ForkJoinPool;
public class Start {
public static void main(String[] args) {
final int componentValue = 1000000;
Long beginT = System.nanoTime();
ForkJoinPool fjp = new ForkJoinPool();
Stream test = new Stream(componentValue,0,componentValue);
fjp.invoke(test);
Long endT = System.nanoTime();
Long timebetweenStartEnd = endT - beginT;
System.out.println("=====time========" +timebetweenStartEnd);
}
}
Часть вторая. Настройка. Класс Stream
Вторая часть механизма представляет класс (Stream), отвечающий за настройку многопоточности. Сейчас у нас всего два таких варианта: первый — по количеству обрабатываемых значений в одном потоке (далее — «отсечка)», второй — по количеству процессоров (получаем с помощью метода availableProcessors()). Прошу обратить внимание читателей, что в данной статье не будет прорабатываться механизм динамического создания потоков в зависимости от количества процессоров и/или других условий. Это тема следующей статьи.
В классе использован абстрактный метод compute(), отвечающий за запуск вычислений, в нашем случае это выбор варианта расчета и запуск расчетов в методе go класса Calc. С помощью метода invokeAll() произведем запуск подзадач.
Из алгоритма видно, что в случае, если у нас больше одного процессора, или значение отсечки (500000) больше/равно полученным частям, то происходит расчет. В примере, мы делим forSplit на несколько частей (две) и запускаем две подзадачи. Изменив значение переменной countLimit или выставив значение countProcessors равное единице произойдет запуск только одной задачи по обработке данных.
import java.util.concurrent.RecursiveAction;
public class Stream extends RecursiveAction {
final int countProcessors = Runtime.getRuntime().availableProcessors();
final int countLimit = 500000;
int start;
int end;
int forSplit;
Stream(int componentValue,int startNumber, int endNumber) {
forSplit = componentValue;
start = startNumber;
end = endNumber;
}
protected void compute() {
if (countProcessors == 1 || end - start <= countLimit) {
System.out.println("=run=");
for(int i = start; i <= end; i++) {
new Calc().go(i);
}
} else {
int middle = (start + end)/ 2;
invokeAll(new Stream(forSplit, 0, middle),
new Stream(forSplit, middle+1, end));
}
}
}
Часть третья. Выполнение расчета. Класс Calc
Данный класс отвечает за возведение числа в степень. Нижеуказанная часть предназначена для демонстрации и может содержать любые вычисления от перебора коллекций до записи данных в хранилище.
public class Calc {
public void go(int numberForCalc) {
for(int i = 0; i <= numberForCalc; i++) {
double pow = Math.pow(numberForCalc,100);
}
}
}
Вместо концовки
Данный материал будет полезен тем, кто только начал изучать параллельное программирование. В нем показаны основы работы с небольшой частью функционала. Обращаю внимание читателей, что для небольших вычислений время, затраченное на создание второй подзадачи может быть больше времени выполнения расчета. В следующих статьях приблизимся к созданию гибкого функционала для запуска и определения максимально возможных параллельных потоков, а также затронем тему ограничений, связанных с одновременно исполняемыми командами.
Комментарии (12)
AterCattus
16.11.2015 13:48Fork/Join… Оно стартует новый процесс на каждый поток выполнения?
Terran37
16.11.2015 15:22Данный пример фактически демонстрирует разбиение задачи на части и параллельный запуск этих подзадач. Т.е. обработка выполняется отдельно.
Insomnium
16.11.2015 15:25Оно выделяет свободный поток на задачу, которую успел ухватить из внутренней очереди (очень неточное определение на самом деле. Вообще, лучше почитать документацию и посмотреть пару лекций, скажем, Шипилева). Сама фраза «стартует новый процесс на каждый поток» звучит страшно.
AterCattus
16.11.2015 15:53Вот и мне страшно, если оно стартует процессы. Если там просто пул потоков-воркеров, то норм.
Terran37
16.11.2015 22:01Выполнение задач происходит из пула потоков ForkJoinPool.
Тут вы вольны поступать тремя способами. Задать его самостоятельно, поставить по умолчанию(будет зависеть о количества процессоров), и использовать общий пул.
schroeder
пардон, ничего не понял
Terran37
Нет понимания зачем нужны потоки? Или просто плохой пример?
schroeder
Я знаю что такое потоки. Я не понял что происходит в вашем примере. Зачем там countLimit? Что за класс Stream( это родной явовский класс из 8-й явы или ваше собственное изобретение)? В общем вообще ничего не понял. Чем вам не нравится например ExecutorService?
Terran37
countLimit, как и countProcessors используются для демонстрации возможных ограничений при расчетах. Показал, что можно ввести «ключи». Вы можете оперировать и мощностями(процессорами) и ограничителями в виде констант.
Stream — придуманное название класса.
Terran37
Все три класса придуманные. Они созданы для демонстрации части возможностей Framework-а.
Terran37
«C ExecutorService у нас была гарантия, что одна задача от начала и до конца выполняется одним потоком.
В Fork/Join работа с потоками претерпела сильные изменения. Задачи (ForkJoinTask’s) имеют уже другую семантику нежели потоки: один поток может выполнять несколько задач пересекающихся по времени.» из статьи http://habrahabr.ru/post/134183/. Посмотрите.
Terran37
Ссылку поправил. Ссылка.