Привет all!

Вступление

Приглянулась мне однажды идея реверс-инженеринга (реконструкции) StreamAPI из JDK8. Что и как из этого вышло опишу далее.

Ссылки

Актуальный репозиторий

Ветка актуальная для статьи

Вкратце

StreamAPI — это технология появившаяся в Java 8 позволяющая манипулировать данными в наборах (напр. коллекциями) в функциональном стиле (лямбда-выражениями). Более подробно про стримы можно почитать тут.

Зачем и для чего

Для обучения, для более глубокого понимания исследуемой темы. В процессе написания данной статьи было получено немало опыта, как по стримам, так и в целом по Java и программированию. Поэтому эта реверсия является неплохой обучающей практикой. Я бы порекомендовал и новичкам и тем кто уровнем повыше выполнить самостоятельную реализацию стрима. И независимо от вашего опыта, вас ждут открытия :)

Название

Название было выбрано следующим образом: Stream Reversed → StreamRe → StreamEr → Streamer

Возможные возможности/невозможности

Поскольку это обучающий реверс-инженеринг, целью которого является максимально понятно реализовать уже реализованное, то не стоит ожидать от стримера эффективного расхода памяти и быстродействия уровня Enterprise. Более того, в конце статьи я приведу пример сравнения производительности разных стримов, в котором стример уступает оригиналу из JDK по быстродействию.

Так же из реализации была исключена возможность распараллеливания стримов (Parallel stream), т.к. приемлемая реализация этого подхода потребует иных принципов построения и выходит за рамки этого материала.

Описанная тут реализация сохраняет следующие преимущества гибкости StreamAPI:

  • чтения данных не происходит до вызова одного из терминальных методов. Т.е., пока мы собираем стример, мы никак не влияем на источник данных, не читаем его, а только формируем набор правил, по которым будет работать конвеер стрима когда он будет запущен в работу вызовом одного из терминальных методов.

  • после вызова терминального метода, данные последовательно читаются из родительского набора и так же последовательно проходят по цепочке операций, не накапливаясь при этом в коллекциях, массивах и т.д. Конечно, за исключением метода сортировки, который по очевидным причинам требует предварительного накопления данных.

Spliterator vs. Iterator

Для того чтобы Stream мог функционировать, ему необходим источник данных. Стандартная реализация JDK (далее «оригинал»), под капотом, для чтения источника, использует сплитератор - Spliterator<T>.

Основная задача сплитератора — разделять данные на блоки, т.е. порционировать их. Порционирование используется «оригиналом» для возможности распараллеливания стримов, когда разные порции обрабатываются разными потоками. Более подробно, о сплитераторах можно почитать тут.

Поскольку мы не будем реализовывать parallel для стримера, то и в разделении данных на блоки тоже нет необходимости. Для простоты примера хватит итераторa - Iterator<T>, поэтому «под капотом» именно через него и будем получать данные из родительского источника.

Жизненный цикл (внутренние состояния)

Жизненный цикл стримера я разделил на три состояния:

  • Ожидание (WAITING) — начальное состояние стримера. В этом состоянии экземпляры создаются. Пока стример находится в этом состоянии, мы можем конструировать его из операций и вызвать один из терминальных методов когда потребуется «включить конвеер».

  • В работе (OPERATED) — в это состояние стример переходит после вызова любого терминального метода. Это состояние означает, что либо стример находится в работе — «конвеер запущен», либо готов к запуску, т.е. уже сконструирован, а значит вызовы как конвеерных так и терминальных методов более невозможны.

  • Завершен (CLOSED) — Это состояние означает, что стример завершил выполнение работы, ссылка на внешний итератор-источник обнулена (RefCount для GC). В это состояние стример переходит после того как:

    • завершилась работа любого из терминальных методов. Даже если в источнике остались данные. Например findFirst() вернул «первый» элемент. Данные возможно еще остались, но стример отработал свою задачу и может освободить неиспользуемые ссылки.

    • В источнике закончились данные - hasNext() изменил свое состояние с true на false.

    • Извне, был вызван метод явного закрытия — close() и при этом стример находился в WAITING состоянии. Данное условие (WAITING) является обязательным, поскольку мы не можем по запросу завершать работающий стрим. Так работает «оригинал», далее вернемся к этому.

Подготовка

Разделим методы стрима, которые будем реализовывать на три группы:

Порождающие (factory): empty, of, generate, iterate

Промежуточные(intermediate)/конвеерные: peek, onClose, distinct, filter, skip, limit, sorted, map, mapToInt, mapToLong, mapToDouble, flatMap, flatMapToInt, flatMapToLong, flatMapToDouble

Завершающие (terminal): spliterator, unordered, forEachOrdered, collect, min, max, reduce, count, forEach, allMatch, anyMatch, noneMatch, findFirst, findAny, iterator, toArray

Прочие: close, isParallel, sequential

Создадим проект с начальной структурой и классом Streamer<T>, реализующий интерфейс java.util.stream.Stream<T>. Позволим IDE сгенерировать пустую реализацию всех методов. Сгенерированные методы заглушим при помощи UnsupportedOperationException.

В итоге должно получиться примерно так.

Так же, сразу напишем реализацию простых методов - «однострочников», чтобы более к ним не возвращаться:

@Override
public Optional<T> findFirst() {
    return findAny(); //поскольку у нас упорядоченный стрим, 
                      //то первый элемент (First) 
  										//и есть "произвольный" (Any)
}

@Override
public boolean isParallel() {
    return false; //мы не поддерживаем параллелизм, поэтому всегда false
}

@Override
public Stream<T> sequential() {
    return this; //мы "последовательны", поэтому вернем себя же
}

@Override
public void forEachOrdered(Consumer<? super T> action) {
    forEach(action); //опять же, мы упорядочены источником, 
  									 //поэтому в нашем случае 
  									 //forEach и forEachOrdered эквивалентны
}

@Override
public Spliterator<T> spliterator() {
    return Spliterators.spliteratorUnknownSize(this.iterator(),
             Spliterator.ORDERED); //создадим сплитератор 
  																 //на основе «внутреннего» итератора
}

@Override
public Stream<T> unordered() {
    return this; //так же, можно вернуть себя
}

Создание экземпляров

