Привет, Хабр!

На этой неделе мы ожидаем из типографии новую книгу по Spring 5:


Среди интересных возможностей Spring 5 отдельного упоминания заслуживает реактивное программирование, о реализации которого в этом фреймворке кратко рассказывает предлагаемая статья Мэтта Рэйбла (Matt Raible). В вышеупомянутой книге реактивные паттерны рассмотрены в главе 11.

Соавтором Мэтта выступил Джош Лонг, автор еще одной отличной книги про Java и Spring, "Java в облаке", вышедшей у нас прошлым летом.

Реактивное программирование – ваш путь к созданию систем, устойчивых к высоким нагрузкам. Обработка огромного трафика – уже не проблема, так как сервер неблокирующий, и клиентским процессам не приходится дожидаться откликов. Клиент не может непосредственно наблюдать, как программа выполняется на сервере, и синхронизироваться с нею. Когда API затрудняется с обработкой запросов, он должен все равно давать разумные отклики. Не должен отказывать и отбрасывать сообщения неконтролируемым образом. Должен сообщать вышестоящим компонентам о том, что работает под нагрузкой, чтобы те могли его частично от этой нагрузки освободить. Такой прием называется «обратный поток» (backpressure), это важный аспект реактивного программирования.

Эту статью мы написали в соавторстве с Джошем Лонгом. Джош – Java-чемпион, Spring Developer Advocate и вообще мировой парень, работающий в Pivotal. Я давно работаю со Spring, но именно Джош показал мне Spring Boot, это было на конференции Devoxx в Бельгии. С тех пор мы крепко сдружились, увлекаемся Java и пишем классные приложения.

Реактивное программирование или I/O, I/O, на работу мы идем…

Реактивное программирование – это такой подход к созданию ПО, при котором активно используется асинхронный ввод/вывод. Асинхронный ввод/вывод – небольшая идея, чреватая большими переменами в программировании. Сама идея проста: исправить ситуацию с неэффективным распределением ресурсов, высвобождая те ресурсы, которые без нашего вмешательства простаивали бы впустую, дожидаясь завершения ввода/вывода. Асинхронный ввод/вывод инвертирует привычный подход к обработке I/O: клиент освобождается и может заниматься другими задачами, ожидая новых уведомлений.

Рассмотрим, что общего между синхронным и асинхронным вводом/выводом, и каковы отличия между ними.

Напишем простую программу, считывающую данные из источника (конкретно – речь о ссылке java.io.File). Начнем с реализации, в которой используется старый добрый java.io.InputStream:

Пример 1. Синхронное считывание данных из файла

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.function.Consumer;

@Log4j2
class Synchronous implements Reader {

    @Override
    public void read(File file, Consumer<BytesPayload> consumer) throws IOException {
        try (FileInputStream in = new FileInputStream(file)) { //1
            byte[] data = new byte[FileCopyUtils.BUFFER_SIZE];
            int res;
            while ((res = in.read(data, 0, data.length)) != -1) { //2
                    consumer.accept(BytesPayload.from(data, res)); //3
            }
        }
    }
}

  1. Предоставляем файл на считывание при помощи обычного java.io.File
  2. Вытягиваем результаты из источника по одной строке за раз …
  3. Этот код я написал, чтобы принимать Consumer<BytesPayloadgt;, вызываемый при поступлении новых данных

Достаточно незамысловато, что скажете? Запустите этот код – и увидите в логе вывод (левее каждой строки), свидетельствующий, что все действия происходят в единственном потоке.
Здесь мы вытягиваем байты из наших данных, взятых в источнике (в данном случае речь идет о подклассе java.io.FileInputStream, унаследованном от java.io.InputStream). Что же не так с этим примером? В данном случае мы используем InputStream, указывающий на данные, расположенные в нашей файловой системе. Если файл там есть, и жесткий диск функционирует, то этот код сработает как ожидается.

Но, что же произойдет, если мы будем считывать данные не из File, а из сетевого сокета, причем, воспользуемся иной реализацией InputStream? Беспокоиться не о чем! Конечно, беспокоиться будет совсем не о чем, если скорость работы сети бесконечно велика. И если сетевой канал между этим и другим узлом никогда не откажет. Если эти условия выполняются, то код будет работать превосходно.

