Цель/введение

Реактивные паттерны программирования становятся всё более востребованы при реализации высоконагруженных сервисов. Реактивные фреймворки предоставляют инструменты, позволяющие с минимальными затратами на кодирование использовать механизмы асинхронности и многопоточности.

В качестве примера, предлагаю рассмотреть реализацию сервис индексации данных в ElasticSearch. Данные хранятся в MongoDB, ключевые атрибуты которых синхронизируются с ElasticSearch (функционально похоже на Logstash). В проекте используется стек: Java/Spring Boot/Reactor/WebFlux/WebClient/RabbitMQ/MongoDB. На выбор RabbitMQ и MongoDB повлияло, в том числе, наличие реактивных драйверов.

Описание задачи

  1. Сервис должен принимать поток данных из очереди, выбирать связанные данные из базы и передавать их ElasticSearch. Формат данных очереди: действие (index/delete); id документа; имя индекса; тип индекса (опционально).

  2. Через web-интерфейс должен быть реализован функционал добавления, удаления и перестроения индекса.

  3. Должна быть возможность формирования агрегированных полей, содержащих данные из нескольких исходных полей документов, и добавление данных в индекс из связанных коллекций.

  4. Описание индексируемых данных должно быть в формате JSON.

DFD-диаграмма процесса индексации

Схема процесса индексации запросов, поступающих из очереди, выглядит следующим образом:

Алгоритм перестроения индекса выглядит практически также, за исключением того, что в нем отсутствует обработка запросов, отложенных из-за ошибок.

Описание функционала

Описание функционала коснется только работы реактивной части сервиса. Конфигурационные настройки, обработка формата описания индексируемых данных, формирование данных для запросов к ElasticSearch вынесены за рамки данной статьи, но вы можете посмотреть код на GitHub, по ссылке.

Теперь попробуем реализовать эту схему сквозным потоком Reactor, не используя подписки на отдельные элементы, в том числе отправку через WebClient HTTP-запросов и обработку полученных ответов. Отдадим, почти полностью, синхронизацию выполнения Reactor.

Код, запускающий процесс переиндексации выглядит следующим образом:

Task task = new Task(mongoElasticIndex);
ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
        .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic());
Flux<Tuple2<String,Document>> processingData = processingData(dataEventsFlux, (p) -> "index",
        (p) -> (Document)p,
        (p) -> mongoElasticIndex,
        Flux.just(),
        task);

task.setDispose(subscribe(processedData, task));

Получаем поток данных из коллекции, настраиваем параллелизм, формируем объект обработки потока и подписываемся на поток. Здесь класс Task – внутренний класс, назначение которого: собирать статистику и предоставлять информацию о выполняемых задачах индексации.

Метод processingData возвращает поток запросов и ответов, отправленных WebClient’ом:

private <T> Flux<Tuple2<String,Document>>
    processingData(ParallelFlux<T> events,
            Function<T, String> getAction,
                   Function<T, Document> getDocument,
                   Function<T, MongoElasticIndex> getMongoElasticIndex,
                   Flux<String> mergeFlux,
                   Task task) {
    return  events
            // Добавление данных к исходному документу из присоединяемых коллекций
        .transform(joinData(getDocument, getMongoElasticIndex))
            // Генерация данных для передачи в ElasticSearch
        .transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))
        .sequential()
            // Агрегирование данных для _bulk
        .transform(grouping(task))
            // Добавление потока данных, на которые не получен ответ от ElasticSearch
        .mergeWith(mergeFlux)
            // Отправка запросов в ElasticSearch
        .transform(postBulk(task))
        .subscribeOn(Schedulers.single())
        .doOnNext(testAliveResponses(task))
        .doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))
        .doOnComplete(() -> { logger.info("Start: {} End: {} read {} write {}",
                formatDate(task.getStartDate()),
                formatDate(new Date()),
                task.getDocumentsRead(),
                task.getIndexesWrite(), getMaxProcessingRequest());
            fileStorage.writeCollection2Files(waitingForResponse);
            removeTask(task);
        });
}

Методом transform Reactor соединяем отдельные обработчики потоков. Здесь есть одно существенное ограничение: входящий и исходящий потоки должны быть однотипными (Flux или ParallelFlux). Нельзя, например, с помощью transform встроить обработчик у которого вход Flux, а выход ParallelFlux.