Под капотом, экземпляры будут создаваться единственым закрытым (private) конструктором, который в качестве аргумента принимает внешний итератор-источник. Этот итератор и будем использовать в качестве источника данных. Клиенты же, как и в оригинале, будут получать экземпляры стримера из статических фабрик. Стоит добавить, что к статической фабрике of() я дополнительно добавил перегруженные методы получения экземпляров стримера из коллекций, перечисляемых (Iterable) типов, и непосредственно из самих Iterator`ов.

Примеры порождения стримера:

package pw.komarov.streamer;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class StreamerInstancesCreationExamples {
    public static void main(String[] args) {
        Streamer.empty(); //пустой

        Streamer.of(new Object()); //единичный объект
        Streamer.of(new Integer[]{1, 4, 8, 17}); //массив
        Streamer.of(Arrays.asList(7.34, 9, 18.7, 3)); //Iterable (List)
        Streamer.of("Foo", "Bar", "Juice", "hello", "streamer"); //из констант

        //Infinite
        Streamer.generate(() -> ThreadLocalRandom.current().nextInt()); //бесконечный (рэндом-число)
        Streamer.generate(() ->
            {
                List strings = Arrays.asList("randomly", "returned", "string", "value");
                return strings.get(ThreadLocalRandom.current().nextInt(strings.size()));
            }); //рэндом значение

        Streamer.iterate(100, (i) -> i * 2); //последовательность {100,200,400.........n}
    }
}

Методы generate() и iterate() порождают бесконечный стрим, который на каждом шаге получает значение из бесконечного итератора, у которого hasNext() всегда == true и «заглушен» метод forEachRemaining():

private static abstract class AbstractInfiniteIterator<E> 
																 implements Iterator<E> {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super E> consumer) {
        throw new UnsupportedOperationException();
    }
}

Итератор для generate():

private static class InfiniteGenerator<E> 
				extends AbstractInfiniteIterator<E> {
        
    private final Supplier<E> supplier;
    
    InfiniteGenerator(Supplier<E> supplier) {
        this.supplier = supplier;
    }

    @Override
    public E next() {
        return supplier.get();
    }
}

Далее, сначала создаем экземпляр этого генерирующего итератора, и затем из него стример:

public static <E> Streamer<E> generate(Supplier<E> supplier) {
    return of(new InfiniteGenerator<>(supplier));
}

Похожим образом реализован и iterate():

public static class InfiniteIterator<E> 
							extends AbstractInfiniteIterator<E> {
              
    private E value; //значение предыдущего шага, 
  									 //при первом вызове — initial

    private final UnaryOperator<E> unaryOperator; //клиенсткая функция 
    																							//генерации значения

    InfiniteIterator(E initial, UnaryOperator<E> unaryOperator) {
        this.value = initial;
        this.unaryOperator = unaryOperator;
    }

    @Override
    public E next() {
        E prev = this.value;
        this.value = unaryOperator.apply(prev);
        return prev;
    }
}

public static <E> Streamer<E> iterate(E initial, 
                                      UnaryOperator<E> unaryOperator) {
  
    return of(new InfiniteIterator<>(initial, unaryOperator));
}

В итоге должно получиться примерно так.

Закрытие/завершение

Опишем два метода закрытия/завершения стримера. Первый - internalClose() для внутреннего использования. Вызывать его будем когда работа стримера логически завершена. Например закончились данные в источнике или завершена работа одного из терминальных методов. В общем, в тех случаях, когда использование стримера более невозможно. Этот метод будет так же обнулять ссылки на внешние ресурсы (чтобы уменьшить RefCount для GC) и переводить стример в CLOSED состояние.

Второй метод — внешнего закрытия, реализует close() интерфейса AutoCloseable. Фактически же, завершает стример только из состояния WAITING. Это сделано для того, чтобы внешний вызов не мог повлиять на работу выполняющегося стрима. Так работает оригинал. На мой взгляд это поведение не совсем очевидно и вот почему: Предположим, что стрим выполняет тяжеловесную операцию одним из терминальных методов. В какой то момент (к примеру, пользователь запросил отмену действия), мы понимаем, что больше не нуждаемся в этой тяжеловесной работе и хотим ее принудительно прекратить. Стрим исполняется в другом потоке, но у нас есть указатель на этот стрим. Вызываем close() в надежде прекратить выполнение операции, но он продолжает работать как ни в чем небывало… А жаль... Ведь так хотелось… :).

Второй важной частью работы этого метода является вызов пользовательских onClose последовательностей. Но и тут скрывается подвох. Эти onClose выполняются только в случае явного вызова метода close(). Т.е. если стрим завершил работу, допустим найдено искомое (min, max и т.д.), то onClose будут просто проигнорированы, а ведь возможно там были важные финализаторы... При описанном поведении, инструмент onClose() представляет сомнительную ценность, поскольку те же самые операции можно вызвать «вручную» из клиентского кода, после вызова close() например. Можно будет даже более гибко обработать возможные исключения.

Тесты, демонстрирующие вышеописанное поведение

Ну что же, имеем то, что имеем... поэтому для сохранения совместимости реализуем эти особенности в том же виде:

private enum State {WAITING, OPERATED, CLOSED}

private State state = State.WAITING;

private final List<Runnable> onCloseSequences = new LinkedList<>();

@Override
public void close() {
    if (state == State.WAITING)
        internalClose();

    //обработаем (выполним) клиентские onClose последовательности...
    RuntimeException rte = null;
    for (Iterator<Runnable> iterator = onCloseSequences.iterator(); iterator.hasNext(); ) {
        Runnable runnable = iterator.next();
        try {
            runnable.run();
        } catch (RuntimeException e) {
            if (rte == null) //если это первое исключение в цепочке...
                rte = e; //...сохраним его
            else //если не первое...
                rte.addSuppressed(e); //...сохраним его в suppressed первого
        } finally {
            iterator.remove();
        }
    }

    if (rte != null)
        throw rte;
}

private void internalClose() {
    externalIterator = null;

    state = State.CLOSED;
}

private void throwIfNotWaiting() {
    if (state != State.WAITING)
        throw new IllegalStateException("stream has already been operated upon or closed");
}

@Override
public Stream<T> onClose(Runnable closeHandler) {
    throwIfNotWaiting();

    onCloseSequences.add(closeHandler);

    return this;
}

Контракт onClose() для стрима гласит, что первое исключение, если оно есть - погашается и сохраняется. Прочие исключения (если они есть), добавляются в suppressed первого. И если было первое, то оно и бросается после выполнения всех onClose`ов. Этот контракт поддерживается в реализации приведенной выше.

Расстановка по шаблону

Ранее мы реализовали метод проверки текущего состояния стримера который бросает IllegalStateException если стример не в WAITING состоянии. Теперь пришло время его расставить в места где это нужно. А нужно это сделать во всех терминальных и конвеерных методах, кроме «однострочников» описанных ранее, т.к. они вызывают эти методы.