А что будет, если сеть станет притормаживать или ляжет? В данном случае я имею в виду, что у нас будет увеличиваться срок до возврата операции in.read(…). В самом деле, она может и вообще не вернуться! Это проблема, если мы пытаемся еще что-нибудь делать с потоком, из которого считываем данные. Разумеется, всегда можно создать еще один поток и считывать данные через него. До некоторого момента этим можно обходиться, но, в конце концов, достигнем предела, при котором простого добавления потоков для дальнейшего масштабирования будет уже недостаточно. У нас не будет истинной конкурентности сверх того количества ядер, которые имеются на нашей машине. Тупик! В данном случае мы можем наращивать обработку ввода/вывода (здесь имеется в виду считывание) только за счет дополнительных потоков, а здесь мы рано или поздно достигнем предела.

В данном примере основной кусок работы приходится на считывание – на других фронтах почти ничего не происходит. Мы зависим от ввода/вывода. Рассмотрим, как асинхронное решение помогает нам частично побороть монополизацию наших потоков.

Пример 2. Асинхронное считывание данных из файла

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {

    private int bytesRead;
    private long position;
    private AsynchronousFileChannel fileChannel;
    private Consumer<BytesPayload> consumer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public void read(File file, Consumer<BytesPayload> c) throws IOException {
        this.consumer = c;
        Path path = file.toPath(); // 1
        this.fileChannel = AsynchronousFileChannel.open(path,
            Collections.singleton(StandardOpenOption.READ), this.executorService); //2
        ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
        this.fileChannel.read(buffer, position, buffer, this); //3
        while (this.bytesRead > 0) {
                this.position = this.position + this.bytesRead;
                this.fileChannel.read(buffer, this.position, buffer, this);
        }
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) { //4
        
        this.bytesRead = result;

        if (this.bytesRead < 0)
            return;

        buffer.flip();

        byte[] data = new byte[buffer.limit()];
        buffer.get(data);

        //5
        consumer.accept(BytesPayload.from(data, data.length));

        buffer.clear();

        this.position = this.position + this.bytesRead;
        this.fileChannel.read(buffer, this.position, buffer, this);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.error(exc);
    }
}

  1. На этот раз приспосабливаем java.io.File, делая из него Java NIO java.nio.file.Path
  2. Создавая Channel, мы, в частности, указываем сервис java.util.concurrent.ExecutorService, который будет использоваться для вызова обработчика CompletionHandler, когда появятся нужные для этого данные
  3. Начинаем считывание, передавая ссылку на CompletionHandler<Integer, ByteBuffer> (this)
  4. В обратном вызове считываем байты из ByteBuffer в емкость byte[]
  5. Точно как в примере с Synchronous, данные byte[] передаются потребителю.

Сразу оговоримся: этот код получился гораааааздо сложнее! Здесь происходит такая куча вещей, что сразу голова кружится, однако, позвольте отметить… этот код считывает данные из Java NIO Channel, а затем обрабатывает эти данные, в отдельном потоке, отвечающем за обратные вызовы. Таким образом, тот поток, в котором началось считывание, не монополизируется. Мы возвращаемся практически мгновенно после вызова .read(..), и когда, наконец, у нас в распоряжении появляются данные, совершается обратный вызов – уже в другом потоке. При наличии задержки между вызовами .read() можно переходить к другим делам, выполняя их в нашем потоке. Длительность асинхронной операции считывания, от первого байта до последнего, в лучшем случае не больше, чем у синхронной операции считывания. Обычно асинхронная операция несущественно дольше. Однако, идя на такие дополнительные сложности, мы можем эффективнее обращаться с нашими потоками. Выполнять больше работы, мультиплексировать ввод/вывод в пуле с конечным количеством потоков.

Я работаю в компании, занимающейся облачными вычислениями. Мы хотели бы, чтобы для решения проблем с горизонтальным масштабированием вы приобретали все новые инстансы приложения! Разумеется, здесь я немного лукавлю. Асинхронный ввод/вывод немного усложняет ситуацию, но, надеюсь, этот пример иллюстрирует, чем же так полезен реактивный код: он позволяет обрабатывать больше запросов и выполнять больше работы на имеющемся аппаратном обеспечении, если производительность сильно зависит от ввода/вывода. Если производительность работы зависит от использования процессора (скажем, речь об операциях над числами Фибоначчи, майнинге биткойнов или криптографии), то реактивное программирование ничего нам не даст.

