Привет, Хабр! Представляю вашему вниманию перевод статьи "Java 8 Stream Tutorial".

Это руководство, основанное на примерах кода, представляет всесторонний обзор потоков в Java 8. При моем первом знакомстве с Stream API, я был озадачен названием, поскольку оно очень созвучно с InputStream и OutputStream из пакета java.io; Однако потоки в Java 8 — нечто абсолютно другое. Потоки представляют собой монады, которые играют важную роль в развитии функционального программирования в Java.
В функциональном программировании монада является структурой, которая представляет вычисление в виде цепи последовательных шагов. Тип и структура монады определяют цепочку операций, в нашем случае — последовательность методов с встроенными функциями заданного типа.
Это руководство научит работе с потоками и покажет как обращаться с различными методами, доступными в Stream API. Мы разберем порядок выполнения операций и проследим как последовательность методов в цепочке влияет на производительность. Познакомимся с мощными методами Stream API, такими как reduce, collect и flatMap. В конце руководства уделим внимание параллельной работе с потоками.

Если вы не чувствуете себя свободно в работе с лямбда-выражениями, функциональными интерфейсами и ссылочными методами, вам будет полезно ознакомиться с моим руководством по нововведениям в Java 8 (перевод на Хабре), а после этого вернуться к изучению потоков.

Как работают потоки


Поток представляет последовательность элементов и предоставляет различные методы для произведения вычислений над данными элементами:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);
// C1
// C2

Методы потоков бывают промежуточными (intermediate) и терминальными (terminal). Промежуточные методы возвращают поток, что позволяет последовательно вызывать множество таких методов. Терминальные методы либо не возвращают значения (void) либо возвращают результат типа отличного от потока. В вышеприведенном примере методы filter, map и sorted являются промежуточными, а forEach — терминальным. Для ознакомления с полным списком доступных методов потока обратитесь к документации. Такая цепочка потоковых операций также известна как конвейер операций (operation pipeline).

Большинство методов из Stream API принимают в качестве параметров лямбда-выражения, функциональный интерфейс, описывающие конкретное поведение метода. Большая их часть должна одновременно быть невмешивающейся (non-interfering) и не запоминающей состояние (stateless). Что же это означает?

Метод является невмешивающимся (non-interfering), если он не изменяет исходные данные, лежащие в основе потока. Например, в вышеприведенном примере никакие лямбда-выражения не вносят изменений в списочный массив myList.

Метод является не запоминающим состояние (stateless), если порядок выполнения операции определен. Например, ни одно лямбда-выражение из примера не зависит от изменяемых переменных или состояний внешнего пространства, которые могли бы меняться во время выполнения.

Различные виды потоков


Потоки могут быть созданы из различных исходных данных, главным образом из коллекций. Списки (Lists) и множества (Sets) поддерживают новые методы stream() и parllelStream() для создания последовательных и параллельных потоков. Параллельные потоки способны работать в многопоточном режиме (on multiple threads) и будут рассмотрены в конце руководства. А пока рассмотрим последовательные потоки:

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

Здесь вызов метода stream() для списка возвращает обычный объект потока.
Однако для работы с потоком вовсе не обязательно создавать коллекцию:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

Просто используйте Stream.of() для создания потока из нескольких объектных ссылок.

Помимо обычных потоков объектов Java 8 располагает специальными типами потоков для работы с примитивными типами: int, long, double. Как можно догадаться это IntStream, LongStream, DoubleStream.

Потоки IntStream могут заменить обычные циклы for(;;) используя IntStream.range():

IntStream.range(1, 4)
    .forEach(System.out::println);
// 1
// 2
// 3

Все эти потоки для работы с примитивными типами работают так же как и обычные потоки объектов за исключением следующего:

  • Потоки примитивов используют специальные лямбда-выражения. Например, IntFunction вместо Function, или IntPredicate вместо Predicate.
  • Потоки примитивов поддерживают дополнительные терминальные методы: sum() и average()

    Arrays.stream(new int[] {1, 2, 3})
        .map(n -> 2 * n + 1)
        .average()
        .ifPresent(System.out::println);  // 5.0
    


Иногда полезно превратить поток объектов в поток примитивов или наоборот. Для этой цели потоки объектов поддерживают специальные методы: mapToInt(), mapToLong(), mapToDouble():

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3

Потоки примитивов могут быть преобразованы в потоки объектов посредством вызова mapToObj():

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