В метод subscribe сервиса инкапсулирована подписка на поток. Ниже приведена его реализация:

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
    return  events
        .subscribe(
            p -> {
                if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                    waitingForResponse.remove(p.getT1());
                }
                int count = Optional.ofNullable(p.getT2().get("items", List.class))
                        .map(List::size)
                        .orElse(0);
                task.addIndexesWrite(count);
            },
            e -> {
                if(task != rabbitMQTask)removeTask(task);
                fileStorage.writeCollection2Files(waitingForResponse);
                logger.error("Error: {}", e.getMessage());
            }
        );
}

Далее коротко об отдельных функциях обработки потока.

Загрузка документов

Имеются два варианта загрузки:

  • Для всех документов основной коллекции индекса

ParallelFlux dataEventsFlux = reactorRepositoryMongoDB
        .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic());

Метод findAll возвращает поток для всех документов коллекции. Parallel и runOn настраивают многопоточность для выборки и дальнейшей обработки.

  • Для единичного запроса, приходящему из очереди

ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()
        .parallel(appConfig.getIndexParallelism())
        .runOn(Schedulers.boundedElastic())
        .map(msg -> {
            IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);
            try {
                return CreateIndexItem(indexEvent);
            } catch (IllegalObjectIdException | IOException | ConvertDataException e) {
                logger.error("{} For message: {}", String.join(", ",throwable2ListMessage(e)),
                        new String(msg.getBody(), StandardCharsets.UTF_8));
                return new IndexItem(null, null, null);
            }
        })
        .filter(e -> nonNull(e.getAction()))
        .flatMap(item ->
            Flux.zip("delete".equals(item.getAction())
                        // Для операции удаления создаётся Document, содержащий _id удаляемого документа
                    ? Flux.just(new Document().append("_id", item.getIdDocument().get("_id")))
                        // Для операции обновления индекса Document загружается из базы данных
                    : reactorRepositoryMongoDB.find(
                        item.getMongoElasticIndex().getCollection(),
                        item.getIdDocument(),
                        item.getMongoElasticIndex().getProjection()),
                Flux.just(item)
            )
            .map(d -> new EventDocument(d.getT2().getAction(),
                    d.getT1(),
                    d.getT2().getMongoElasticIndex()))
        );

Метод inboundFlux интерфейса reactiveQueue возвращает поток для очереди. Parallel и runOn идентичны предыдущему варианту. Далее событие преобразуется из JSON в объект IndexEvent, по содержимому которого документ извлекаются из базы, или создаётся объект для удаления документа из ElasticSearch.

Добавление данных к исходному документу из присоединяемых коллекций

private <T> Function<ParallelFlux<T>, ParallelFlux<T>>
    joinData(Function<T, Document> getDocument,
            Function<T, MongoElasticIndex> getMongoElasticIndex) {
    return (ParallelFlux<T> items) ->
            items.flatMap(p -> {
            if(getDocument.apply(p).size() == 1) {
                return Flux.just(p);
            }
            return
                Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))
                        .flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),
                                reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),
                                        it.getCondition(),
                                        it.getCollection().getProjection())))
                        .reduce(p, (acc, t) -> {
                            getDocument.apply(acc).put(t.getT1(), t.getT2());
                            return acc;
                        });
                }
        );
}

Метод joinData возвращает функциональный объект, добавляющий данные к исходному документу из документов присоединяемых коллекций. Использование flatMap и Flux.zip позволяет асинхронно запускать и обрабатывать потоки, в том числе и потоки, создаваемые запросами к базе данных mongodb. Все вопросы, связанные с синхронизацией, берет на себя Reactor.

Генерация JSON для ElasticSearch

private <T> Function<ParallelFlux<T>, ParallelFlux<String>>
    document2ElasticJson(
            Function<T, String> getAction,
            Function<T, Document> getDocument,
            Function<T, MongoElasticIndex> getMongoElasticIndex) {
    return (ParallelFlux<T> items) -> items.map(item -> {
        String elasticSend;
        try {
            Document document = getDocument.apply(item);
            MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);
            elasticSend = "delete".equals(getAction.apply(item))
                    ? mongoElasticIndex.deleteBuild(document)
                    : mongoElasticIndex.indexBuild(document);
        } catch (ConvertDataException e) {
            throw new RuntimeException(e);
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
        return elasticSend;
    });
}