Поскольку конвеерные методы будут работать по принципу Builder`a — иметь возможность телескопического построения (прим.: object.method1().method2().method3().methodN()…), то каждый из них должен возвращать экземпляр себя.

В итоге шаблон конвеерного метода приобрел такой вид:

{
    throwIfNotWaiting();
        
          //todo: тут будет создание и добавление операций

    return this;
}

Каждый терминальный метод должен переводить стример из WAITING в OPERATED состояние, а по завершению работы — корректно закрывать его.

Резюмируя вышесказанное, «шаблон» терминального метода приобретает такой вид:

{
    throwIfNotWaiting(); //IllegalStateException если пытаемся использовать запущенный или завершенный стример

    state = State.OPERATED; //переведем в OPERATED

	try {
	    ;//todo: терминальные операции…
	} finally {
	    internalClose(); //выполним завершение
	}
    throw new UnsupportedOperationException("will be soon");  //чтобы не забыть про return :)
}

В итоге получилось так.

Промежуточные операции (intermediate/conveyor)

Ну вот мы и подошли к логике работы стримера. Как известно стрим состоит из набора операций которые последовательно применяются к данным, которые представлены этому стриму. Поставим вопрос, как будем хранить и как будем «строить» наборы этих операций?

Тут все очень просто. Для обозначения самой операции, объявим интерфейс:

private interface IntermediateOperation {}

Набор операций — список элементов этого интерфейса:

private List<IntermediateOperation> intermediateOperations = 
  new LinkedList<>();

А добавлять в этот список конкретные операции будем из конвеерных методов.

Из всех конвеерных операций стрима выделим отдельную группу — фильтрующие операции. Это операции, которые на основании некоторого условия - предиката зависящего от типа операции, определяют — пройдет ли элемент данных далее по конвееру или будет отброшен на текущем шаге.

Вот список всех конвеерные методов, относящихся к фильтрующим операциям: skip(), limit(), distinct(), filter().

Для обозначения этих операций, объявим еще один интерфейс:

private interface FilteringOperation<T> 
									extends IntermediateOperation, Predicate<T> {}

Predicate<T> является функциональным интерфейсом (FunctionalInterface, подробнее), и его функциональный метод - boolean test(). Реализацией этого метода в конкретной операции мы и будем определять - пройдет ли элемент по конвееру дальше, или будет «отброшен».

Вот так будет выглядеть класс конкретной операции (в приведенном случае skip):

private static class SkipOperation implements FilteringOperation {
    private final long totalCount; //количество элементов 
  																 //которые требуется "пропустить"
    private long processedCount; //количество уже "пропущеных" элементов 
  															 //текущей операцией
    
    SkipOperation(long totalCount) {
        this.totalCount = totalCount;
    }

    @Override
    public boolean test(Object o) {
        if (processedCount < totalCount) {
            processedCount++;
            
            return true; //пропустим элемент далее
        }

        return false; //отбросим/отфильтруем элемент
    }
}

@Override
public Stream<T> skip(long n) {
    throwIfNotWaiting(); //проверим текущее состояние

    intermediateOperations.add(new SkipOperation(n)); //создадим 
  												//Skip-операцию, и добавим ее в список операций.

    return this; //вернем экземпляр «себя»
  							 //для возможности телескопического построения
}
Реализуем добавление остальных фильтрующих операций:
//limit()
private long filteredByLimit; //количество "отсеяных" limit'ом элементов

private class LimitOperation implements FilteringOperation {
    private final long maxSize; //собственно и есть лимит

    LimitOperation(long maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public boolean test(Object o) {
        return maxSize < ++filteredByLimit;
    }
}

@Override
public Stream<T> limit(long maxSize) {
    throwIfNotWaiting();

    intermediateOperations.add(new LimitOperation(maxSize));

    return this;
}

//distinct()

private static class DistinctOperation implements FilteringOperation {
    private Set<Object> objects = new HashSet<>();
    
    @Override
    public boolean test(Object o) {
        return !objects.add(o);
    }
}

@Override
public Stream<T> distinct() {
    throwIfNotWaiting();

    intermediateOperations.add(new DistinctOperation());

    return this;
}

private static class FilterOperation<T> implements FilteringOperation<T> {
    private final Predicate<? super T> predicate;

    public FilterOperation(Predicate<? super T> predicate) {
        this.predicate = predicate;
    }

    @Override
    public boolean test(T t) {
        return !predicate.test(t);
    }
}

@Override
public Stream<T> filter(Predicate<? super T> predicate) {
    throwIfNotWaiting();

    intermediateOperations.add(new FilterOperation<>(predicate));

    return this;
}
Не фильтрующие:
//sorted()
public static class SortedOperation<T> implements IntermediateOperation {
    private final Comparator<? super T> comparator;

    public SortedOperation() {
        this.comparator = null;
    }

    public SortedOperation(Comparator<? super T> comparator) {
        this.comparator = comparator;
    }
}

@Override
public Stream<T> sorted() {
    throwIfNotWaiting();

    intermediateOperations.add(new SortedOperation<>());

    return this;
}

@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
    throwIfNotWaiting();

    intermediateOperations.add(new SortedOperation<>(comparator));

    return this;
}

//map()
private static class MapOperation<T, R> implements IntermediateOperation {
    private final Function<? super T, ? extends R> function;
    MapOperation(Function<? super T, ? extends R> function) {
        this.function = function;
    }
}

@SuppressWarnings("unchecked")
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
    throwIfNotWaiting();

    intermediateOperations.add(new MapOperation<>(mapper));

    return (Streamer<R>) this;
}

//flatMap()
private static class FlatMapOperation<T, R> implements IntermediateOperation {
    private final Function<? super T, ? extends Stream<? extends R>> function;
	    FlatMapOperation(Function<? super T, ? extends Stream<? extends R>> 								function) {
        this.function = function;
    }
}

@SuppressWarnings("unchecked")
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> 				mapper) {
    throwIfNotWaiting();

    intermediateOperations.add(new FlatMapOperation<>(mapper));

    return (Streamer<R>) this;
}

Отдельно тут стоит отметить peek. peek-непьющий трудовик. peek-операции, как и onClose, будем хранить в отдельном списке, без классов-оберток над ним, т.к. peek хоть и Intermediate операции, но работают немного по другому принципу — peek-последовательности выполняются ВСЕ, РАЗОМ, ПОСЛЕ вычисления КАЖДОГО элемента стримом.

//peek()

private final List<Consumer<? super T>> peekSequences = new LinkedList<>();

@Override
public Stream<T> peek(Consumer<? super T> action) {
    throwIfNotWaiting();

    peekSequences.add(action);

    return this;
}

В итоге.

Конвеерная логика

Теперь реализуем главное - логику работы «конвеера». На пальцах можно описать работу этого механизма примерно так: внешний итератор (externalIterator) получает элемент от источника, затем он проходит (или не проходит) по конвееру и передается запросившему клиенту через «обратный» (streamerIterator) итератор.

Похоже на систему водопровода — когда вода подается в систему насосом (насос тут externalIterator, качает речную воду), проходит по трубам, фильтрам (конвеер), которые отсеивают нежелательные элементы, и подается потребителю по средствам открытия крана. Кран для потребителя - streamerIterator.

Если где-то, на этом пути элемент был отброшен (отфильтрован), то из источника будет запрошен следующий. И так далее. До тех пор, пока либо в итераторе-источнике не закончатся данные и в этом случае мы так же сообщим потребителю: «данных для Вас больше нет» (hasNext() == false), либо представим этот элемент потребителю.

Реализация конвеера (водопровода):
private class StreamerIterator implements Iterator<T> {
    private Boolean hasNext;
    private T next;

    @Override
    public boolean hasNext() {
        if (hasNext == null) {

            calcNextAndHasNext();

            if (!hasNext && state != State.CLOSED) //если нет больше данных...
                internalClose(); //...завершим
        }

        return hasNext;
    }

    @Override
    public T next() {
        if (!hasNext()) //запросили, но нет больше элементов?...
            throw new NoSuchElementException(); //... получите exception

        hasNext = null; //переведем состояние hasNext в «неизвестно»

        return next;
    }

    private void calcNextAndHasNext() { //метод расчитывающий внутренние закрытые поля next и hasNext на основании наличия опционала из getNext()
        Optional<T> opt = getNext(intermediateOperations);

        //noinspection OptionalAssignedToNull
        hasNext = opt != null; //если опционал не null — естьСледующий = true
        if (hasNext) //а если следующий есть — то...
            next = opt.orElse(null); //… это значение из опционала (если значения в опционале нет — то оно null
    }
	
    //водопровод:
    @SuppressWarnings("unchecked")
    private Optional<T> getNext(List<IntermediateOperation> operations) {
        T next = null;
        boolean terminated = false;
        boolean hasNext = externalIterator.hasNext();

        while (hasNext && !terminated) {
            next = externalIterator.next();
            boolean filtered = false;
            for (IntermediateOperation operation : operations) //пройдем по всем операциям
                if (operation instanceof FilteringOperation) { //если операция - фильтрующая...
                    if (!filtered) { //… и не была отфильтрована ранее
                        filtered = ((FilteringOperation<T>)operation).test(next); //фильтруем? (test`ом)
                        if (filtered && operation instanceof LimitOperation)
                            terminated = true; //а если была отфильтрована лимитной — то еще и прервем while
                    }
                } else if (operation instanceof MapOperation) //если map-операция
                    next = (T) ((MapOperation)operation).function.apply(next);
                else
                    throw new UnsupportedOperationException("getNext(): " + operation.getClass().getSimpleName()); //неизвестная
            if (!filtered)
                break;
            else
                hasNext = externalIterator.hasNext();
        }

        if (hasNext && !terminated) {
						//применим к полученному в итоге значению peek-операции
            for (Consumer<? super T> peekSequence : peekSequences)
                peekSequence.accept(next);

            return Optional.ofNullable(next);
        }

        //noinspection OptionalAssignedToNull
        return null;
    }
}