В следующем примере поток из чисел с плавающей точкой отображается в поток целочисленных чисел и затем отображается в поток объектов:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);
// a1
// a2
// a3

Порядок выполнения


Сейчас, когда мы узнали как создавать различные потоки и как с ними работать, погрузимся глубже и рассмотрим, как потоковые операции выглядят под капотом.

Важная характеристика промежуточных методов — их лень. В этом примере отсутствует терминальный метод:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

При выполнении этого фрагмента кода ничего не будет выведено в консоль. А все потому, что промежуточные методы выполняются только при наличии терминального метода. Давайте расширим пример добавлением терминального метода forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

Выполнение этого фрагмента кода приводит к выводу на консоль следующего результата:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

Порядок, в котором расположены результаты, может удивить. Можно наивно ожидать, что методы будут выполняться “горизонтально”: один за другим для всех элементов потока. Однако вместо этого элемент двигается по цепочке “вертикально”. Сначала первая строка “d2” проходит через метод filter затем через forEach и только тогда, после прохода первого элемента через всю цепочку методов, следующий элемент начинает обрабатываться.

Принимая во внимание такое поведение, можно уменьшить фактическое количество операций:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

Метод anyMatch вернет true, как только предикат будет применен к входящему элементу. В данном случае это второй элемент последовательности — “A2”. Соответственно, благодаря “вертикальному” выполнению цепочки потока map будет вызван только дважды. Таким образом вместо отображения всех элементов потока, map будет вызван минимально возможное количество раз.

Почему последовательность имеет значение


Следующий пример состоит из двух промежуточных методов map и filter и терминального метода forEach. Рассмотрим как выполняются данные методы:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

Нетрудно догадаться, что оба метода map и filter вызываются 5 раз за время выполнения — по разу для каждого элемента исходной коллекции, в то время как forEach вызывается только единожды — для элемента прошедшего фильтр.

Можно существенно сократить число операций, если изменить порядок вызовов методов, поместив filter на первое место:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

Сейчас map вызывается только один раз. При большом количестве входящих элементов будем наблюдать ощутимый прирост производительности. Помните об этом составляя сложные цепочки методов.

Расширим вышеприведенный пример, добавив дополнительную операцию сортировки — метод sorted:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

Сортировка — это специальный вид промежуточных операций. Это так называемая операция с запоминанием состояния (stateful), поскольку для сортировки коллекции необходимо учитывать ее состояния на протяжении всей операции.

В результате выполнения данного кода получаем следующий вывод в консоль:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

Сперва производится сортировка всей коллекции целиком. Другими словами метод sorted выполняется “горизонтально”. В данном случае sorted вызывается 8 раз для нескольких комбинаций из элементов входящей коллекции.

Еще раз оптимизируем выполнение данного кода посредством изменения порядка вызовов методов в цепочке:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2
  

В этом примере sorted вообще не вызывается т.к. filter сокращает входную коллекцию до одного элемента. В случае с большими входящими данными производительность выиграет существенно.

Повторное использование потоков


В Java 8 потоки не могут быть использованы повторно. После вызова любого терминального метода поток завершается:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Вызов noneMatch после anyMatch в одном потоке приводит к следующей исключительной ситуации:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

Для преодоления этого ограничения следует создавать новый поток для каждого терминального метода.

Например, можно создать поставщика (supplier) для конструктора нового потока, в котором будут установлены все промежуточные методы:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Каждый вызов метода get создает новый поток, в котором можно безопасно вызвать желаемый терминальный метод.

Продвинутые методы


Потоки поддерживают большое количество различных методов. Мы уже ознакомились с наиболее важными методами. Чтобы самостоятельно ознакомиться с остальными, обратитесь к документации. А сейчас погрузимся еще глубже в более сложные методы: collect, flatMap и reduce.

Большая часть примеров кода из этого раздела обращается к следующему фрагменту кода для демонстрации работы:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect


Collect очень полезный терминальный метод, который служит для преобразования элементов потока в результат иного типа, например, List, Set или Map.

Collect принимает Collector, который содержит четыре различных метода: поставщик (supplier). аккумулятор (accumulator), объединитель (combiner), финишер (finisher). На первый взгляд это выглядит очень сложно, однако Java 8 поддерживает различные встроенные коллекторы через класс Collectors, где реализованы наиболее используемые методы.

Популярный случай:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

Как видите, создать список из элементов потока очень просто. Нужен не список а множество? Используйте Collectors.toSet().

