На одном из проектов встретился Spring Reactor. Хорошая технология асинхронных потоков. Много копий сломано по поводу ее использования. Но сейчас не об этом. А о том, как я изобрел велосипед.

Или такой:


нет, реактивный.

Начнем по порядку:

поставлена задача искать адреса по индексу эластика, сказано — сделано. Используя реактивный драйвер эластика, делаем запрос и получаем результат, если надо, то делаем преобразование, берем нужное количество. Все хорошо. Но тут приходит заказчик и говорит: — слушай, надо сделать возможность, когда мы запрашиваем адрес до какого‑то уровня детализации ( город, улица, дом), и если ответов меньше чем мы запросили, можно ли добавить в результат еще результат поиска следующего уровня детализации?

Какие проблемы! Во флаксе есть метод concat() который и служит для объединения двух потоков. Но возникла проблема, метод concat() использует последовательную подписку на потоки. И он всегда запускает второй поток, так как он просто объединяет потоки, а ограничить результаты можно отдельным методом take(). Но как сделать запуск второго потока не обязательным? Да еще сделать второй поток зависимым от первого результата?

Пробовали разные комбинации, но никак не выходил каменный цветок. Тогда решили изобрести велосипед: сделать свой класс конкатенации.

Требуемое поведение нового класса:

  • должен вести учет количества данных;

  • должен фиксировать первый излученный объект из первоначального потока;

  • должен сам излучать полученные данные асинхронно;

  • и при получении сигнала об окончании первого потока, должен запустить второй поток, и подписаться на него, так же излучая полученные данные.

    Назвали его BridgeFlux, название как название..

вот его свойства

int capacity;   -- количество объектов, которые могут быть излучены
BlockingQueue<AddressHintResponseDto> queue; -- буффер из которого будем излучать
AtomicBoolean done;   -- флаг окончания генерации
AtomicBoolean doneParent; -- флаг работы родительского потока.
AtomicInteger counter; -- счетчик излученных объектов
AtomicReference<AddressHintResponseDto> atomicReference; -- хранитель первого излученного объекта

Коструктор будет выглядеть так:

public BridgeFlux (int capacity) {
this.capacity = capacity;
queue = new ArrayBlockingQueue<>(capacity);
done = new AtomicBoolean(true);
doneParent = new AtomicBoolean(true);
atomicReference = new AtomicReference<>(null);
counter = new AtomicInteger(0);
}

Вот так создается:

new BridgeFlux(requestDto.getMaxResultSize())

Мы только инициализировали буфер и установили флаги в стартовое состояние.

Так как было удобнее сделать подписание и обработку родительского потока снаружи, то в данном классе сделали только сохранение в буфер данных. Вообще говоря, можно было сделать более универсально, но получилось так, как получилось. Вот так данные из потока складываются в буфер:

public void add(AddressHintResponseDto address) {
    try {
        queue.put(address);
        counter.incrementAndGet();
        atomicReference.compareAndSet(null,address);
    } catch (InterruptedException e) {
        throw new RuntimeException(e.getMessage());
    }
}

Вот так производится подписка на поток во внешнем классе, что бы получать данные:

getFlux(query, requestDto, lang).distinct()
        .take(requestDto.getMaxResultSize())
        .subscribe(
                bridgeFlux::add, // onNext
                err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
                () -> complete(bridgeFlux, requestDto, lang)// onComplete
        );

Все хорошо, теперь мы успешно складываем данные в буфер. Далее, полученные данные нужно сразу же излучать!

public Flux<AddressHintResponseDto> createFlux() {
    return Flux.<AddressHintResponseDto>create(sink ->
                            sink.onRequest(n -> {
                                while (!sink.isCancelled() && done.get()) {
                                    try {
                                        var address = queue.poll(10, TimeUnit.MILLISECONDS);
                                        if (!doneParent.get() && queue.isEmpty()) {
                                            done.compareAndSet(true,false);
                                        }
                                        if (address != null) {
                                            sink.next(address);
                                        }
                                    } catch (InterruptedException e) {
                                        sink.error(e);
                                    }
                                }
                                if (!doneParent.get() && !sink.isCancelled()) {
                                    sink.complete();
                                }
                            }),
                    FluxSink.OverflowStrategy.IGNORE)
            .subscribeOn(Schedulers.boundedElastic());
}

здесь бесконечный цикл, постоянно крутится, и если есть данные в буфере то забирает из буфера и отправляет в поток, останавливается по выставленным флагам. Сам же он излучает вот так:

bridgeFlux.createFlux().cache();

На данном этапе мы обеспечили непрерывную передачу из обрабатываемых потоков в результирующий поток.

Но по условиям необходимо еще выдать сколько прошло через буфер объектов и выдать первый излученный объект.

/**
 * выдать количество данных прошедших через буфер
 */