В настоящее время большинство из нас не пользуется реализациями Channel или InputStream при повседневной работе! О проблемах приходится размышлять на уровне более высокоуровневых абстракций. Речь о таких вещах как массивы или, скорее, об иерархии java.util.Collection. Коллекция java.util.Collection очень хорошо отображается на InputStream: обе сущности предполагают, что вы сможете оперировать сразу со всеми данными, причем, почти мгновенно. Ожидается, что вы сможете завершить считывание из большинства InputStreams пораньше, а не попозже. Типы коллекций становятся немного неудобными при переходе к более крупным суммам данных. Что делать, если вы имеете дело с чем-то потенциально бесконечным (неограниченным) – например, с веб-сокетами или серверными событиями? Что делать, если возникнет задержка между записями?

Нам нужен более оптимальный способ для описания данных такого рода. Мы говорим об асинхронных событиях, таких, которые произойдут в конечном итоге. Может показаться, что для такой цели хорошо подходят Future<T> или CompletableFuture<T>, но они описывают всего по одной вещи, происходящей в конечном итоге. На самом деле, в Java не предоставляется подходящей метафоры, которая позволяла бы описывать данные такого рода. Как Iterator, так и типы Stream из Java 8 могут быть несвязанными, однако, и те, и другие ориентированы на pull; вы сами запрашиваете следующую запись, а не тип должен отправлять обратный вызов вашему коду. Предполагается, что, если бы в данном случае поддерживалась обработка на основе push, позволяющая добиться на уровне потоков значительно большего, то и API также предоставляли бы поточность и управление планированием. В реализациях Iterator ничего не говорится о поточности, а все потоки Java 8 совместно используют один и тот же пул fork-join.

Если бы Iterator и Stream действительно поддерживали push-обработку, то мы столкнулись бы с другой проблемой, которая по-настоящему обостряется именно в контексте I/O: нам понадобится какой-либо механизм обратного проникновения! Поскольку потребитель данных обрабатывается асинхронно, мы понятия не имеем, когда данные окажутся в конвейере и в каком количестве. Не знаем, сколько данных потребуется обработать при следующем обратном вызове: один байт или один терабайт!

Вытягивая данные из InputStream, вы считываете столько информации, сколько готовы обработать, и не больше. В предыдущих примерах мы считываем данные в буфер byte[] фиксированной и известной длины. В асинхронном контексте нам нужен какой-то способ, чтобы сообщить поставщику, сколько данных мы готовы обработать.
Да-с. Здесь точно чего-то не хватает.

Поиск недостающей метафоры

В данном случае мы ищем метафору, которая бы красиво отражала суть асинхронного ввода/вывода, поддерживала такой механизм обратной передачи данных и позволяла контролировать поток выполнения в распределенных системах. В реактивном программировании возможность клиента просигнализировать, с какой нагрузкой он в состоянии справиться, называется «обратный поток».

Сейчас существует ряд хороших проектов — Vert.x, Akka Streams и RxJava – поддерживающих реактивное программирование. Команда Spring также ведет такой проект, именуемый Reactor. Между этими различными стандартами есть достаточно широкое обще поле, де-факто выделенное в стандарт Reactive Streams initiative. В Reactive Streams initiative определяется четыре типа:

Интерфейс Publisher<T&gt; производит значения, которые могут прибыть в конечном итоге. Интерфейс Publisher<T&gt; производит значения типа T для Subscriber<T>.

Пример 3. Реактивные потоки: интерфейс Publisher<T>.

package org.reactivestreams;

public interface Publisher<T> {

    void subscribe(Subscriber<? super Tgt; s);
}

Тип Subscriber подписывается на Publisher<T>, получая уведомления о любых новых значениях типа T через свой метод onNext(T). При возникновении каких-либо ошибок вызывается его метод onError(Throwable). Когда обработка завершилась нормально, вызывается метод onComplete подписчика.

Пример 4. Реактивные потоки: интерфейс Subscriber<T>.

package org.reactivestreams;

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

Когда Subscriber впервые подключается к Publisher, он получает Subscription в методе Subscriber#onSubscribe. Подписка Subscription – пожалуй, важнейшая часть всей спецификации; именно она обеспечивает обратный поток. Подписчик Subscriber использует метод Subscription#request для запроса дополнительных данных или метод Subscription#cancel, чтобы остановить обработку.

Пример 5. Реактивные потоки: интерфейс Subscription<T>.

package org.reactivestreams;

public interface Subscription {

    public void request(long n);

    public void cancel();
}

В спецификации реактивных потоков предоставляется еще один полезный, хотя и очевидный тип: Processor<A,B> это просто интерфейс, наследующий как Subscriber<A>, так и Publisher<B>.

Пример 6. Реактивные потоки: интерфейс Processor<T>.