В следующем примере люди группируются по возрасту:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Коллекторы невероятно разнообразны. Также можно агрегировать элементы коллекции, например, определить средний возраст:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

Для получения более исчерпывающей статистики используем резюмирующий коллектор, который возвращает специальный объект с информацией: минимальным, максимальным и средним значениями, суммой значений и количеством элементов:

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

Следующий пример объединяет все имена в одну строку:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

Соединяющий коллектор принимает разделитель, а также опционально префикс и суффикс.

Для преобразования элементов потока в отображение, следует определить каким образом должны отображаться ключи и значения. Помните, что ключи в отображении должны быть уникальными. В противном случае получим IllegalStateException. Можно опционально добавить функцию слияния для обхода исключения:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

Итак, мы ознакомились с некоторыми из наиболее мощных встроенных коллекторов. Попробуем построить собственный. Мы хотим преобразовать все элементы потока в одну строку, которая состоит из имен в верхнем регистре, разделенных вертикальной чертой |. Для этого создадим новый коллектор используя Collector.of(). Нам нужны четыре составные части нашего коллектора: поставщик, аккумулятор, соединитель, финишер.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

Поскольку строки в Java неизменяемы, нам нужен класс-помощник типа StringJoiner, позволяющий коллектору построить для нас строку. На первой стадии поставщик конструирует StringJoiner с присвоенным разделителем. Аккумулятор используется для добавления каждого имени в StringJoiner.

Соединитель знает как соединить два StringJoinerа в один. И в конце финишер конструирует желаемую строку из StringJoinerов.

FlatMap


Итак, мы узнали как превращать объекты потока в другие типы объектов при помощи метода map. Map — своего рода ограниченный метод, поскольку каждый объект может быть отображен в только один другой объект. Но что если нужно отобразить один объект в множество других, или вовсе не отображать его? Вот тут-то выручает метод flatMap. FlatMap превращает каждый объект потока в поток других объектов. Содержимое этих потоков затем упаковывается в возвращаемый поток метода flatMap.

Для того чтобы посмотреть на flatMap в действии, соорудим подходящую иерархию типов для примера:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

Создадим несколько объектов:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));


Теперь у нас есть список из трех foo, каждый из которых содержит по три bar.

FlatMap принимает функцию, которая должна вернуть поток объектов. Таким образом, чтобы получить доступ к объектам bar каждого foo, нам просто нужно подобрать подходящую функцию:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

Итак, мы успешно превратили поток из трех объектов foo в поток из 9 объектов bar.

Наконец, весь вышеприведенный код можно сократить до простого конвейера операций:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
 

FlatMap также доступен в классе Optional, введенном в Java 8. FlatMap из класса Optional возвращает опциональный объект другого класса. Это может быть использовано чтобы избежать нагромождения проверок на null.

Представьте себе иерархическую структуру типа этой:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

Для получения вложенной строки foo из внешнего объекта необходимо добавить множественные проверки на null для избежания NullPointException:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

Того же можно добиться, используя flatMap класса Optional:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

Каждый вызов flatMap возвращает обертку Optional для желаемого объекта, если он присутствует, либо для null в случае отсутствия объекта.

Reduce


Операция упрощения объединяет все элементы потока в один результат. Java 8 поддерживает три различных типа метода reduce.

Первый сокращает поток элементов до единственного элемента потока. Используем этот метод для определения элемента с наибольшим возрастом:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

Метод reduce принимает аккумулирующую функцию с бинарным оператором (BinaryOperator). Тут reduce является би-функцией (BiFunction), где оба аргумента принадлежат одному типу. В нашем случае, к типу Person. Би-функция — практически тоже самое, что и функция (Function), однако принимает 2 аргумента. В нашем примере функция сравнивает возраст двух людей и возвращает элемент с большим возрастом.

Следующий вид метода reduce принимает и начальное значение, и аккумулятор с бинарным оператором. Этот метод может быть использован для создания нового элемента. У нас — Person с именем и возрастом, состоящими из сложения всех имен и суммы прожитых лет:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

Третий метод reduce принимает три параметра: изначальное значение, аккумулятор с би-функцией и объединяющую функцию типа бинарного оператора. Поскольку начальное значение типа не ограничено до типа Person, можно использовать редуцирование для определения суммы прожитых лет каждого человека:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
 

Как видим, мы получили результат 76, но что же на самом деле происходит под капотом?