Из полученного документа формируется JSON-объект модификации индекса в ElasticSearch. Контролируемые исключения приходится конвертировать в неконтролируемые.

Агрегирование данных для _bulk-запроса

Function<Flux<String>, Flux<String>> grouping(Task task) {
    return (Flux<String> source) -> source
            .bufferTimeout(appConfig.getMaxSizeBuffer(),
                    Duration.ofMillis(appConfig.getMaxDurationBuffer()))
            .doOnNext(p -> task.addDocumentsRead(p.size()))
            .map(p -> String.join("\n", p)
            );
}

Использование _bulk-запроса к ElasticSearch позволяет существенно снизить трафик и повысить производительность индексации. Объединение отправляемых данных несложно сделать при помощи bufferTimeout. Значениями максимального размера буфера и времени ожидания можно найти компромисс между оперативностью обновления данных в ElasticSearch, размером запроса и производительностью.

Отправка запросов ElasticSearch

public Function<Flux<String>, Flux<Tuple2<String, Document>>> postBulk(Task task) {
    return (Flux<String> source) -> source
        .flatMap(buffer -> {
            if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                waitingForResponse.add(buffer);
            }
            return Flux.zip(Flux.just(buffer),
                webClientElastic.post()
                    .uri("/_bulk")
                    .body(BodyInserters.fromValue(buffer))
                    .retrieve()
                    .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
                            response -> Mono.error(new HttpServiceException("System is overloaded",
                                    response.rawStatusCode())))
                    .onStatus(httpStatus -> httpStatus.is4xxClientError() && !httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
                            response -> Mono.error(new RuntimeException("API not found")))
                    .onStatus(HttpStatus::is5xxServerError,
                            response -> Mono.error(new HttpServiceException("Server is not responding",
                                    response.rawStatusCode())))
                    .bodyToFlux(Document.class)
                    .retryWhen(Retry.backoff(appConfig.getWebClientRetryMaxAttempts(),
                                Duration.ofSeconds(appConfig.getWebClientRetryMinBackoff()))
                        .filter(throwable -> throwable instanceof HttpServiceException)
                        .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
                            throw new HttpServiceException("External Service failed to process after max retries",
                                    HttpStatus.SERVICE_UNAVAILABLE.value());
                        }))
            );
        });

}

Создаётся поток, отправляющий через WebClient запросы к ElasticSearch. Поток, формируемый методом post WebClient’а, Flux.zip объединяет с запросом, это позволяет при обработке ответа связать полученный ответ с отправленным запросом. С помощью retryWhen, Retry.backoff настроена обработка некоторых ошибок.

Обработка ответов ElasticSearch

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {
    return  events
        .subscribe(
            p -> {
                if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация
                    waitingForResponse.remove(p.getT1());
                }
                int count = Optional.ofNullable(p.getT2().get("items", List.class))
                        .map(List::size)
                        .orElse(0);
                task.addIndexesWrite(count);
            },
            e -> {
                if(task != rabbitMQTask)removeTask(task);
                fileStorage.writeCollection2Files(waitingForResponse);
                logger.error("Error: {}", e.getMessage());
            }
        );
}

Обработка ответов ElasticSearch минимальна. Если ответ получен на контролируемый запрос (не запрос на переиндексацию), то запрос удаляется из множества запросов, для которых контролируется получение ответа. Ответы на переиндексацию не контролируются. В полученном ответе атрибут items должен быть списком, содержащим информацию об обработанных документах. На количество элементов в списке увеличивается счетчик обработанных документов.

Настройка WebClient

Основная часть настройки делается в конфигурационном классе, бин возвращает объект WebClient.Builder:

@Bean
@Qualifier("elastic")
public WebClient.Builder webClientWithTimeout() {
    final TcpClient tcpClient = TcpClient
            .create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
                connection.addHandlerLast(new WriteTimeoutHandler(timeout, TimeUnit.MILLISECONDS));
            });

    return WebClient.builder()
            .baseUrl(baseUrl +":" + port.toString())
            .filter(basicAuthentication(user, password))
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
}

