В данном посте будет рассказано, как реализовать сортировку на Java c использованием ExecutorService. Общая суть сортировки в следующем:
Здесь применяются идеи сортировки слиянием, но массив разбивается только на две части (рекурсия не используется).
Для слияния можно использовать следующую функцию:
Код функции слияния взят отсюда.
Суть слияния в следующем: в начале указатели находятся на первом элементе для обоих массивов. Далее значения элементов, соответствующих позициям указателя сравниваются и указатель для меньшего элемента сдвигается на следующий элемент, сам элемент добавляется в результирующий массив. Цикл продолжается до тех пор, пока не дойдем до конца одного из массивов, тогда оставшаяся часть второго массива будет скопирована в конец результирующего массива. Таким образом, на выходе получаем отсортированный массив.
Также был создан класс для многопоточной сортировки, в нем был создан метод run, который выполняется, когда к объекту типа Thread применяется метод start(). В нашем случае за это будет отвечать executorService. Приведем здесь код класса merge, объекты которого и будут создаваться для реализации многопоточной сортировки:
Для сортировки частей массива была использована встроенная в java сортировка. Далее приведен код для сортировки с помощью пула потоков. Проводятся замеры времени для многопоточного и обычного варианта (спойлер: многопоточная дает ускорение только на большом объеме данных):
В начале основной функции массив заполняется произвольными строками, которые содержат числа от 0 до 10000000. В качестве количества потоков берется количество доступное на устройстве. Переменная batchSize отвечает за размерность массивов для сортировки в параллельном режиме. Затем создается executorService с фиксированным количеством потоков.
Для каждого потока создается свой объект класса merge, затем этот ставим задачу по сортировке в очередь на выполнение. С помощью future ждем, пока все посчитается, собираем все отсортированные части массива и сливаем их по очереди в результирующий массив. Останавливаем executorService и можем посмотреть на временные затраты последовательного и параллельного варианта реализации.
Код лежит здесь
- Массив разбивается на части
- Каждая часть массива сортируется
- Идем по упорядоченным массивам, сливаем их в один
Здесь применяются идеи сортировки слиянием, но массив разбивается только на две части (рекурсия не используется).
Для слияния можно использовать следующую функцию:
public static String[] merge( String[] leftPart, String[] rightPart ) {
int cursorLeft = 0, cursorRight = 0, counter = 0;
String[] merged = new String[leftPart.length + rightPart.length];
while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
merged[counter] = leftPart[cursorLeft];
cursorLeft+=1;
} else {
merged[counter] = rightPart[cursorRight];
cursorRight+=1;
}
counter++;
}
if ( cursorLeft < leftPart.length ) {
System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
}
if ( cursorRight < rightPart.length ) {
System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
}
return merged;
}
Код функции слияния взят отсюда.
Суть слияния в следующем: в начале указатели находятся на первом элементе для обоих массивов. Далее значения элементов, соответствующих позициям указателя сравниваются и указатель для меньшего элемента сдвигается на следующий элемент, сам элемент добавляется в результирующий массив. Цикл продолжается до тех пор, пока не дойдем до конца одного из массивов, тогда оставшаяся часть второго массива будет скопирована в конец результирующего массива. Таким образом, на выходе получаем отсортированный массив.
Также был создан класс для многопоточной сортировки, в нем был создан метод run, который выполняется, когда к объекту типа Thread применяется метод start(). В нашем случае за это будет отвечать executorService. Приведем здесь код класса merge, объекты которого и будут создаваться для реализации многопоточной сортировки:
public class Merger implements Runnable{
private String[] unsorted, sorted;
public Merger(String[] unsorted) {
this.unsorted = unsorted;
}
public void run() {
int middle;
String[] left, right;
// array is sorted
if ( unsorted.length <= 1 ) {
sorted = unsorted;
} else {
//
middle = unsorted.length / 2;
left = new String[middle];
right = new String[unsorted.length - middle];
//split array on two
System.arraycopy(unsorted, 0, left, 0, middle);
System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
SimpleMerger leftSort = new SimpleMerger(left);
SimpleMerger rightSort = new SimpleMerger(right);
leftSort.sort();
rightSort.sort();
//sort and merge
sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
}
}
public String[] getSorted() {
return sorted;
}
}
Для сортировки частей массива была использована встроенная в java сортировка. Далее приведен код для сортировки с помощью пула потоков. Проводятся замеры времени для многопоточного и обычного варианта (спойлер: многопоточная дает ускорение только на большом объеме данных):
public static void main(String[] args) throws Exception {
int arrSize = 1_000_000_0;
String[] unsorted = new String[arrSize];
Random randomizer = new Random();
for ( int i = 0; i < arrSize; i++ ) {
unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
}
List<Future> futures = new ArrayList<>();
int processorCount = Runtime.getRuntime().availableProcessors();
int batchSize = arrSize/processorCount;
long startTime = System.currentTimeMillis();
// create ExecutorService
final ExecutorService executorService = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ArrayList<Merger> mergers = new ArrayList<>();
for (int i = 0; i < processorCount; i++) {
String[] part = new String[batchSize];
System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
// create merger
Merger merger = new Merger(part);
futures.add(executorService.submit(merger));
//add merger to list to get result in future
mergers.add(merger);
}
for (Future<Double> future : futures) {
future.get();
}
executorService.shutdown();
int j = 0;
// array to get result
String[] mergered = new String[arrSize];
// sequential merge of all part of array
for (Merger merger:mergers){
if (j == 0) {
mergered = merger.getSorted();
j+=1;
}
else{
String[] part = merger.getSorted();
mergered = SimpleMerger.merge( mergered, part);
}
}
long timeSpent = System.currentTimeMillis() - startTime;
System.out.println("Program execution time is " + timeSpent + " milliseconds");
if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
startTime = System.currentTimeMillis();
Arrays.sort(unsorted);
timeSpent = System.currentTimeMillis() - startTime;
System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
}
В начале основной функции массив заполняется произвольными строками, которые содержат числа от 0 до 10000000. В качестве количества потоков берется количество доступное на устройстве. Переменная batchSize отвечает за размерность массивов для сортировки в параллельном режиме. Затем создается executorService с фиксированным количеством потоков.
Для каждого потока создается свой объект класса merge, затем этот ставим задачу по сортировке в очередь на выполнение. С помощью future ждем, пока все посчитается, собираем все отсортированные части массива и сливаем их по очереди в результирующий массив. Останавливаем executorService и можем посмотреть на временные затраты последовательного и параллельного варианта реализации.
Код лежит здесь
shishmakov
Почему не ForkJoinPool? Вроде бы это его задача — дробить и исполнять мелкие части.
introvertingCode Автор
Недавно начала изучать Java, хотелось освоить ExecutorService