Расширим вышеприведенный фрагмент кода выводом текста для дебага:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

Как видим, всю работу выполняет аккумулирующая функция. Впервые она вызывается с изначальным значением 0 и первым человеком Max. В последующих трех шагах sum постоянно возрастает на возраст человека из последнего шага пока не достигает общего возраста 76.

И что дальше? Объединитель никогда не вызывается? Рассмотрим параллельное выполнение этого потока:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

При параллельном выполнении получаем совершенно другой консольный вывод. Сейчас объединитель действительно вызывается. Поскольку аккумулятор вызывался параллельно, объединитель должен был суммировать значения, сохраненные по-отдельности.

В следующей главе более детально изучим параллельное выполнение потоков.

Параллельные потоки


Потоки могут выполняться параллельно для повышения производительности при работе с большими количествами входящих элементов. Параллельные потоки используют обычный ForkJoinPool доступный через вызов статического метода ForkJoinPool.commonPool(). Размер основного пула потоков может достигать 5 потоков выполнения — точное число зависит от количества доступных физических ядер процессора.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

На моем компьютере обычный пул потоков по умолчанию инициализируется с распараллеливанием на 3 потока. Это значение можно увеличить или уменьшить посредством установки следующего параметра JVM:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Коллекции поддерживают метод parallelStream() для создания параллельных потоков данных. Также можно вызвать промежуточный метод parallel() для превращения последовательного потока в параллельный.

Для понимания поведения потока при параллельном выполнении, следующий пример печатает информацию про каждый текущий поток (thread) в System.out:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

Рассмотрим выводы с записями для дебага чтобы лучше понять, какой поток (thread) используется для выполнения конкретных методов потока (stream):

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

Как видим, при параллельном выполнении потока данных используются все доступные потоки (threads) текущего ForkJoinPool. Последовательность вывода может отличаться, так как не определена последовательность выполнения каждого конкретного потока (thread).

Давайте расширим пример добавлением метода sort:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

На первый взгляд результат может показаться странным:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

Кажется, будто sort выполняется последовательно и только в потоке main. На самом деле при параллельном выполнении потока под капотом метода sort из Stream API спрятан метод сортировки класса Arrays, добавленный в Java 8, — Arrays.parallelSort(). Как указано в документации, этот метод на основании длины входящей коллекции определяет, как именно — параллельно или последовательно будет произведена сортировка:
Если длина определенного массива меньше минимальной “зернистости”, сортировка производится посредством выполнения метода Arrays.sort.
Вернемся к примеру с методом reduce из предыдущей главы. Мы уже выяснили, что объединительная функция вызывается только при параллельной работе с потоком. Рассмотрим, какие потоки задействованы:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

Консольный вывод показывает, что обе функции: аккумулирующая и объединяющая, выполняются параллельно, используя все возможные потоки:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

Можно утверждать, что параллельное выполнение потока способствует значительному повышению эффективности при работе с большими количествами входящих элементов. Однако следует помнить, что некоторые методы при параллельном выполнении требуют дополнительных расчетов (объединительных операций), которые не требуются при последовательном выполнении.

Кроме того, для параллельного выполнения потока используется все тот же ForkJoinPool, так широко используемый в JVM. Так что применение медленных блокирующих методов потока может негативно отразиться на производительности всей программы, за счет блокирования потоков (threads), используемых для обработки в других задачах.

Вот и все


Мое руководство по использованию потоков в Java 8 окончено. Для более подробного изучения работы с потоками можно обратиться к документации. Если вы хотите углубиться и больше узнать про механизмы, лежащие в основе работы потоков, вам может быть интересно прочитать статью Мартина Фаулера (Martin Fowler) Collection Pipelines.

Если вам так же интересен JavaScript, вы можете захотеть взглянуть на Stream.js — JavaScript реализацию Java 8 Streams API. Возможно, вы также захотите прочитать мои статьи Java 8 Tutorial (русский перевод на Хабре) и Java 8 Nashorn Tutorial.

Надеюсь, это руководство было полезным и интересным для вас, и вы наслаждались в процессе чтения. Полный код хранится в GitHub. Чувствуйте себя свободно, создавая ответвление в репозитории.

Комментарии (2)


  1. PqDn
    22.01.2019 11:46
    +1

    Добавте теги — для новичков, туториал


    1. Exosphere
      22.01.2019 12:36

      Добавлена метка «Tutorial».