Объявим переменную (кран водопровода):

private final StreamerIterator streamerIterator = new StreamerIterator();

UPD1:

P.S. Тут есть один нюанс. Optional у меня может быть null`ом. Да может, и это его логичное на мой взгляд применение. Не нашли значение — опционал == null, нашли значение, опционал его содержит (даже если оно null), иначе какая польза от этого опционала? Да никакой! К тому же, такое его использование осуществляется только внутри закрытых методов, а значит не нарушает никаких внешних соглашений. Но, в своем рабочем коде я использую свой класс NullableOptional<>. Помимо того что он может быть EMPTY (в случаях когда значение не найдено), в нем еще есть и некий сахар, например elseIf(), которого мне переодически нехватает в JDK Optional<> как дополнение для ifPresent(). К сожалению Optional<> объявлен как final и поэтому мой NullableOptional растет отдельной иерархией. Если кому интересно, можете глянуть (покрытие unit-тестами прилагается):

https://github.com/koma1/Streamer/compare/NullableOptional

иначе какая польза от этого опционала? Да никакой!

Беру эти слова назад, польза от него действительно есть, например использование в качестве монад. Подробнее: https://habr.com/ru/company/cit/blog/262055/

Так же, для сохранения контакта Optional эта часть кода была переписана. Optional<> больше нигде не может быть null. Для null-значений написал простенький wrapper -NullableValue<>. Этот класс никакого отношения к Optional не имеет и может быть null если элемент отсутствует, или содержать null, если элемент == null.

Diff

Тестовый запуск

Настало время выполнить первый тестовый запуск. Для этого добавим реализацию двух терминальных методов: iterator() и forEach():

@Override
public Iterator<T> iterator() {
    throwIfNotWaiting(); //бросим исключение если не в WAITING состоянии

    state = State.OPERATED; //сменим состояние на OPERATED

    return streamerIterator; //вернем «внутренний» итератор - (кран)
}

@Override
public void forEach(Consumer<? super T> action) {
    throwIfNotWaiting();

    state = State.OPERATED;

    while (streamerIterator.hasNext()) //пока в «кране» есть вода...
        action.accept(streamerIterator.next());//...применим action к этой воде
}

Коммитушка

Протестируем:

final Stream<?> stream =
    Streamer
        .of(108, 5, 12, 11, 4, 9, 7, 5) //инстанциируем стример из набора констант
            .distinct() //(108, 5, 12, 11, 4, 9, 7, [5]) - отбросим дубли
            .skip(1)    //([108], 5, 12, 11, 4, 9, 7) - отбросим из начала - один элемент
            .limit(6)   //(5, 12, 11, 4, 9, 7) - лимитируем выборку в шесть элементов
            .limit(5)   //(5, 12, 11, 4, 9, [7]) - их всего шесть,,,, значит лимитируем в пять :)
            .map(i -> i == 11 ? 12 : i) //(5, 12, [11]->12, 4, 9) //там где значение == 11, заменим на 12, в других случаях - оставим как есть
            .distinct() //(5, 12, [12], 4, 9) //повторно отсеим новые дубли
            .map(i -> (i & 1) == 1 ? i * 2 : i) //([5]->10, 12, 4, [9]->18) каждое нечетное умножим на два, остальные оставим как есть
            .skip(1) //([10], 12, 4, 18) - отбросим из начала - один элемент
            .map(String::valueOf) //("12", "4", "18") - преобразуем в строковые (изменится и тип стрима, поэтому он объявлен как <?>)
            .map(s -> s.equals("12") ? "twelve" : s.equals("18") ? "eighteen" : String.format("(%s)unknown", s)) //то, что знаем, преобразуем в строки
        ; 					

stream.forEach(System.out::println); //("twelve", "(4)unknown", "eighteen")

Версия на github`e

А вот и результат его выполнения:

twelve
(4)unknown
eighteen

Process finished with exit code 0

Вполне ожидаемый. Для проверки можно применить «хитрость», заменить Streamer.of на Stream.of и посмотреть как отработает «оригинал». Результат в обоих случаях должен быть одинаковый. Ну вот мы и реализовали большинство конвеерных методов и два терминальных, которых достаточно для проверки работы стримера. Из конвеерных, пока не реализованы: sorted() и мэпперы (mapTo…, flatMap(), flatMapTo...). Эти методы имеют некоторые особенности, поэтому рассмотрим их реализацию отдельно.

sorted()

Как следует из названия — данный метод сортирует данные в стриме. Делает он это Comparator`ом представленным в аргументе, либо компаратаром для представленного в стриме типа. Он должен быть Comparable с собой же (иначе - ClassCastException).

Особенностью работы данного метода является то, что операция сортировки не является последовательной, т.е. требует предварительного накопления всех имеющихся в источнике данных.

Рассмотрим условный пример:

	streamer.iterate(…)
			  .limit(…) // 1.1
		.sorted() //1
			  .distinct(…) //2.1
			  .filter(…) //2.2
		.sorted() //2
			  .map(…) //3.1
			  .distinct(…) //3.2
		.sorted() //3
			  .skip(…)   //n.1
			  .filter(…)  //n.2

В этом примере операции можно разделить на три условных блока с сортировкой в конце каждого. Последние операции «n.1» и «n.2» не замыкаются сортировкой, поэтому не входят в так называемый «условный блок».

Для того, чтобы выполнить этот пример, мы должны пойти примерно следующим путем:

  1. вычитать весь «внешний источник-итератор» в какой либо контейнер (коллекция, массив, файл и т.д.)

  2. «Прогнать» элементы получившегося контейнера, последовательно по конвееру операций текущего «условного блока»

  3. отсортировать этот контейнер с применением указанного в сорировщике компаратора (либо компаратором по умолчанию в случае отсутствия первого).

  4. заменить указатель внешнего итератора-источника, на итератор этой отсортированной коллекции. Теперь наш источник, это нами же порожденный внутренний контейнер.

  5. сменить «условный блок» на следующий и выполнить данный алгоритм заново, с пункта №1. Если это был последний «условный блок», то считать выполнение сортировок оконченой и вернуть управление конвееру с итератором, указывающим на итератор отсортированный ранее коллекции.

Если тоже самое выразить языком кода, то у меня получился следующий набор изменений

Код-спойлер
//sorted()
private int sortedCount; //поле стримера, хранит кол-во операций сортировки

...
  
intermediateOperations.add(...);
sortedCount++;

...

@Override
public boolean hasNext() {
    if (hasNext == null) {

        if (sortedCount > 0) //если есть сортировки...
            calculateSorted(); //… выполним сначала их...

        calcNextAndHasNext();
...
@SuppressWarnings({"OptionalAssignedToNull","unchecked"})
private void calculateSorted() {
    for (int i = 1; i <= sortedCount; i++) { //цикл по «условным блокам» 
        final List<IntermediateOperation> localOperations = new LinkedList<>();
        SortedOperation<T> sortedOperation = null;
        for (Iterator<IntermediateOperation> itr = intermediateOperations.iterator(); itr.hasNext(); ) {
            IntermediateOperation operation = itr.next();
            try {
                if (operation instanceof SortedOperation) { //если операция сортировки — выделим этот блок...
                    sortedOperation = (SortedOperation<T>) operation;
                    break;
                } else
                    localOperations.add(operation);
            } finally {
                itr.remove();
            }
        }

        //на основании «условного блока» соберем коллекцию
        final List<T> data = new ArrayList<>();
        Optional<T> nextOpt;
        do {
            nextOpt = getNext(localOperations);
            if (nextOpt != null)
                data.add(nextOpt.orElse(null));
        } while (nextOpt != null);

        //отсортируем получившийся список...
        if (sortedOperation != null)
            data.sort(sortedOperation.comparator);

        //подменим итератор
        externalIterator = data.iterator();
    }
}

diff commit`a