public int getCount() {
    return counter.get();
}

/**
 * выдать первый пришедший в буффер элемент
 */
public AddressHintResponseDto getSavedAddressHintResponseDto() {
    return atomicReference.get();
}

Кроме того, нужно обработать ситуацию когда внешний поток окончил излучение и нужно запустить следующий поток:

/**
 * при получении команды окончания верхнего потока, запустить принятый поток и подписаться на него,
 * получая данные из него в прописывая их в буффер
 * при окончании потока выставить флаг окончания излучения
 */
public void stop(Flux<AddressHintResponseDto> addFlux) {
    addFlux.subscribe(
            this::add, // onNext
            err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
            () -> doneParent.compareAndSet(true,false) // onComplete
    );
}

В итоге появилась возможность получить список улиц по маске запроса, и если количество ответов меньше чем запросили, то сделать дополнительный запрос получения домов на первой полученной улице. А при реактивном клиенте все выглядит так, сначала приходят улицы, и отрисовываются, а чуть позже приходят дома, находящиеся на первой улице. При этом нам не всегда нужно запускать второй запрос, что сильно экономит ресурсы.

Вот весь код класса:

import com.address_hints.client.dto.response.AddressHintResponseDto;
import com.address_hints.fias.parser.LanguageParser;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;


/**
 * класс мост потоков
 * обрабатывает полученные данные из буфера
 * и генерирует поток данных из этого буфера.
 * так же запускает и обрабатывает дополнительный поток
 */
@Slf4j
public class BridgeFlux {

    int capacity;
    BlockingQueue<AddressHintResponseDto> queue;

    /**
     * флаг окончания генерации
     *
     * комбинация этих флагов нужна для тестирования, так как тестирование идет в один трэд
     * и потоки сразу финализируются
     */
    AtomicBoolean done;
    /**
     * флаг окончания работы порождаемого потока
     */
    AtomicBoolean doneParent;
    AtomicInteger counter;
    AtomicReference<AddressHintResponseDto> atomicReference;

    public BridgeFlux (int capacity) {
        this.capacity = capacity;
        queue = new ArrayBlockingQueue<>(capacity);
        done = new AtomicBoolean(true);
        doneParent = new AtomicBoolean(true);
        atomicReference = new AtomicReference<>(null);
        counter = new AtomicInteger(0);
    }

    /**
     * при получении команды окончания верхнего потока, запустить принятый поток и подписаться на него,
     * получая данные из него в прописывая их в буффер
     * при окончании потока выставить флаг окончания излучения
     */
    public void stop(Flux<AddressHintResponseDto> addFlux) {
        addFlux.subscribe(
                this::add, // onNext
                err -> log.error("onError: Exception occurred: " + err.getMessage(), err),  // onError
                () -> doneParent.compareAndSet(true,false) // onComplete
        );
    }

    /**
     * выдать количество данных прошедших через буфер
     */
    public int getCount() {
        return counter.get();
    }

    /**
     * выдать первый пришедший в буффер элемент
     */
    public AddressHintResponseDto getSavedAddressHintResponseDto() {
        return atomicReference.get();
    }

    /**
     * складывает в буфер, если не получилось, то падает с ошибкой
     */
    @SuppressWarnings({"java:S2142","java:S112"})
    public void add(AddressHintResponseDto address) {
        try {
            queue.put(address);
            counter.incrementAndGet();
            atomicReference.compareAndSet(null,address);
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * излучатель данных, излучает при налии данных в буфере, если есть в буфере вынимает из него и отправляет в поток
     * останавливается либо по прерыванию либо по выставленному флагу остановиться.
     */
    @SuppressWarnings({"java:S2142","java:S3776"})
    public Flux<AddressHintResponseDto> createFlux() {
        return Flux.<AddressHintResponseDto>create(sink ->
                                sink.onRequest(n -> {
                                    while (!sink.isCancelled() && done.get()) {
                                        try {
                                            var address = queue.poll(10, TimeUnit.MILLISECONDS);
                                            if (!doneParent.get() && queue.isEmpty()) {
                                                done.compareAndSet(true,false);
                                            }
                                            if (address != null) {
                                                sink.next(address);
                                            }
                                        } catch (InterruptedException e) {
                                            sink.error(e);
                                        }
                                    }
                                    if (!doneParent.get() && !sink.isCancelled()) {
                                        sink.complete();
                                    }
                                }),
                        FluxSink.OverflowStrategy.IGNORE)
                .subscribeOn(Schedulers.boundedElastic());
    }
}

Комментарии (2)


  1. aleksandy
    19.07.2024 07:04
    +2

    Блокирующая очередь как-то не очень сочетается с реактивным стеком.


  1. JerryI
    19.07.2024 07:04

    Если бы введение и терминологию внедрить в начало статьи было бы отлично