Читая многочисленные статьи по теме реактивных потоков, читатель может прийти к выводу, что:

  • backpressure это круто
  • backpressure доступно только в библиотеках, реализующих спецификацию reactive streams
  • эта спецификация настолько сложна, что не стоит и пытаться ее реализовать самому

В этой статье я попытаюсь показать, что:

  • backpressure — это очень просто
  • для реализации асинхронного backpressure достаточно сделать асинхронный вариант семафора
  • при наличии реализации асинхронного семафора, интерфейс org.reactivestreams.Publisher реализуется в несколько десятков строк кода

Backpressure (обратное давление) — это обратная связь, регулирующая скорость работы производителя данных для согласования со скоростью работы потребителя. В отсутствие такой связи более быстрый производитель может переполнить буфер потребителя либо, если буфер безразмерный, исчерпать всю оперативную память.

В многопоточном программировании эта проблема была решена Дейкстрой, предложившим новый механизм синхронизации — семафор. Семафор можно трактовать как счетчик разрешений. Предполагается, что перед совершением ресурсоемкого действия производитель запрашивает разрешение у семафора. Если семафор пуст, то поток производителя блокируется.

Асинхронные программы не могут блокировать потоки, поэтому не могут обращаться за разрешением к пустому семафору (но все остальные операции с семафором делать могут). Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.

Наиболее элегантный способ организовать приостановку и возобновление работы асинхронной программы — это структурировать ее как dataflow актор с портами:



A dataflow model – actors with ports, the directed connections between their ports, and initial tokens. Взято из: A Structured Description Of Dataflow Actors And Its Application

Порты бывают входными и выходными. Во входные порты поступают токены (сообщения и сигналы) от выходных портов других акторов. Если входной порт содержит токены, а выходной имеет место для размещения токенов, то он считается активным. Если все порты актора активны, он отправляется на исполнение. Таким образом, при возобновлении своей работы программа актора может безопасно считывать токены из входных портов и записывать в выходные. В этом нехитром механизме таится вся мудрость асинхронного программирования. Выделение портов как отдельных подобъектов акторв резко упрощает кодирование асинхронных программ и позволяет увеличить их разнообразие комбинированием портов разных типов.

Классический актор Хьюитта содержит 2 порта — один видимый, с буфером для входящих сообщений, другой скрытый бинарный, блокирующийся, когда актор отправляется на исполнение и, таким образом, препятствующий повторному запуску актора до окончания первоначального запуска. Искомый асинхронный семафор — нечто среднее между этими двумя портами. Как и буфер для сообщений, он может хранить много токенов, и как у скрытого порта, эти токены — черные, то есть неразличимые, как в сетях Петри, и для их хранения достаточно счетчика токенов.

На первом уровне иерархии у нас определен класс AbstractActor c тремя вложенными классами — базовый Port и производные AsyncSemaPort и InPort, a также с механизмом запуска актора на исполнение при отсутствии заблокированных портов. Вкратце это выглядит так:

public abstract class AbstractActor {
    /** счетчик заблокированных портов */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /** головной метод */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}

В него вложен минимальный набор классов-портов:

Port — базовый класс всех портов

    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }

Асинхронный семафор:

    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                // поток исполнения не блокируется
                // но актор не зайдет на следующий раунд исполнения,
                // пока счетчик разрешений не станет опять положительным
                block();
            }
        }
    }

InPort — минимальный буфер для одного входящего сообщения:

    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }

Полную версию класса AbstractActor можно посмотреть здесь.

На следующем уровне иерархии мы имеем три абстрактных актора с определенными портами, но с неопределенной процедурой обработки:

  • класс AbstractProducer — это актор с одним портом типа асинхронный семафор (и внутренним контрольным портом, присутствует у всех акторов по умолчанию).
  • класс AbstractTransformer — обычный актор Хьюита, со ссылкой на входной порт следующего актора в цепочке, куда он отправляет преобразованные токены.
  • класс AbstractConsumer — также обычный актор, но преобразованные токены он никуда не отправляет, при этом он имеет ссылку на семафор производителя, и открывает этот семафор после поглощения входного токена. Таким образом, количество находящихся в обработке токенов поддерживается постоянным, и никакого переполнения буферов не происходит.