В этом коммите так же изменен и StressRunner — добавлено несколько сортировок.

flatMap()

Этот метод порождает новый стрим, «раскрывая/разворачивая/расхлопывая» элементы родительского стрима. Схематично это можно отобразить так:

	- element1
		- subelement1_FROM_element1
		- subelement2_FROM_element1
		- subelement3_FROM_element1
	- element2
		- subelement4_FROM_element2
		- subelement5_FROM_element2

Простейший способ реализации может выглядеть так:

@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);

    throwIfNotWaiting();

    Stream<R> result = Stream.empty();

    for (T t : this)
        result = Stream.concat(result, mapper.apply(t));

    return result;
}

Это решение не совсем корректно. В нем порождаемый стрим не является последовательным. Данные в нем накапливаются и затем соединяются (конкатенируются) в общий итоговый результат. Это наглядно демонстрирует пример.

Запустив указанный выше пример, мы увидим единоразовый, массовый «выброс» результата в консоль. Нас это не устраивает, мы предпочитаем «последовательность» «массовости».

Исправить это можно реализовав например такую идею:

Опишем класс итератора результатирующего типа — OfR (Iterator). На вход ему будем передавать итератор текущего стрима - OfT (Iterator) и клиентскую функцию mapper, которая будет раскладывать элементы из родительского — ofT на элементы подмножества - ofR, которые и будем по одному возвращать клиенту.

Другими словами:

У стрима запросили следующий элемент - next(). Он проверил, есть ли у него таковой. Если таковой имеется, его и вернем. Если же элемент отсутсвует - проверим наличие базового элемента в родительском стриме. Если в родительском стриме есть элемент - разложим его, и начнем заново. Если же нет - hasNext() = false

Код:

//flatMap()

@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);

    class IteratorOfR implements Iterator<R> {
        private final Iterator<T> OfT = Streamer.this.iterator(); //родительский итератор (содержит элементы множества, которые будем раскладывать)
        private Iterator<? extends R> ofR; //элементы подмножества Stream<R>, текущего элемента из ofT, которые и будем возвращать конечному клиенту
        
        @Override
        public R next() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofR.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofR == null || !ofR.hasNext()) && OfT.hasNext()) //если ofR не задан (напр.: первый запрос), или в ofR отсутствуют элементы и при этом есть что раскладывать в родительском (ofT)...
                ofR = mapper.apply(OfT.next()).iterator(); //...разложим элемент из ofT в подмножество ofR
            return ofR != null && ofR.hasNext();
        }
    }

    return Streamer.of(new IteratorOfR());
}

Кстати, IteratorOfR я решил сделать вложенным (enclosure) классом, так как его использование за пределами метода flatMap() не предполагается.

Коммит на github'e

[flat]MapTo{Int/Long/Double}()