package org.reactivestreams;

public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> {
}

Спецификация не позиционируется в качестве предписания для реализаций, на самом деле, ее цель – определять типы для поддержки интероперабельности. Очевидная польза типов, связанных с реактивными потоками – в том, что для них все-таки нашлось место в релизе Java 9, причем, семантически они «один к одному» соответствуют интерфейсам из класса java.util.concurrent.Flow, напр.: java.util.concurrent.Flow.Publisher.

Знакомьтесь: Reactor

Одних только типов реактивных потоков недостаточно; нужны реализации более высокого порядка для поддержки таких операций как фильтрация и преобразование. В таком качестве удобен проект Reactor; он надстраивается над спецификацией Reactive Streams и предоставляет две специализации Publisher<T>.

Первая, Flux<T> — это Publisher, производящий ноль или более значений. Вторая, Mono<T> — это Publisher<T>, производящий ноль или одно значение. Оба они публикуют значения и могут обращаться с ними соответствующим образом, однако, их возможности гораздо шире спецификации Reactive Streams. Оба они предоставляют операторы, позволяют обрабатывать потоки значений. Типы Reactor хорошо компонуются – вывод одного из них может служить вводом для другого, а если типу требуется работать с другими потоками данных, они опираются при этом на экземпляры Publisher<T>.

Как Mono<T>, так и Flux<T> реализуют Publisher<T>; рекомендуем делать так, чтобы ваши методы принимали экземпляры Publisher<T>, но возвращали Flux<T> или Mono<T>; это поможет клиенту различать, какие именно данные он получает.

Допустим, вам выдали Publisher<T> и попросили отобразить пользовательский интерфейс для этого Publisher<T>. Следует ли в таком случае выводить страницу с детализацией для одной записи, так как вы можете получить CompletableFuture<T>? Либо вывести обзорную страницу со списком или сеткой, где постранично отображаются все записи? Сложно сказать.

В свою очередь, Flux<T> и Mono<T> очень специфичны. Вы знаете, что нужно вывести обзорную страницу, если полученFlux<T>, и страницу с детализацией для одной (или ни одной) записи, когда получаете Mono<T>.

Reactor – это опенсорсный проект, запущенный компанией Pivotal; сейчас он стал очень популярен. Facebook использует его в своем реактивном механизме для вызова удаленных процедур, также применяется в Rsocket, под руководством создателя RxJava Бена Кристенсена. Salesforce использует его в своей реактивной реализации gRPC. Reactor реализует типы Reactive Streams, поэтому может взаимодействовать с другими технологиями, поддерживающими эти типы, например, с RxJava 2 от Netflix, Akka Streams от Lightbend и с проектом Vert.x от Eclipse Foundation. Дэвид Кэрнок, руководитель RxJava 2, также активно сотрудничал с Pivotal по разработке Reactor, благодаря чему проект стал еще лучше. Плюс, естественно, он в той или иной форме присутствует в Spring Framework, начиная с Spring Framework 4.0.

Реактивное программирование с Spring WebFlux

При всей своей полезности Reactor – это просто базис. Наши приложения должны коммуницировать с источниками данных. Должны поддерживать аутентификацию и авторизацию. Spring все это предоставляет. Если Reactor дает нам недостающую метафору, то Spring помогает всем нам разговаривать на общем языке.

Spring Framework 5.0 вышел в сентябре 2017. Он строится на Reactor и спецификации Reactive Streams. В нем есть новая реактивная среда исполнения и модель компонентов под названием Spring WebFlux.

Spring WebFlux не зависит от Servlet API и не требует их для работы. Он поставляется с адаптерами, позволяющими использовать его поверх движка Servlet, если потребуется, но это не обязательно. Также в нем предоставляется совершенно новая среда исполнения на основе Netty, называется Spring WebFlux. Spring Framework 5, работающий с Java 8 и Java EE 7 и выше, теперь служит основой для большей части экосистемы Spring, в том числе, для Spring Data Kay, Spring Security 5, Spring Boot 2 и Spring Cloud Finchley.

Комментарии (2)


  1. ssh24
    14.01.2019 11:05

    >Spring Developer Advocate
    >Java-чемпион
    Ннда. спринг-адвокаты, хтмл-евангелисты, бизнес-ангелы, джава-чемпионы, криптоэнтузиасты…
    Похоже, что быть «простым» профессионалом нынче не модно.


    1. a-tk
      14.01.2019 14:46

      Во-во, что они натворили такого, что им адвокат понадобился?