В конструкторе сервиса добавляются фильтры, вызываемые при отправке запроса и получении ответа:

this.webClientElastic = webClientElastic
        .filter(onRequest())
        .filter(onResponse())
        .build();

Методы, возвращающие фильтры:

private ExchangeFilterFunction onRequest() {
    return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
        addSendRequest();
        int sleepCycleCount = 0;
        while (getProcessingRequest() > getMaxProcessingRequest()) {
            try {
                logger.info("Sleep: {} ProcessingRequest reached {} (MaxProcessingRequest {})", getSleepOverRequest(),
                        getProcessingRequest() - 1, getMaxProcessingRequest());
                sleep(getSleepOverRequest());
                if (sleepCycleCount++ > appConfig.getSleepCycleCountMax()) {
                    break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
        return Mono.just(clientRequest);
    });
}

private ExchangeFilterFunction onResponse() {
    return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
        addReceiveResponse();
        logger.info("Response Status {}", clientResponse.statusCode());
        return Mono.just(clientResponse);
    });
}

Фильтры выводят информацию об отправке запросов, получении ответов и модифицируют счетчики отправленных запросов и полученных ответов. Перед отправкой запроса, если превышено количество не полученных ответов, процесс “засыпает” на некоторое время.

Настройка среды выполнения

Для того чтобы запустить этот сервис нам нужны: rabbitmq, mongodb и elasticsearch. Всё это проще установить в Docker. Ещё в самом начале проекта установил Docker Desktop и настроил контейнеры для запуска нужных cервисов. Как это делается можно посмотреть, например, в этой статье. По аналогии установил rabbitmq, mongodb. Добавил конфигурационные файлы и внес изменения в файл docker-compose.yml. Получившиеся настройки Docker можно найти в папке проекта docker-elk. Ниже скриншот запущенного контейнера:

Запуск сервиса

Для тестирования загрузил в базу mongodb 1000 документов. Из Postman и отправляю запрос:

В полученном логе видно, что обработка выполняется в разных потоках:

2022-11-02 15:23:17.396  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.418  INFO 8336 --- [      Thread-42] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.447  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.462  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.475  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.477  INFO 8336 --- [      Thread-33] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:855, serverValue:83}] to localhost:27017
2022-11-02 15:23:17.484  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.485  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.489  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.493  INFO 8336 --- [       Thread-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:856, serverValue:84}] to localhost:27017
2022-11-02 15:23:17.566  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.588  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.622  INFO 8336 --- [       Thread-4] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.673  INFO 8336 --- [      Thread-52] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:940, serverValue:85}] to localhost:27017
2022-11-02 15:23:17.676  INFO 8336 --- [       Thread-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:941, serverValue:86}] to localhost:27017
2022-11-02 15:23:17.715  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.738  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.748  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.772  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.790  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.811  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.818  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.849  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:993, serverValue:87}] to localhost:27017
2022-11-02 15:23:17.851  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:994, serverValue:88}] to localhost:27017
2022-11-02 15:23:17.895  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.898  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:17.911  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.926  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.944  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.966  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:17.993  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1078, serverValue:90}] to localhost:27017
2022-11-02 15:23:18.002  INFO 8336 --- [      Thread-48] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1079, serverValue:89}] to localhost:27017
2022-11-02 15:23:18.041  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.059  INFO 8336 --- [      Thread-31] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.076  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.083  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.096  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.135  INFO 8336 --- [      Thread-34] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1156, serverValue:92}] to localhost:27017
2022-11-02 15:23:18.138  INFO 8336 --- [      Thread-39] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1155, serverValue:91}] to localhost:27017
2022-11-02 15:23:18.140  INFO 8336 --- [      Thread-39] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk
2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK
2022-11-02 15:23:18.181  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Start: 2022-11-02 15:23:17.250 End: 2022-11-02 15:23:18.181 read 1000 write 1000

Теперь проверим, что загрузилось.

Запрос к ElasticSearch показывает наличие индекса с 1000 документами:

И попробуем найти что-то в ElasticSearch:

Получен ответ ElasticSearch с найденным документом!

Заключение

В этом материале мне хотелось привести пример сервиса, реализованного с использованием Spring Boot, WebFlux, WebClient, Reactor - надеюсь, что у меня это получилось.