Этот набор методов стоит рассмотреть отдельно. Все они возвращают стрим одного из трех типов: IntStream, LongStream, DoubleStream. Спецификой данных типов стримов, является то, что они оперируют примитивами, а не обертками. Изза этого, например IntStream гораздо быстрее работает с числами, чем Stream<Integer>, ведь первый работает со значением, а второй с оберткой. Тут стоит отметить, что поскольку mapTo[Int/Long/Double]() возвращают указанные выше типы, то принцип их реализации немного отличается от обычного map() и более похож на реализацию flatMap(), за тем исключением что элементы родительского стрима не раскладываются на подмножества, а модифицируются соответствующим mapper`ом и возвращаются по одному. Звучит запутанно, смотрим код:

@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) {

    Objects.requireNonNull(mapper);

    class OfInt implements PrimitiveIterator.OfInt {

        @Override
        public int nextInt() {
            return mapper.applyAsInt(streamerIterator.next());
        }
        @Override
        public boolean hasNext() {
            return streamerIterator.hasNext();
        }
    }

    return StreamSupport
            .intStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfInt(),
                            0),
                    false);
}

Оставшиеся методы ...mapTo...() сделаны схожим образом, поэтому просто приведу реализацию

Код-спойлер
@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
    Objects.requireNonNull(mapper);

    class OfLong implements PrimitiveIterator.OfLong {
        private final Iterator<T> ofT = Streamer.this.iterator();

        @Override
        public long nextLong() {
            return mapper.applyAsLong(ofT.next());
        }

        @Override
        public boolean hasNext() {
            return ofT.hasNext();
        }
    }

    return StreamSupport
            .longStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfLong(),
                            0),
                    false);
}

@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
    Objects.requireNonNull(mapper);

    class OfDouble implements PrimitiveIterator.OfDouble {
        private final Iterator<T> ofT = Streamer.this.iterator();

        @Override
        public double nextDouble() {
            return mapper.applyAsDouble(ofT.next());
        }

        @Override
        public boolean hasNext() {
            return ofT.hasNext();
        }
    }

    return StreamSupport
            .doubleStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfDouble(),
                            0),
                    false);
}

@Override
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
    Objects.requireNonNull(mapper);

    class OfInt implements PrimitiveIterator.OfInt {
        private final Iterator<T> ofT = Streamer.this.iterator();
        private PrimitiveIterator.OfInt ofInt;


        @Override
        public int nextInt() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofInt.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofInt == null || !ofInt.hasNext()) && ofT.hasNext())
                ofInt = mapper.apply(ofT.next()).iterator();

            return ofInt != null && ofInt.hasNext();
        }
    }

    return StreamSupport.intStream(
            Spliterators.spliteratorUnknownSize(new OfInt(), 0),
            false
    );
}

@Override
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
    Objects.requireNonNull(mapper);

    class OfLong implements PrimitiveIterator.OfLong {

        private final Iterator<T> ofT = Streamer.this.iterator();
        private PrimitiveIterator.OfLong ofLong;

        @Override
        public long nextLong() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofLong.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofLong == null || !ofLong.hasNext()) && ofT.hasNext())
                ofLong = mapper.apply(ofT.next()).iterator();

            return ofLong != null && ofLong.hasNext();
        }
    }

    return StreamSupport.longStream(
            Spliterators.spliteratorUnknownSize(new OfLong(), 0),
            false
    );
}

@Override
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
    Objects.requireNonNull(mapper);

    class OfDouble implements PrimitiveIterator.OfDouble {
        private final Iterator<T> ofT = Streamer.this.iterator();

        private PrimitiveIterator.OfDouble ofDouble;

        @Override
        public double nextDouble() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofDouble.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofDouble == null || !ofDouble.hasNext()) && ofT.hasNext())
                ofDouble = mapper.apply(ofT.next()).iterator();

            return ofDouble != null && ofDouble.hasNext();
        }
    }

    return StreamSupport.doubleStream(
            Spliterators.spliteratorUnknownSize(new OfDouble(), 0),
            false
    );
}

Терминальные методы

Все терминальные методы стримера работают по общей последовательности действий:

1 — проверка корректности переданных аргументов (как правило — «не null»)

2 — проверка текущего состояния - должно быть WAITING, если ДА, то переводим стример в OPERATED, в противном случае бросаем исключение

3 — выполнение требуемой логики и возврат результата

4 — завершение стримера.

Шаблон описанный выше выглядит так:

public ... someTerminalMethod(... args) {
		Objects.requireNonNull(args); //1 — проверка аргументов
		throwIfNotWaitingOrSetOperated(); //2 — проверка и 
  																		//переключение состояния
		try {
				... //3 - выполнение требуемых действий...
		} finally {
				internalClose(); //4 — завершение стримера
		}
}

P.S., так же, во всех конвеерных методах я заменил возвращаемый тип со Stream<T> на Streamer<T> т.к. это помогает избежать ненужных приведений типов в клиентском коде.

diff

P.P.S. так же, в этом коммите я покрыл тестами терминальные методы. Отчасти, логику работы этих методов можно понять по этим тестам.

Баги

В процессе написания этой публикации были обнаружены некоторые баги. Например такой код приведет к ClassCastException:

    Streamer<String> streamer = Streamer.of("10", "5", "15");
    streamer.map(Integer::valueOf); //CCE не тут...
    streamer.forEach(System.out::println); //← а тут!

Это происходит потому, что стример объявлен как Streamer<String>, но после вызова map() происходит смена типа на Streamer<Integer>. В forEach(), параметр action объявлен как Consumer<? super T>. На этапе компиляции, компилятор неявно добавит приведение к типу: (String)streamerIterator.next() в методе forEach(), которое и приведет к CCE в момент выполнения.

Чтобы решить эту проблему, можно воспользоваться способом, которым реализован flatMap() — возвращать экземпляр нового стримера, который связан с текущим, а текущий переводить в OPERATED. Похожим образом работает и оригинальный стрим, но подробнее об этом поговорим позже. Для внедрения исправления, необходимо всего лишь заменить:

return (Streamer<R>) this;

на

return (Streamer<R>)Streamer.of(this);

Тут порождаем новый стример, завязанный (слинкованный) на "вызывающего". Кстати, вызов this.iterator() и переведет наш стример в OPERATED состояние, поэтому даже об этом мы уже позаботились ранее :) Теперь вызов forEach(), который сменил свой тип невозможен, поскольку он в OPERATED состоянии. Но появляется новая проблема. Мы исправили CCE, ценой размытого контракта. Этот конвеерный метод теперь является фабрикой, пораждающей новый экземпляр, но не билдером, как остальные конвеерные методы. Что с этим делать, обдумаем позже, а сейчас рассмотрим другой баг…

Код:

final AtomicInteger ai = new AtomicInteger();
Streamer.generate(ai::getAndIncrement).limit(10).forEach((v) -> System.out.print(v + " "));
System.out.print(ai);

Этот код выдает в результат:

0 1 2 3 4 5 6 7 8 9 11

В этой последовательности пропущен один элемент — «10». Потеря произошла по причине того, что значение «10» было сгенерировано (т.е. закэшировано внутренним итератором), но было отфильтровано операцией LimitOperation, поэтому значение внешней переменной за циклом, будет равно «11». Я исправил это следующим образом: LimitOperation начинает отбрасывать значения не при фактическом достижении лимита, а в момент, когда итератор подошел к «границе» - когда пресечением лимита будет следующий, а не текущий элемент. Затем, там, где раньше он «отбрасывался», теперь такой элемент пропускается, но с установкой флага noNext = true, который и проверяем в дальнейшем перед генерацией элемента.

diff

Тестирование примерами

Числа Фибоначчи:

Streamer
        .iterate(new int[]{0,1}, ints -> new int[]{ints[1],ints[0] + ints[1]})
        .limit(10)
        .mapToInt(ints -> ints[1])
        .forEach(System.out::println); //1, 1, 2, 3, 5, 8, 13, 21, 34, 55

Сгенерируем пенсионеров:

Streamer
        .generate(PersonUtils::generateRandomPerson)
        .filter(person ->
                (person.gender == Person.Gender.MALE && person.age >= 63)
                    ||
                (person.gender == Person.Gender.FEMALE && person.age >= 58))
        .limit(10)
        .sorted(Comparator.comparing(Person::getGender).thenComparingInt(Person::getAge).reversed()) //отсортируем сначала по полу, затем по возрасту (в порядке убывания)
        .forEach(System.out::println); //выведем результат

Сгенерируем армейский призыв:

Streamer
        .generate(PersonUtils::generateRandomPerson) //сгенерируем персону
            .filter(person -> person.gender == Person.Gender.MALE) //отберем по полу
            .filter(person -> person.age >= 18 && person.age <= 27) //отберем по возрасту
            .limit(10) //остановим генерацию, когда набрали 10 "кандидатов"
            .sorted(Comparator.comparingInt(Person::getAge).thenComparing(Person::getName)) //отсортируем сначала по возрасту, затем по ФИО
        .forEach(System.out::println); //выведем результат

Гонки стримов

Ну вот и пришло время прощаться тестировать результат на скорость. Функциональное тестирование мы проводили во время разработки и относительно плотно его покрыли. Осталось проверить производительность… И так, устроим числовые гонки стримов... На трассу выходят три боллида: №1 — JDK IntStream; №2 — JDK stream (оригинал); №3 — Streamer; Задача — проехать на время прямую дистанцию, длиной в 100000000 шагов итераций, на финише будем измерять время затраченое на это. Всё просто.

Код гонки:

public class SpeedRunner {
    public static void main(String[] args) {
        //CASE #1 - IntStream.iterate
        System.out.println("-----------------\nCASE #1 - IntStream.iterate");
        printStreamTiming(IntStream.iterate(1, i -> i + 1));
        //CASE #2 - Stream.iterate
        System.out.println("-----------------\nCASE #2 - Stream.iterate");
        printStreamTiming(Stream.iterate(1, i -> i + 1));
        //CASE #3 - Streamer.iterate
        System.out.println("-----------------\nCASE #3 - Streamer.iterate");
        printStreamTiming(Streamer.iterate(1, i -> i + 1));
    }

    private static void printStreamTiming(Stream<Integer> stream) {
        long start = System.currentTimeMillis();
        stream
                .filter(i -> i > 100000000)
                .limit(10)
                .forEach(i -> {});
        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("Elapsed time: %dms (%s)\n", elapsed, new SimpleDateFormat("mm:ss.S").format(new Date(elapsed)));
    }

    private static void printStreamTiming(IntStream stream) {
        long start = System.currentTimeMillis();
        stream
                .filter(i -> i > 100000000)
                .limit(10)
                .forEach(i -> {});
        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("Elapsed time: %dms (%s)\n", elapsed, new SimpleDateFormat("mm:ss.S").format(new Date(elapsed)));
    }
}

Результат гонки:

-----------------
CASE #1 - IntStream.iterate
Elapsed time: 58ms (00:00.58)
-----------------
CASE #2 - Stream.iterate
Elapsed time: 582ms (00:00.582)
-----------------
CASE #3 - Streamer.iterate
Elapsed time: 7662ms (00:07.662)

Фаворитом гонки ожидаемо оказался IntStream из JDK — 58ms, против 582ms у прибывшего вторым JDK Stream. Различие в скорости 10 раз! Поэтому, если вам потребуется работать с числами в стримах, необходимо использовать IntStream, вместо Stream<Integer>. Стример же прибыл к финишу третим, с результатом 7662ms, что в 13 раз медленнее оригинала из JDK и в ~130 раз медленнее JDK IntStream`a.