На последнем уровне, уже в директории test, определены конкретные акторы, используемые в тестах:

  • класс ProducerActor генерирует конечный поток целых чисел.
  • класс TransformerActor принимает очередное число из потока и отправляет его дальше по цепочке.
  • класс ConsumerActor — принимает и печатает полученные числа

Теперь мы можем построить цепочку асинхронных, параллельно работающих обработчиков следующим образом: производитель — любое количество трансформеров — потребитель



Тем самым мы реализовали backpressure, и даже в более общем виде, чем в спецификации reactive streams — обратная связь может охватывать произвольное число каскадов обработки, а не только соседние, как в спецификации.

Чтобы реализовать спецификацию, надо определить выходной порт, чувствительный к количеству переданных ему с помощью метода request() разрешений — это будет Publisher, и дополнить уже существующий InPort вызовом этого метода — это будет Subscriber. То есть, мы принимаем, что интерфейсы Publisher и Subscriber описывают поведение портов, а не акторов. Но судя по тому, что в списке интерфейсов присутствует также Processor, который никак не может быть интерфейсом порта, авторы спецификации считают свои интерфейсы интерфейсами акторов. Ну что же, мы можем сделать акторы, реализующие все эти интерфейсы, с помощью делегирования исполнения интерфейсных функций соответствующим портам.

Для простоты пусть наш Publisher не имеет собственного буфера и будет писать сразу в буфер Subscriber'а. Для этого нужно, чтобы какой-либо Subscriber подписался и выполнил request(), то есть, у нас есть 2 условия и, соответственно, нам нужно 2 порта — InPort<Subscriber> и AsyncSemaPort. Ни один из них не подходит в качестве базового для реализации Publisher'а, так как содержит ненужные методы, поэтому мы сделаем эти порты внутренними переменными:

public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}

На этот раз мы определили класс ReactiveOutPort не как вложенный, поэтому ему понадобился параметр конструктора — ссылка на объемлющий актор, чтобы создать экземпляры портов, определенных как вложенные классы.

Метод subscribe(Subscriber subscriber) сводится к сохранению подписчика и вызову subscriber.onSubscribe():

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }

что обычно приводит к вызову Publisher.request(), который сводится к поднятию семафора с помощью вызова AsyncSemaPort.release():

    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }

И теперь нам осталось не забыть опускать семафор с помощью вызова AsyncSemaPort.aquire() в момент использования ресурса:

    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }

Проект AsyncSemaphore был специально разработан для этой статьи. Он намеренно сделан максимально компактным, чтобы не утомлять читателя. Как результат, он содержит существенные ограничения:

  • Одновременно к Publisher'у может быть подписано не более одного Subscriber
  • размер входного буфера Subscriber'а равен 1

Кроме того, AsyncSemaPort не является полным аналогом синхронного семафора — только один клиент может выполнять операцию aquire() у AsyncSemaPort (имеется в виду объемлющий актор). Но это не является недостатком — AsyncSemaPort хорошо выполняет свою роль. В принципе, можно сделать и по другому — взять java.util.concurrent.Semaphore и дополнить его асинхронным интерфейсом подписки (см AsyncSemaphore.java из проекта DF4J). Такой семафор может связывать акторы и потоки исполнения в любом порядке.

Вообще, каждый вид синхронного (блокирующего) взаимодействия имеет свой асинхронный (неблокирующий) аналог. Так, в том же проекте DF4J имеется реализация BlockingQueue, дополненная асинхронным интерфейсом. Это открывает возможность поэтапного преобразования многопоточной программы в асинхронную, по частям заменяя потоки на акторы.