Несколько выводов:

  1. Реактивные фреймворки, в том числе и Reactor, делает за нас существенную часть работы по реализации асинхронных многопоточных алгоритмов, позволяя сосредоточиться на предметной области.

  2. С их помощью можно, достаточно просто, создавать высоконагруженные сервисы.

  3. Для получения максимального эффекта от перехода на реактивные паттерны программирование нужно чтобы вся цепочка вычислений была реактивной, начиная с драйверов доступа к базам данных, очередям, файлам и т.д.

Ещё раз, репозиторий с кодом и настройками находится здесь

Несколько ссылок на используемые материалы:

Шпаргалка по Spring Boot WebClient

Reactive Programming: Reactor и Spring WebFlux — часть 2

Реактивное программирование со Spring, часть 2 Project Reactor

Комментарии (26)


  1. BugM
    14.11.2022 02:14
    +5

    А чем это лучше обычных потоков и обычных пулов? Почему не просто пулы в сумме потоков на 500 максимум, а вероятно меньше?

    Чем хуже сразу видно: Нечитаемые портянки бойлерплейта, отсутствие стектрейсов и смесь бизнес логики со всякими техническими штуками.


    1. imanushin
      14.11.2022 02:46

      А чем это лучше обычных потоков и обычных пулов?
      Чем хуже сразу видно

      Это типичный вопрос про плюсы/минусы неблокирующего кода и пр.

      При IO вызовах процесс будет ждать ответа, а значит операционная система переключит контекст выполнения. Эта операция долгая по времени, так что, если есть большой объем IO, то производительность упадет.

      К сожалению, у меня нет точных цифр, но, судя по этому ответу, один context switch может стоить 5-7 микросекунд (а их надо два - чтобы остановить текущий поток, и чтобы вернуться к нему): "I can't find results for nehalem (is there lmbench in phoronix suite?), but for core2 and modern Linux context switch may cost 5-7 microseconds."

      Чтобы подобного не происходило, можно использовать неблокирующий код, и способов довольно много:

      1. В некоторых языках есть встроенная поддержка (async/await, корутины, горутины и так далее).

      2. В Java неблокирующие вызовы прописываются явно (то есть, в итоге, системе оставляют callback для вызова, когда все данные уже готовы).

      Плюсы неблокирующих вызовов:

      1. Меньше context switching (по сути, мы заменяем вытесняющую многозадачность кооперативной).

      2. Меньше потоков (суммарно).

      3. В некоторых случаях - легче синхронизация (особенно, если приложение использует вообще один поток для определенной операции - тогда можно быть уверенным, что никто другой не выполняет ничего параллельного, а потому можно использовать обычные HashMap для кешей и пр.).

      Минусы:

      1. Если не переиспользовать callback'и, то создается большее число объектов на каждый вызов.

      2. Меньше возможностей для оптимизации, так как JVM теперь сложнее аллоцировать объекты на стек и тд.

      3. Код сложнее читать (это неправда для kotlin/scala/C#/go и пр., где поддержка асинхронности добавлена в язык) из-за большого числа callback'ов и пр.

      4. Очень легко допустить ошибки, которые будут увеличивать объем стека - см. пример из Spring


      1. BugM
        14.11.2022 03:00
        +1

        К сожалению, у меня нет точных цифр, но, судя по этому ответу, один context switch может стоить 5-7 микросекунд (а их надо два - чтобы остановить текущий поток, и чтобы вернуться к нему): "I can't find results for nehalem (is there lmbench in phoronix suite?), but for core2 and modern Linux context switch may cost 5-7 microseconds."

        А это точно проблема? Попробуйте не выносить в отдельный поток то что выполняется быстрее 10 миллисекунд. Не риалтайм у вас там, нет смысла совсем в мелочи упираться. 0.1% производительности это вроде не очень большая жертва?

        Меньше context switching (по сути, мы заменяем вытесняющую многозадачность кооперативной).

        А это проблема? Сколько процентов производительности вы бы на этом потеряли?

        Большая часть продакшен кода это не про скорость, а про надежность. Допустим рекорды. Плюс нагрузка на gc плюс память, но гарантия что в объекте лежит то что ты в него положил. При правильном их использовании. Чистый обмен производительности на надежность. И это прекрасно.

        Меньше потоков (суммарно).

        А это проблема? Сколько памяти вы бы на этом потеряли?

        В некоторых случаях - легче синхронизация (особенно, если приложение использует вообще один поток для определенной операции - тогда можно быть уверенным, что никто другой не выполняет ничего параллельного, а потому можно использовать обычные HashMap для кешей и пр.).

        Точно нет. Обычные мапы в многопоточном окружении вас всегда накажут. В самый неподходящий момент. Вы потом неделю+ будете ловить эту ошибку и все закончится всё равно переписыванием на канкаррент типы. Канкаррент версия мапы это проблема? Насколько она у вас хуже будет (по любому из параметров на ваш выбор)?

        Для примера у меня в проде есть канкаррент мапа с мнопоточной записью. Она принимает 5000+ обращений на запись в секунду. На флеймграфе приложения ее вообще не заметно. Вроде не страшно?

        Минусы:

        Минусы понятные. Ищем плюсы их перевешивающие.

        Мы именно про Джаву. В других языках другие проблемы. Разные языки для разного.


    1. splix
      14.11.2022 06:45
      +2

      На самом деле это как давний вопрос "чем SQL лучше чем FoxPro". Т.е. декларативный vs императивный код.

      Reactor это декларативный подход. Пулы это императивный подход. Для разных задач разное подходит. Декларативный же (Reactor) позволяет размышлять в терминах какой результат нужен, для систем с множеством ветвлений/зависимостей результата это помогает. С другой стороны как только вы видите что-то что вы знаете как оптимизировать как процесс выполнения - тут у вас все руки связаны.

      Ну и еще в императивном стиле писать просто - любой школьник может. Декларативный же надо долго учить, и все равно есть сотня способов отстрелить себе ногу. Я был фанатом декларативного / rector, но уже устал от простреленных ног.


      1. BugM
        14.11.2022 11:38

        SQL прекрасен. И писать на нем легко и просто. Может не в этом дело?


        1. splix
          14.11.2022 17:48

          Reactor тоже прекрасен. И писать на нем легко и просто. Не задумываясь о потоках, локах и пр., пишешь за час решение задачи которое в императивном стиле писал бы неделю.


    1. ValentinMorozov Автор
      14.11.2022 08:53

      Под капотом всё равно пул потоков, только работу по управлению ими берёт на себя Reactor.


      1. BugM
        14.11.2022 11:39

        И?


        1. ValentinMorozov Автор
          14.11.2022 12:03

          Управление пулом, запуск на выполнение, ожидание завершения - этого всего нет в коде, всё делает reactor. Я думаю, что можно сравнить с управлением транзакциями JDBC vs JPA.


          1. BugM
            14.11.2022 12:15
            +1

            Вы так описали как будто для этого требуется какая-то значимая работа.

            На самом деле:

            Создание эксекутора - 1 строка и 1 настройка в которой не страшно промахнуться. Это я про количество потоков. Их можно сделать в разумное число раз больше чем получилась экспертная оценка сколько там надо.

            Выполнение задачи экзекутором - 1 строчка.

            Все очень чистенько, красиво и любому читающему ваш код понятно что происходит.

            Сравните с тем что у вас.


            1. ValentinMorozov Автор
              14.11.2022 13:25

              Может быть Вы правы. В своё оправдание могу лишь сказать только то, что реактивное программирование я осваивал на Scala, по книге Akka in Action by Rob Williams, Raymond Roestenburg, Robertus Bakker, и на Java - Практика реактивного программирования в Spring 5 Oлег Докука Игорь Лозинский. Да, функциональный стиль предполагает инкапсуляцию кода непосредственно в операции. Потоки в этом примере запускают: 1001 запрос к базе данных; из базы данных выбираются 2000 документов; все выбранные документы обрабатываются, сериализуются; группируются, отправляются серверу ElasticSearch, обрабатываются ответы - каждый из этапов выполняется асинхронно, данные обрабатываются параллельно. В этом примере время выполнения < 1 сек. (можно посмотреть в последней строке лога). Не думаю, что я бы смог это сделать (организовать параллельное выполнение как между этапами конвейера, так и внутри блоков конвейера) в 2-х строках кода с использованием notify(All), wait, start, lock, tryLock, synchronized, CompletableFuture, Timer. Reactor, в том числе, и позиционируется его создателями как фреймворк избавляющий от вложенности callback'ов. Да и Spring анонсировал завершение поддержки RestTemplate и рекомендует переходить на WebClient и WebFlux, основывающихся на Reactor.

              Согласен, реактивное(событийное) программирование не упрощает процесс кодирования, по сравнению с привычным многопоточным, но при правильном использовании позволяет снизить потребность в ресурсах, а фреймворки Reactor, Akka, Akka Stream, ... избавляют нас от рутины управления процессами обработки данных, позволяя сосредоточится на самой обработке.


              1. BugM
                14.11.2022 22:53
                +1

                Зачем вы пишите этот корпоративный/бюрократический буллшит? Я в институте в презентациях и то столько мусора не писал.

                Еще раз. Насколько вы снизили потребность в ресурсах? За счет чего? Переключение контекста и расход памяти на потоки это понятные издержки. Сколько они у вас составили?

                фреймворки Reactor, Akka, Akka Stream избавляют нас от рутины управления процессами обработки данных, позволяя сосредоточится на самой обработке.

                Вы это на полном серьезе пишите? Я уж вижу очень толстый троллинг. Есть мировой консенсус что они затрудняют примерно все. Написание, отдладку, поддержку, стоимость владения кодом, тайм ту маркет и тому подобное. Но есть какие-то кейсы, где этого стоит. Почему у вас такой кейс?

                В этом примере время выполнения < 1 сек.

                Зачем вы оцениваете процессинг по задержке? Её можно сделать любой покрутив конфижки и уменьшив пакет обрабатываемых данных до одной строки или что там у вас.

                Процессинг всегда оценивают по пропускной способности. Сколько гигабайт в секунду вы можете переложить на одном ядре.

                Эти оценки часто противоположны. И это нормально.


                1. ValentinMorozov Автор
                  15.11.2022 07:44
                  +1

                  На Ваши замечания могу сказать, что целью статьи было показать пример решения задачи асинхронного взаимодействия с web-сервисом с использованием Spring Boot/Reactor/WebClient. Оптимизация и оценка производительности - это отдельная интересная тема.

                  Но есть какие-то кейсы, где этого стоит. Почему у вас такой кейс?

                  Типичная задача в системах, использующих ElasticSearch, где есть существенная задержка ответа сервиса.

                  Насколько вы снизили потребность в ресурсах? За счет чего?

                  Event loop WebClient'а снижает количество потоков ожидающих ответ. Здесь этим занимается 1 поток (.subscribeOn(Schedulers.single()). Подготовка данных многопоточная, отправка запросов - асинхронная однопоточная.


  1. harios
    14.11.2022 09:13
    +1

    Пул потоков возможно будет быстрее(нужны замеры) и будет понятнее(это уж точно и великолепно для тех кто будет подобные портянки читать и поддерживать, особенно если писал не он).


    1. splix
      14.11.2022 17:57

      Я делал замеры на высокнагруженной системе где на каждый шаг собираются метрики. Код переписанный почти в лоб с реактора на потоки оказался сильно медленней. Потратив в разы больше времени на программирование, чтобы выжать максимум, удалось добиться той-же производительности, по метрикам разница -5%..+5% (на разные этапах). Жалко потерянного времени.

      К слову, еще была попытка переписать на корутины Kotlin, это оказался полный провал с проседанием в 7 раз.


      1. ris58h
        14.11.2022 19:33
        +1

        Код переписанный почти в лоб с реактора на потоки оказался сильно медленней.

        Хотелось бы взглянуть на этот код.

        UPD: "сильно медленней" это сколько?


        1. splix
          14.11.2022 20:27

          Код закрыть. Но это в любом случае много кода, и это не про ошибки, а про архитектурные особенности подходов.

          Сильно медленней это раза в полтора, но точно уже не помню потому что такой вариант долго не жил в проде.


          1. ris58h
            14.11.2022 21:24

            С Loom не тестировали?


            1. splix
              15.11.2022 07:07
              -2

              Не тестировали, но я подозреваю что была бы та же проблема как и котлиновские корутины. А проблема в том что хочется запустить много корутин. Это же легко и просто. Но в какой-то момент у тебя их становится сотни тысяч, и почти все в очереди на выполнение, что особенно неприятно если результат потребуется лишь от нескольких. А времени у тебя с десяток милисекунд.

              Бесполезная работа (память, gc, пр.) лишь растет по мере того как переводишь на корутины. В Reactor это не проблема потому что pull модель и может обрабатывать условно бесконечный объем не требуя пропорционального увеличения ресурсов.

              Понятно что все это можно оптимизировать, но если начал то мне лично проще делать это на обычной Java Concurrency, со всеми локами, семафорами и пр. Просто потому что предсказуемо работает. Поэтому Loom наверное тоже не вариант.

              PS Довольно большая часть оптимизации (кроме переписывания на lock-free) сводится к тому что бы сделать свой backpressure. Больше положил в очередь - медленно, меньше - машина простаивает. Может лучше больше, но потом выкинуть лишнее. И т.д. Надо искать оптимум, и то как потоки с этим всем работают. Kotlin Flow все таки ограничен и неприменим когда у тебя множество входов. Нигде не получилось его применить. Но может для простого случая он и решает backpressure.


              1. mayorovp
                15.11.2022 08:26

                Не тестировали, но я подозреваю что была бы та же проблема как и котлиновские корутины. А проблема в том что хочется запустить много корутин. Это же легко и просто. Но в какой-то момент у тебя их становится сотни тысяч, и почти все в очереди на выполнение, что особенно неприятно если результат потребуется лишь от нескольких.

                Похоже на упущенное обратное давление (backpressure) либо некорректную отмену...


                Бесполезная работа (память, gc, пр.) лишь растет по мере того как переводишь на корутины. В Reactor это не проблема потому что pull модель и может обрабатывать условно бесконечный объем не требуя пропорционального увеличения ресурсов.

                Вы хотите сказать, что все эти лямбды не потребляют памяти и никак не нагружают gc?


                Kotlin Flow все таки ограничен и неприменим когда у тебя множество входов.

                Каналы же как раз для этой цели сделаны.


                1. splix
                  15.11.2022 23:26

                  Похоже на упущенное обратное давление (backpressure) либо некорректную отмену...

                  Да, вы правильно заметили основной посыл моего коментария

                  Вы хотите сказать, что все эти лямбды не потребляют памяти и никак не нагружают gc?

                  Потребляют, но пропорционально количеству одновременно обрабатываемых данных.

                  Каналы же как раз для этой цели сделаны.

                  Да, именно про это я и написал. Что после переписывания на каналы заработало гораздо быстрее. Но все это было все еще в 7 раз медленней тупого кода на Reactor. К этому времени код с корутинами стал выглядеть совсем страшно и весь смысл корутин пропал раз все равно каналы. Было решено переписать на стандартные потоки.


                  1. mayorovp
                    16.11.2022 06:08

                    Корутины тоже потребляют пропорционально количеству одновременно обрабатываемых данных. Как и потоки, так что если не терять обратное давление — то разница между всеми тремя подходами только в константном множителе.


                    И утверждение, что из трёх вариантов — Reactor, корутины и потоки — именно корутины обладают худшим множителем — я вижу сомнительным.


                    1. splix
                      16.11.2022 22:56

                      Корутины тоже потребляют пропорционально количеству одновременно обрабатываемых данных

                      Наверное мы делали что-то не так, но из коробки у нас так не получилось.

                      И утверждение ... я вижу сомнительным.

                      Я рассказал свой опыт переписывания реальной высоконагруженной системы на разные архитектуры. Зачем бы я вам врал?

                      У вам может быть другая архитектура приложения, другие задачи, другая нагрузка и другие компетенции команды, поэтому ваш опыт может отличаться. Лучше напишите об это, это будет полезней для всех.


            1. ValentinMorozov Автор
              15.11.2022 07:45

              Пока нет


  1. razornd
    14.11.2022 23:36
    +1

    Жаль, что у вас нулевое покрытие кода тестами. Тестировать реактивные системы это отдельная интересная тема, которую вы незаслуженно обошли стороной.


    1. ValentinMorozov Автор
      15.11.2022 07:07

      Согласен. Не дошли руки.