Результат не очень хороший, но мне удалось буквально одним движением руки, ускорить его примерно в семь раз (1156ms). Сделал я это, заменив return this; на return Streamer.of(this); в методе filter(). Т.е., с точки зрения производительности, получилось выгоднее строить конвеер не из набора операций, а конвеер из связанных друг с другом стримеров, где каждый выполняет одну отдельную операцию, сам является как цепью, так и отдельным ее звеном.

Я не понимаю откуда такая разница, по моим соображениям такой подход наоборот должен замедлять производительность (и то не слишком заметно), поскольку внутренний цикл должен работать быстрее чем цепной вызов связанных итераторов (по сути, являющийся однонаправленным списком). Но это лишь теория, практика показала обратное.

Builder → Factory

Так же хочется отметить что замена применения вышеуказаного фикса - return this; на return Streamer.of(this); во всех терминальных методах приведет не только к ускорению стримера, но так же, к смене контракта его конвеерных методов. В данный момент он реализован по принципу билдера - возвращает экземпляр себя. Данная замена изменит это соглашение на фабрику — каждый вызов порождает новый экземпляр связанный с родительским. JDK стрим работает именно таким образом (фабрикой). Я свою рабочую копию оставлю на «медленном» варианте, так как в противном случае, чтобы было все по «уму», придется так же переписать всю логику работы конвеера. Например хранить не список операций List<IntermediateOperation>, а единичную операцию для данного экземпляра.

А если идти еще более правильным путем, то реализовывать это нужно наследованием. Примерно с такой иерархией классов:

xxxStreamerOperationAbstractStreamerOperationAbstractStreamerStream

Optional и null`ы

JDK Stream имеет такую особенность:

//NullPointerException изза Optional:
Stream.of(null, 1, 2, 3).findFirst();
Stream.of(null, 1, 2, 3).findAny();
Stream.of(1, null, 2, 3).min(Comparator.nullsFirst(Integer::compareTo));
Stream.of(1, null, 2, 3).max(Comparator.nullsLast(Integer::compareTo));
Stream.of(1, null, 2, 3).reduce((total, curr) -> { Integer t = total + (curr == 					null ? 0 : curr); return (t == 6 ? null : t); });

В приведенных выше примерах источником NPE является Optional накладывающий дополнительные ограничения на вышеуказанные методы. Довольно неприятная вещь, которую нужно держать в голове работая с терминальными методами, возвращаемый тип которых Optional<>: findAny(), findFirst(), min(), max(), reduce()

Переименования

Так же решил переименовать некоторые методы/поля/классы. Например of подразумевает создание экземпляра на основе константных значений(я). Поэтому методы, порождающие стример из источника, а не из значений я переименовал из of() в from(): from(Iterable<T>); from(Iterator<T>);

Так же, добавил методы порождения из мэпы:

from(Map<K,V>) - Streamer<Map.Entry<K,V>> (стример из элементов мэпы) fromMapKeys(Map<K,V>) - Streamer<K> (стример из ключей мэпы) fromMapValues(Map<K,V>) - Streamer<V> (стример из значений мэпы)

Как всегда подтверждаю коммитом :)

Для меньшей путанности так же переименовал: externalIterator → sourceIterator; StreamerIterator → InternalStreamerIterator, милиция → полиция, медики → медведики

Вишенка без тортика :)

Если flatMap() раскладывает элементы стрима на подмножества, то должен быть метод группирующий их. Встречайте:

public <K> Map<K,Collection<T>> groupBy(Function<? super T,? extends K> groupMapper) {
    return collect(HashMap::new,
            (map, object) -> map.merge(
                    groupMapper.apply(object),
                    new ArrayList<>(Collections.singletonList(object)),
                    ((left, right) -> {
                        left.addAll(right);
                        return left;
                    }) ),
            null);
}

Он позволяет группировать элементы в Map, где ключ (<K>) это группа, а значение (<V>) это коллекция элементов этой группы.

Рассмотрим на примере:

Допустим есть Streamer<Number> который содержит разные подклассы чисел (Number): Integer, Long, Double, Float. Сгруппируем значения этого стримера по их типу (классу), затем «оформлено» выведем результат:

//создадим стример из констант
Streamer<Number> streamer = Streamer.of(1, 4L, 6.5d, 18.1f, 8, 15L, 16.111125d, 218.12f, 41, 45L, 1116.5d, 222.3f);
//сгруппируем по классу
Map<Class, Collection<Number>> groupedByClasses = streamer.groupBy(Number::getClass);
//оформим вывод результата
for (Class numberClass : groupedByClasses.keySet()) {
  System.out.println(numberClass.getSimpleName() + ":");
  for (Number number : groupedByClasses.get(numberClass))
    System.out.println("\t" + number);
}

Результат:

Long:
	4
	15
	45
Double:
	6.5
	16.111125
	1116.5
Float:
	18.1
	218.12
	222.3
Integer:
	1
	8
	41

Process finished with exit code 0

Или по признаку - четное/нечетное:

Map<Boolean, Collection<Number>> groupedByParity = streamer.groupBy(number -> 	(number.intValue() & 1) == 0); //вернет Boolean.TRUE для четного и Boolean.FALSE для НЕ четного number, по которому и «объединим»
System.out.println("Четные: " + groupedByParity.get(true));
System.out.println("Нечетные: " + groupedByParity.get(false));

П.С.ы.

Изменил поведение from(Iterable) - на «отложенный запуск». Теперь, при порождении из коллекции, итератор этой коллекции запрашивается при запуске стримера в работу. Это позволяет не блокировать коллекцию на изменения до запуска стримера, и в случае изменения этой коллекции после создания стримера избежать ConcurrentModificationException.

Финал

По мере наличия у меня времени на эту работу, я буду продолжать свои исследования в этой области и скорее всего репозиторий будет обновлятся. Если есть идеи выраженные в коде, прошу их pull-реквестить :), текстовые и теоретические идеи, замечания, оскорбления, допущеные мною ошибки и т.д. , прошу в комменты. :) П.П.С. текст по ходу статьи может незначительно отличаться от того что в репозитории, т.к. в процессе были и ошибки и идеи, но это не изменяет принципов описанных тут, изменения не значительны.

Благодарю за внимание!

Комментарии (9)


  1. sshikov
    16.08.2021 23:01

    >Если есть идеи выраженные в коде, прошу их pull-реквестить :)
    Не очень понятно, какой стимул будет у тех, кто контрибьютит в ваш проект.

    У вас же идея была в самообучении, но все самое основное вы уже реализовали. А практическое использование такого проекта — штука сомнительная, хотя бы потому, что уже есть довольно широко известные похожие проекты. Грубо говоря — стримы без parallel, это же просто коллекции, у которых есть map/flatMap/etc. Ну т.е. как минимум, есть скала, груви, vavr в виде библиотеки, где все очень похоже. Чтобы развивать такой проект, должен быть какой-то еще смысл в нем, мне кажется.


  1. angry-architect
    17.08.2021 00:32
    +3

    Самообучение это конечно очень важная вещь, но статья полна неточностей, заблуждений и незнания матчасти. К примеру про Optional

    Optional у меня может быть null`ом. Да может, и это его логичное на мой взгляд применение. Не нашли значение — опционал == null, нашли значение, опционал его содержит (даже если оно null), иначе какая польза от этого опционала? Да никакой! 

    Советую посмотреть про Optional у Stuart Marks https://www.youtube.com/watch?v=fBYhtvY19xA&ab_channel=Devoxx и сразу многое прояснится.

    И такого у вас много...

    Кстати для "гонок" производительности стоит попробовать написать микробенчмарк с помощью JMH. Благо статей по нему много .

    P.S. Статью стоит перечитать и переписать, что должно принести не меньшую пользу.


    1. komkom Автор
      17.08.2021 14:38

      Советую посмотреть про Optional у Stuart Marks

      пока статья была на модерации (и я уже не мог ее редактировать) я нашел использование Optional "в полезном ключе", об этом я упомянул в UPD1 к данной публикации.

      И такого у вас много...

      приведите еще примеры

      Кстати для "гонок" производительности стоит попробовать написать микробенчмарк с помощью JMH.

      согласен, но для данной статьи не критично, так как тест производительности тут дается лишь "постольку-поскольку" вкратце. И если копать в сторону производительности то нужно начинать не с теста, НО хорошим тестом заканчивать :)

      P.S. Статью стоит перечитать и переписать, что должно принести не меньшую пользу.

      какие еще неточности обнаружились кроме указанных выше?

      P.S. за них - благодарность, особенно с Optional. добавил UPD1 с сохранением своего "ляпа"


      1. komkom Автор
        19.08.2021 22:04

        Советую посмотреть про Optional у Stuart Marks

        Посмотрел. Нужные рекомендации, с которыми я уже был знаком. Но напрашивается один нюанс...

        В этом видео Стюарт Маркс показывает то, как введение Optional в Java 8 позволяет избежать лишних проверок return'ов на null. Но он упустил один важный баг нюанс связанный с Optional и Stream.

        Как я писал выше в этой публикации, методы возвращающие Optional (а это: findFirst(), findAny(), min(), max(), reduce() ) не могут работать с null-значениями. Как то странно получается... На пустом стриме/коллекции они вернут Optional.empty(), а значение (хоть оно и null, ! но это значение !), приведет к трудноуловимому NPE. Конечно это баг особенность Stream, но и "вина" Optional тут имеется. Ведь если бы Optional не существовало, то findFirst() просто вернул бы null как first значение. В нынешнем же варианте (Java 8 и 11), он вообще ничего не вернет, а упадет с NPE. Поэтому тут мы теряем больше, чем приобретаем, так как теперь, чтобы гарантировать предсказуемую работу этих методов мы должны проверять на наличие null не возвращаемое значение а входные аргументы... !входные аргументы, Карл! (что очевидно еще хуже)

        Примеры кода, который будет падать, если personList будет содержать null:

        №1:

        String getNameOfMinimalPerson(List<Person> personList) {
          return personList.stream()
            .min(Comparator.nullsFirst(Comparator.comparingInt(Person::getId))) //#2
            .orElse("Empty persons list, please fill it.");
        }

        №2:

            String getNameOfRandomPerson(List<Person> personList) { //<- throws an exception
                int rnd = ThreadLocalRandom.current().nextInt(personList.size());
        
                return Streamer.from(personList)
                        .skip(rnd) 
                        .findFirst() //упадет если "rnd" окажется равным индексу null значения в списке
                        .map(Person::getName)
                        .orElse("No persons! Please fill it and try again");
            }

        Ну и на последок...

        Ироничным выглядит тот факт, что код, который Стюарт приводит в качестве примера безопасного использования Optional для предотвращения NPE (время 9:38) лишен проверки на null там где она нужна. В итоге код подвержен NPE прямо "из коробки", даже без малейших его изменений:

        String customerNameById(List<Customer> custList, int custId) {
          Optional<Customer> opt =
            custList.stream()
            	.filter(c -> c.getId() == custId) //нет проверки "c" на null
              .findFirst(); //#1
            
          return opt.isPresent() ? opt.get().getName()
        }

        Если в custList'e будет хотя бы один null упадет метод filter() который не проверяет c на null

        Вот, как то так...


        1. angry-architect
          23.08.2021 14:52

          Ироничным выглядит тот факт, что код, который Стюарт приводит в качестве примера безопасного использования Optional для предотвращения NPE (время 9:38) лишен проверки на null там где она нужна. В итоге код подвержен NPE прямо "из коробки", даже без малейших его изменений

          Думаю, что код, который Стюарт привёл как пример, написан с допущением, что коллекция не содержит null, так как мы не видим контекста в котором будет вызываться этот метод. А так то можно и вместо custList передать null и получить NPE.

          На пустом стриме/коллекции они вернут Optional.empty(), а значение (хоть оно и null, ! но это значение !), приведет к трудноуловимому NPE. Конечно это баг особенность Stream, но и "вина" Optional тут имеется. Ведь если бы Optional не существовало, то findFirst() просто вернул бы null как first значение.

          Так как findFirst возвращает Optional, то можно написать вот такой код (пример синтетический):

                  var result = players.stream()
                          .filter(player -> player.getScore() > 100)
                          .findFirst()
                          .map(Player::getData)
                          .map(History::filterData)
                          .map(History::processData)
                          .orElseGet(Collections::emptyList);

          В случае если бы findFirst возвращал null, то код оброс бы проверками c if и красивой цепочки не получилось. Считаем, что коллекция players не содержит null значений, но можно конечно сделать фильтрацию по ним, если уж режет глаз.


          1. komkom Автор
            23.08.2021 17:25

            Вы плохо прочитали/поняли то, что я имел в виду:

            Так как findFirst возвращает Optional, то можно написать вот такой код (пример синтетический):

            В том то и дело, findFirst() НЕ возвращает Optional если value == null. В этом случае он падает с NPE... Порождение происходит вызовом of(), вместо ofNullable(). Поэтому, код который вы привели так же не будет работать, до ваших map'ов дело не дойдет. Падение будет в findFirst(), никакого Optional'a создано не будет


            1. angry-architect
              23.08.2021 18:33

              Я не зря написал про допущение, что коллекции не содержат null, потому как в обоих примерах, если будет null, то будет NPE и я с этим фактом не спорю, но я пытался объяснить почему findFirst не возвращает null, а возвращает Optional. Это один момент, а второй момент, что в javadoc к методу findFirst написано, что метод может выбросить NPE и это говорит о том, что у создателей API были на то причины. Другой вопрос, что нам такое положение вещей может не нравится и поэтому появляются альтернативы по типу vavr .


              1. komkom Автор
                23.08.2021 19:24

                да, но это как то странно,,, мы можем быть пустыми (size == 0), но вот null содержать не можем....

                наверняка у дизайнеров JDK были какие то причины на это, узнать бы их еще..... а так выглядит очень ляпистым просчетом))


  1. aleksandy
    17.08.2021 06:28

    А что по этому поводу думает товарищ @lany?