Данная статья написана по мотивам одной из лекций, которые я читаю на своем курсе, подробности можете найти в моем телеграм канале

Также в данной статье будут отсылки к моим прошлым статьям по многопоточности, поскольку они дополняют друг друга

Код из статьи можно найти в репозитории на GitHub

Для чего нужна многопоточность

Многопоточность неразрывно связана с отзывчивостью вашего приложения под нагрузкой. Если нагрузки нет, скажем 5 запросов в час, то и о многопоточности можно особо не задумываться

На коммерческих проектах чаще встречается ситуация, когда вашим приложением пользуются сотни и даже тысячи клиентов одновременно, в такие моменты без многопоточности никуда, при этом приложение должно работать эффективно, не расходуя лишние ресурсы

Я собрал самые часто встречаемые паттерны работы с многопоточностью из своей практики, которые помогают писать отказоустойчивые и надежные приложения

В данной статье будем рассматривать многопоточность без применения webflux, однако если будет спрос, то распишу те же паттерны на webflux'е, там они выглядят более органично

Также не буду затрагивать Project Loom, поскольку мне не приходилось отлаживать и поддерживать код с использованием Loom на коммерческом проекте (Java 21 еще не используем), но когда-нибудь я напишу подробную и понятную статью по Loom

Стоимость потоков в Java и пулы потоков

Про стоимость потоков в Java я расписал в предыдущей статье про webflux, поэтому здесь напишу тезисно. Потоки используют следующие ресурсы:

  • CPU - переключение между потоками, их создание и завершение требует процессорного времени, при этом если количество потоков становится слишком большим, примерно > 3-4 тысяч (зависит от мощностей сервера), то процессор начинает тратить значительно количество времени на переключение между ними, производительность приложения перестает расти и даже падает

  • RAM - стек потока занимает место в оперативной памяти

  • Количество потоков ограничено возможностями Операционной системы

Для решения этих проблем мы должны создавать ограниченное число потоков, при этом если количество задач превышает количество созданных потоков, они должны помещаться в очередь, где будут дожидаться своего выполнения. Для этого предназначены пулы потоков - ExecutorService'ы

Подробно про ExecutorService'ы я рассказал в другой своей статье, там я разобрал как правильно создавать эластичные пулы потоков и объяснил почему не стоит использовать стандартные пулы из фабрики Executors

Для демонстрации паттернов нам понадобятся executorService’ы, создавать мы их будем по такому образцу:

Создание executorService:
@Bean(destroyMethod = "shutdown")
public ExecutorService elasticExecutor() {
    return createElasticExecutor(10, 100);
}

private ThreadPoolExecutor createElasticExecutor(int threads, int queueCapacity) {
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueCapacity);

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        threads, threads,
        60L, TimeUnit.SECONDS,
        queue, new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.allowCoreThreadTimeOut(true);

    return threadPoolExecutor;
}

Теперь, когда мы поняли, что нужно использовать пулы потоков для эффективного управления ресурсами, можем переходить непосредственно к разбору паттернов

Какие паттерны мы рассмотрим

  • Запуск асинхронного процесса по сигналу

  • Параллельное выполнение задач без ожидания результата

  • Ограничение количества потоков для вызова внешнего сервиса

  • Выполнение Scheduled задач

  • Параллельное обращение к разным источникам, с последующим объединением результатов

  • Контекст

  • Ожидание асинхронного ответа

Запуск асинхронного процесса по сигналу

Пожалуй это наиболее простой паттерн с которого стоит начать

Сигналом может выступать любое событие, которое не дожидается результата выполнения сценария, а лишь запускает его, например:

  • HTTP сообщение - приходит HTTP запрос на составление аналитического отчета, сам отчет будет формироваться несколько минут, при этом клиентский сервис не хочет ждать так долго, ему нужно лишь запустить процесс

  • Сообщение из брокера (Kafka, IBM MQ, RabbitMQ) - мы не всегда хотим обрабатывать сообщение в том же потоке, в котором оно поступает, при высоких нагрузках лучше переключиться на отдельный пул потоков, чтобы не блокировать чтение новых сообщений (в той же kafka количество читающих потоков равно количеству партиций, на которые подписался экземпляр приложения, а их, как правило, мало)

  • Запуск события по времени - например выполнить перерасчет данных в бд

Для запуска асинхронного выполнения какого-то процесса необходимо его передать пулу потоков, сделать это можно несколькими способами:

  • Напрямую вызвать executorService.execute(() -> { … }):

public void runWithStraightExecuteMethod() {
    runAsyncTasksElasticExecutor.execute(() -> {
        executeLongOperation();
    });
}
  • Через аннотацию @Async:

@Async("runAsyncTasksElasticExecutor")
public void runWithAnnotation() {
    executeLongOperation();
}
  • Через CompletableFuture с передачей executorService'а в качестве параметра метода:

public void runWithCompletableFuture() {
    CompletableFuture.runAsync(() -> {
            executeLongOperation();
        },
        runAsyncTasksElasticExecutor);
}

Все три вышеприведенных куска кода идентичны в плане поведения

Если запустить любой из них, то вызывающий поток будет отпущен, а выполнение метода executeLongOperation() начнется в потоке executorService’а

Параллельное выполнение задач без ожидания результата

Данный паттерн является альтернативным вариантом предыдущего, но все же заслуживает рассмотрения, т.к. часто встречается на практике

Допустим вам поступило сообщение, которое нужно отправить в разные внешние источники по HTTP или другому протоколу. При этом нам не важен порядок в котором будет выполняться отправка, а также результаты каждой отправки не зависят от результатов предыдущих (независимы)

Это означает, что данные задачи могут выполняться в параллельных потоках

Для начала создадим интерфейс с единственным методом, назначением которого будет отправить сообщение во внешний сервис:

public interface MessageSender {
	void send(Message message);
}

При этом может быть сколько угодно сервисов, реализующих данный интерфейс

Демонстрационный пример такого сервиса:
@Slf4j
@Service
public class MessageSender1implements MessageSender {

    @Override
	public void send(Message message) {
        // Выполнение операции отправки
        MentoringUtil.sleep(1500);

        log.info("Сообщение отправлено в источник #1");
    }
}

Напишем класс, который будет передавать обработку сразу нескольким MessageSender’ам в разных потоках:

@Service
@RequiredArgsConstructor
public class SendInParallelExample {

    private final List<MessageSender> messageSenders;
	private final ExecutorService runParallelTasksElasticExecutor;

	public void sendMessageToSeveralTargets(Message message) {
        messageSenders.forEach(messageSender ->
            CompletableFuture.runAsync(() -> 
    		    messageSender.send(message), runParallelTasksElasticExecutor)
        );
    }
}

Пояснения к коду:

  1. Импортируем коллекцию бинов, реализующих интерфейс MessageSender и бин ExecutorService, в котором будет происходить выполнение кода отправщиков

  2. Итерируемся по бинам MessageSender’ам, и для каждого выполняем асинхронный вызов CompletableFuture.runAsync(…), без ожидания результата, вторым параметром передаем ExecutorService, чтобы выполнение происходило в его потоках

Таким способом мы можем запустить параллельные задачи. При этом мы можем пользоваться и другими подходами, приведенными в предыдущем паттерне (прямой вызов executorService.execute(…) или аннотация @Async)

Пару слов про parallelStream. Мы могли бы сделать то же самое через .parallelStream().forEach(…), без использования CompletableFuture, но данный подход не подойдет для параллельного выполнения блокирующих операций (таких как внешние Rest вызовы), поскольку parallelStream использует ForkJoinPool, количество рабочих потоков которого ограниченно количеством ядер процессора, что может приводить к низкой производительности

Ограничение количества потоков для вызова внешнего сервиса

В микросервисной архитектуре желательно ограничивать количество одновременных вызовов внешних микросервисов, чтобы предотвратить возможные перегрузки. Если внешний сервис начинает деградировать, то и вызывающие сервисы могут начать деградировать вслед за ним, поскольку заканчиваются их потоки

Особенно остро данная проблема проявляется при работе с WebFlux, когда нет явного ограничения количеством потоков и ничто не мешает сделать одновременно десятки тысяч вызовов

Такая ситуация называется Chain Of Failures, когда деградация одного микросервиса приводит к каскадной деградации остальных, и ограничение количества вызовов позволяет предотвратить падение

Дополнительным решением данной проблемы является паттерн Circuit Breaker, но он скорее относится к паттернам микросервисов и не имеет отношения к многопоточности, поэтому здесь рассмотрен не будет

Варианты ограничения потоков в Java:

  • Semaphore

  • CompletableFuture с выделенным пулом потоков

  • Готовые инструменты библиотеки resilience4j: @Bulkhead и @RateLimiter

Semaphore

Исторически, для ограничения количества потоков, применяется класс Semaphore

Пример использования:

private final Semaphore semaphore = new Semaphore(10);

public String parsePageWithSemaphoreRateLimiting() {
	boolean isAcquired;
	try {
        isAcquired = semaphore.tryAcquire(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
		throw new CustomAppException(e);
    }

	if (!isAcquired) {
		log.info("Не удалось захватить поток семафора");

		return "fallback result";
    }

	try {
		return marketPlaceParser.parsePage();
    } catch (Exception e) {
		return "fallback result";
    } finally {
        semaphore.release();
    }
}

Пояснения к коду:

  1. Создаем семафор на 10 разрешений

  2. В течение двух секунд поток пытается захватить разрешение семафора

  3. Если захватить не удалось, то прекращаем выполнение метода, возвращая заготовленный ответ (так называемый "fallback")

  4. Если удалось захватить, то выполняем внешний вызов с помощью метода marketPlaceParser.parsePage()

  5. Обязательно отпускаем разрешение семафора в блоке finally

  6. Также в случае возникновения исключений при обращении к внешнему сервису возвращаем fallback

У данного решения есть несколько недостатков:

  • код выглядит громоздко

  • выполнение и ожидание происходит в вызывающих потоках, а значит мы можем заставить ждать все вызывающие потоки, сильно ухудшая производительность системы

  • нет гибкости при ожидании результата выполнения, которую нам дает CompletableFuture

CompletableFuture с выделенным пулом потоков

В качестве более продвинутого подхода можно использовать CompletableFuture с выделенным пулом потоков (мы его уже видели ранее)

Благодаря CompletableFuture вы можете более гибко управлять результатом, дожидаясь его при необходимости, выставляя таймаут и обрабатывая ошибки

Решение:

public CompletableFuture<String> parsePageWithRateLimitingCompletableFuture() {
	try {
		return CompletableFuture.supplyAsync(marketPlaceParser::parsePage, parserElasticExecutor)
            .orTimeout(4, TimeUnit.SECONDS)
            .exceptionally(e -> {
                log.warn("Ошибка при вызове внешнего сервиса", e);

				return "fallback result";
            });
    } catch (RejectedExecutionException e) {
        log.warn("Переполнена очередь выполонения задач на вызов внешнего сервиса");

		return CompletableFuture.completedFuture("fallback result");
    }
}

Пояснения к коду:

  1. Передаем вызов внешнего сервиса выделенному пулу потоков, с помощью явного указания пула вторым параметром в методе CompletableFuture.supplyAsync(…, parserElasticExecutor)

  2. Выставляем таймаут в 4 секунды на ожидание ответа

  3. В операторе .exceptionally(…) отлавливаем возможные исключения при вызове сервиса и возвращаем fallback

  4. Оборачиваем весь код в try-catch, поскольку если очередь задач parserElasticExecutor’а будет переполнена, то при попытке вызвать supplyAsync(..) будет выброшено исключение RejectedExecutionException, в таком случае мы также возвращаем fallback

Данный подход лучше семафора по следующим параметрам:

  • есть таймаут на вызов внешнего сервиса

  • fallback результат в случае ошибок

  • вызовы внешнего сервиса происходят в потоках выделенного executorService’а, в котором мы можем регулировать как максимальное количество потоков, так и глубину очереди задач

Благодаря этому мы получаем ожидаемое поведение и можем гибко контроллировать подаваемую нагрузку, однако всё еще код выглядит громоздко

Готовые инструменты библиотеки resilience4j: @Bulkhead и @RateLimiter

Для использования resilience4j добавим зависимость:

implementation "io.github.resilience4j:resilience4j-spring-boot2:2.2.0"

Bulkhead

Имеет две реализации SEMAPHORE (по умолчанию) и THREADPOOL, которые по сути предлагают те же самые решения, что были продемонстрированы выше

Bulkhead semaphore

Добавим конфигурацию в application.yml:

resilience4j:
  bulkhead:
    instances:
      parserSemaphore:
        maxConcurrentCalls: 10
        maxWaitDuration: 5s
  • parserSemaphore - название конфигурации

  • maxConcurrentCalls - количество разрешений семафора

  • maxWaitDuration - время в течение которого поток будет ждать разрешения от семафора, если таймаут превышен, то будет выброшено исключение BulkheadFullException

Решение:

@Bulkhead(name = "parserSemaphore", fallbackMethod = "fallbackMethod")
public String parsePageWithSemaphoreBulkheadResilience4j() {
	return marketPlaceParser.parsePage();
}

public String fallbackMethod(Exception e) {
    log.warn("Ошибка при вызове внешнего сервиса", e);

	return "fallback result";
}

Пояснения к коду:

  1. Используем аннотацию @Bulkhead, в поле name указываем наименование конфигурации, а также указываем наименование fallback метода

  2. Вызываем внешний сервис

  3. Объявляем fallback метод - он должен быть публичным, принимать исключение в качестве аргумента и возвращать тот же тип данных в результате, что и основной метод

Все исключения при выполнении метода будут отловлены с помощью fallback-метода

По сути это более удобная запись того что было продемонстрировано ранее с классическим семафором, поэтому остается проблема с ожиданием разрешений семафора в вызывающих потоках

Bulkhead theadpool

Опишем конфигурацию:

resilience4j:
  bulkhead:
    instances:
      parserThreadPool:
        coreThreadPoolSize: 10
				maxThreadPoolSize: 10
        queueCapacity: 100

Конфигурация представляет собой стандартные настройки, которые используются для создания ThreadPoolExecutor:

  • coreThreadPoolSize - количество core потоков пула

  • maxThreadPoolSize - максимальное число потоков пула

  • queueCapacity - глубина очереди задач

Также нам понадобиться @TimeLimiter для корректной работы таймаута:

resilience4j:
  ...
  timelimiter:
    instances:
      parserThreadPool:
        timeoutDuration: 3s

Решение:

@TimeLimiter(name = "parserThreadPool", fallbackMethod = "fallbackMethodCompletableFuture")
@Bulkhead(name = "parserThreadPool", fallbackMethod = "fallbackMethodCompletableFuture", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<String> parsePageWithThreadPoolBulkheadAndTimeLimiterResilience4j() {
	String response = marketPlaceParser.parsePage();

	return CompletableFuture.completedFuture(response);
}

public CompletableFuture<String> fallbackMethodCompletableFuture(Exception e) {
    log.warn("Ошибка при вызове внешнего сервиса", e);

	return CompletableFuture.completedFuture("fallback result");
}

Пояснения к коду:

  1. В аннотации @Bulkhead указываем тип Bulkhead.Type.THREADPOOL

  2. Указываем @TimeLimiter

  3. Вызываем внешний сервис внутри метода, при этом выполнение будет происходить в пуле потоков, созданном в @Bulkhead

  4. Возвращаем CompletableFuture.completedFuture(…), во-первых возврат CompletableFuture в результате является обязательным для аннотаций @Bulkhead (с типом THREADPOOL) и @TimeLimiter, во-вторых это позволяет нам гибко управлять ожиданием результата

  5. Объявляем fallback метод

При вышеуказанном подходе, мы получаем то же самое поведение, что было описано выше с использованием выделенного пула потоков и CompletableFuture

Здесь также добавлен TimeLimiter, поскольку если указать оператор .orTimeout(…) в CompletableFuture, то таймаут будет работать некорректно - все потоки, ожидающие в очереди executorService’а bulkhead’а не увидят оператор .orTimeout(), поэтому будут ожидать своей очереди бесконечно долго

Будьте внимательны. Стандартный threadPool, который создается внутри кода resilience4j, будет использовать core потоки до тех пор, пока очередь задач не заполнится до конца, и только потом начнет увеличивать количество потоков до значения maxThreadPoolSize, что может приводить к нежелательному поведению. Для пресечения такого поведения я рекомендую создавать количество core потоков равное maxThreadPoolSize, и при этом их должно быть немного, поскольку они не будут завершаться

RateLimiter

Bulkhead паттерн ограничивает максимальное количество одновременно вызывающих потоков, однако этого не всегда достаточно. Представим что внешний сервис отвечает очень быстро - за 10мс каждый ответ, и мы использовали простое ограничение количества одновременных вызовов 10-ю штуками, тогда мы столкнемся с проблемой, что за секунду может выполняться до 1000 запросов (каждый поток по 100 вызовов в секунду), тем самым ограничение вроде бы есть, но нагрузки все равно могут быть высокими

Эту проблему решает RateLimiter. Он похож на Bulkhead, но при этом добавляет “скользящее окно”, с помощью которого ограничивается количество вызовов за определенное время, например не более 100 запросов за последние 10 секунд

Т.е. если в первую секунду выполниться 100 вызовов, то в остальные 9 секунд все дальнейшие запросы будут возвращать fallback (или обрабатываться с ошибкой RequestNotPermitted, если fallback не настроен)

Получается что данный паттерн дает нам более правильное поведение для сдерживания подаваемой нагрузки на внешний сервис

Реализовывать такой паттерн самостоятельно будет сложно, поэтому воспользуемся готовым решением от resilience4j

Конфигурация:

resilience4j:
  rateLimiter:
    instances:
      parser:
        timeoutDuration: 5s
        limitRefreshPeriod: 1s
        limitForPeriod: 10
  • timeoutDuration - время в течение которого поток ждет разрешения

  • limitRefreshPeriod - период за который ограничивается число вызовов

  • limitForPeriod - предельное число вызовов за время указанное в limitRefreshPeriod

Решение:

@RateLimiter(name = "parser", fallbackMethod = "fallbackMethodCompletableFuture")
public CompletableFuture<String> parsePageWithRateLimitingResilience4jCompletableFuture() {
	return CompletableFuture.supplyAsync(marketPlaceParser::parsePage, parserElasticExecutor)
        .orTimeout(2, TimeUnit.SECONDS);
}

public CompletableFuture<String> fallbackMethodCompletableFuture(Exception e) {
    log.warn("Ошибка при вызове внешнего сервиса", e);

	return CompletableFuture.completedFuture("fallback result");
}

Пояснения к коду:

  1. Используем аннотацию @RateLimiter, указываем имя конфигурации и fallback метод

  2. Передаем вызовы внешнего сервиса отдельному executorService’у с помощью метода CompletableFuture.supplyAsync(…)

  3. Выставляем таймаут

  4. Объявляем fallback метод

Что мы получаем:

  • код выглядит лаконично

  • есть таймаут на вызов внешнего сервиса

  • возвращаем fallback результат в случае ошибок

  • вызовы внешнего сервиса происходят в потоках выделенного executorService’а

  • в ответе возвращается CompletableFuture, благодаря чему мы можем гибко управлять результатом

Таким образом, мы получаем сплошные преимущества от такого подхода, поэтому я рекомендую применять именно RateLimiter для ограничения количества вызовов внешнего сервиса

Выполнение Scheduled задач

Данное решение я разобрал в другой статье, с более подробными примерами, здесь изложу подход более кратко

По умолчанию аннотация @Scheduled использует один и тот же поток для выполнения всех методов, помеченных данной аннотацией. В связи с этим могут возникать конфликты, когда один из @Scheduled методов еще не закончил свое выполнение, и пришло время выполнять другой @Scheduled метод, в таком случае второй метод просто не будет запущен, т.к. поток занят

Для решения следует использоваться связку аннотаций @Scheduled + @Async, тогда при запуске @Scheduled задачи она будет передаваться на выполнение пулу потоков, указанному в @Async, а единственный @Scheduled поток будет отпущен

При этом чаще всего нам подойдет поведение, когда в аннотации @Async используется executorService на 1 поток, но для разных @Scheduled методов используются разные executorService'ы. Тогда мы получаем следующее поведение - каждый @Scheduled метод будет выполняться в своем потоке, при этом повторные запуски метода будут игнорироваться, если предыдущий запуск не завершил свою работу

Создадим executorService на один поток:

@Bean(destroyMethod = "shutdown")
public ExecutorService refreshCacheSingleExecutor() {
	return Executors.newSingleThreadExecutor();
}

Обратите внимание. Executors.newSingleThreadExecutor() имеет важный недостаток - его очередь задач неограниченна, что может приводить к утечкам памяти, в случаях когда задачи в него поступают чаще, чем успевают выполняться

Пример правильного создания executorService’а на один поток:

@Bean(destroyMethod = "shutdown")
public ExecutorService correctSingleExecutor() {
	return new ThreadPoolExecutor(
		1, 1, 
		0L, TimeUnit.MILLISECONDS,
		new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
}

При попытке добавить новую задачу в такой executorService, она будет проигнорирована, если он занят другой задачей, поскольку мы использовали политику переполнения DiscardPolicy, которая тихо игнорирует добавление новой задачи в executorService. Если же вы хотите явно видеть ошибку о том, что пул потоков занят на момент добавления новой задачи, используйте политику AbortPolicy

Применим executorService с помощью аннотации @Async к @Scheduled задаче:

@Async("refreshCacheSingleExecutor")
@Scheduled(cron = "*/2 * * * * *")
public void refreshCache() {
	// долгая операция

    log.info("Кеш обновлен");
}

Таким образом мы получаем поведение, которое и ожидали:

  • плановая задача запускается всегда в нужное время при наличии конфликтов

  • и в то же время она не будет запущена, если предыдущий запуск этой же задачи не завершил свое выполнение

Параллельное обращение к разным источникам, с последующим объединением результатов

Часто встречается ситуация, когда необходимо обратиться в несколько микросервисов, полученные ответы объединить и в таком виде вернуть результат

Для наглядности допустим внешние сервисы довольно медлительны, первый возвращает ответ за 2 секунды, второй за 3 секунды

Напишем простой код с последовательным вызовом внешних сервисов:

public ProductPageResponse sequentialExecution(UUID productId) {
    ProductInfo productInfo = productInfoClient.fetchProductInfo(productId);
    List<Feedback> feedbacks = feedbacksClient.fetchFeedbacks(productId);

    ProductPageResponse productPage = productMapper.toPage(productInfo, feedbacks);

    log.info("Страница продукта: {}", productPage);

	return productPage;
}

Пояснения к коду:

  1. productInfoClient.fetchProductInfo(…) - вызывает сервис получения информации по товару, выполняется 2 секунды

  2. feedbacksClient.fetchFeedbacks(…) - вызывает сервис получения отзывов о товаре, выполняется 3 секунды

  3. productMapper.toPage(productInfo, feedbacks) - объединяет ответы в один результат

Запустим код:

: Время выполнения: 6132

Результат превышает суммарное время вызовов внешних микросервисов, но мы хотим сэкономить время и распараллелить их

Перепишем данный метод с использованием CompletableFuture:

public CompletableFuture<ProductPageResponse> parallelExecution(UUID productId) {
	CompletableFuture<ProductInfo> productInfoCompletableFuture = CompletableFuture.supplyAsync(() -> productInfoClient.fetchProductInfo(productId), productInfoElasticExecutor);
    CompletableFuture<List<Feedback>> feedbacksCompletableFuture = CompletableFuture.supplyAsync(() -> feedbacksClient.fetchFeedbacks(productId), feedbacksElasticExecutor);

	return CompletableFuture.allOf(productInfoCompletableFuture, feedbacksCompletableFuture)
        .thenApply(ignore -> {
            ProductInfo productInfo = productInfoCompletableFuture.getNow(new ProductInfo());
            List<Feedback> feedbacks = feedbacksCompletableFuture.getNow(List.of());

            ProductPageResponse productPage = productMapper.toPage(productInfo, feedbacks);

            log.info("Страница продукта: {}", productPage);

			return productPage;
        });
}

Пояснения к коду:

  1. Запускаем параллельные вызовы сервисов при помощи CompletableFuture.supplyAsync(…, elasticExecutor) - вызовы клиентов начнутся в тот же момент, когда мы создаем CompletableFuture

  2. Объединяем созданные CompletableFuture с помощью оператора .allOf(...)

  3. К сожалению нет такого оператора, который бы позволял сразу получить результаты выполнения CompletableFuture после оператора .allOf(…), поэтому нам приходится внутри оператора .thenApply() вызывать методы .getNow() у исходных CompletableFuture, поскольку они оба уже завершили свое выполнение, а значит их результаты могут быть получены немедленно

  4. Затем объединяем результаты запросов и возвращаем ответ

Обратите внимание, при вызове .supplyAsync(…, elasticExecutor) без передачи executorService’а вторым параметром, мы бы запустили выполнение кода в common ForkJoinPool из CompletableFuture. Ранее я уже объяснил почему FJP не подходит для выполнения блокирующих вызовов. Поэтому следует передавать вторым параметром executorService

Запустим код:

: Время выполнения: 4297

Результат в районе 4х секунд, а значит он меньше суммарного времени обращения к внешним микросервисам, тем самым мы получили ожидаемое поведение, при этом объединив результат вызова двух сервисов

Контекст

Бывает двух типов:

  • ограниченный одним потоком

  • доступный из разных потоков

Также бывает распределенный контекст (кеш) между несколькими микросервисами (redis, hazelcast), но это уже ближе к паттернам микросервисов

Контекст, ограниченный одним потоком

Контекст в рамках одного потока основывается на механизме ThreadLocal, который позволяет привязать объект к текущему потоку, тем самым данный объект будет невидим для остальных потоков

ThreadLocal

Создадим класс, который будет инкапсулировать в себе ThreadLocal:

@Component
public class ThreadContextHolder {

	private final ThreadLocal<Map<String,String>> threadLocalContext = ThreadLocal.withInitial(HashMap::new);

	public static void put(String key,String value) {
        threadLocalContext.get().put(key, value);
    }

	public static String get(String key) {
		return threadLocalContext.get().get(key);
    }
}

Пояснения к коду:

  1. Создаем ThreadLocal, в который помещаем пустую HashMap, в нашем случае достаточно иметь в качестве значения строку, но обычно используется Object, чтобы хранить любые объекты

  2. Создаем методы put и get, для более удобного использования контекста

Создадим web фильтр, в котором будем имитировать парсинг JWT:

@Component
public class ParseJwtThreadLocalFilterextends HttpFilter {

    @Override
	protected void doFilter(HttpServletRequest request,
                            HttpServletResponse response,
                            FilterChain chain) throws IOException, ServletException {
		String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
        JwtParams jwtParams = parseJwt(authorizationHeader);

        ThreadContextHolder.put("username", jwtParams.getUsername());

        chain.doFilter(request, response);
    }

	private JwtParams parseJwt(String authorizationHeader) {
        // здесь должен быть парсинг JWT
    }
}

Пояснения к коду:

  1. Создаем бин - наследник HttpFilter, который будет вызываться на каждый входящий HTTP запрос

  2. Парсим jwt из заголовка Authorization

  3. Кладем в контекст имя пользователя

  4. Вызываем следующий фильтр по цепочке

Теперь мы можем извлекать данные из контекста в любом месте, где используется данный поток:

@Slf4j
@RestController
@RequestMapping("/api/v1/thread-context")
public class ThreadContextController {

    @GetMapping
	public String getUsername() {
		return "Имя пользователя из контекста потока: " + ThreadContextHolder.get("username");
    }
}

Пояснения к коду:

  1. В методе контроллера обращаемся к контексту - поскольку наш HttpFilter будет вызываться в том же потоке что и метод контроллера, мы можем извлечь нужные данные из контекста

Чтобы не писать свою обертку над ThreadLocal можно воспользоваться готовым решением - MDC контекстом

MDC контекст

Данный класс поставляется вместе с библиотеками логгирования, в Spring по умолчанию это logback

Перепишем предыдущее решение с использованием MDC

HttpFilter:
@Component
public class ParseJwtMdcFilterextends HttpFilter {

    @Override
	protected void doFilter(HttpServletRequest request,
                            HttpServletResponse response,
                            FilterChain chain)throws IOException, ServletException {
		String authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION);
        JwtParams jwtParams = parseJwt(authorizationHeader);

        MDC.put("username", jwtParams.getUsername());

        chain.doFilter(request, response);
    }

	private JwtParams parseJwt(String authorizationHeader) {
        // здесь должен быть парсинг JWT
    }
}

Контроллер:
@Slf4j
@RestController
@RequestMapping("/api/v1/thread-context")
@RequiredArgsConstructor
public class MDCExampleController {

    @GetMapping
	public String getUsername() {
		return "Имя пользователя из контекста потока: " + MDC.get("username");
    }
}

Поведение будет полностью идентичным предыдущему примеру

Отличительной чертой MDC контекста является то, что его поля доступны при записи логов. Благодаря нему к логам добавляются разные параметры, например уникальный идентификатор входящего запроса для последующего поиска логов, принадлежащих одному запросу

В продемонстрированных примерах с ThreadLocal и MDC есть важный аспект - потоки томката переиспользуются для выполнения запросов разных клиентов (после выполнения запроса клиента, в этом же потоке начинает выполняться следующий запрос, т.е. типичное поведение ExecutorService’а), из-за чего в ThreadLocal контексте может оставаться информация из предыдущих запросов

Поэтому такой контекст необходимо очищать в момент завершения бизнес логики, либо перед началом его использования

Подход с контекстом в рамках одного потока очень удобен для традиционного синхронного приложения, где обработка каждого запроса клиента происходит в своем потоке. И не подходит для реактивных приложений на webflux, поскольку там разные потоки могут выполнять разные части одного запроса

Контекст, доступный из разных потоков

Возможные варианты реализации контекста:

  • volatile или Atomic переменная

  • Конкурентная коллекция - ConcurrentHashMap

  • Готовые реализации - Caffeine, Spring Cache

Здесь покажу только пример с ручным созданием кеша Caffeine

Работа с конкурентными коллекциями и volatile/Atomic переменными очень проста и почти не отличается от взаимодействия с обычной коллекцией или переменной

А если расписывать применение Spring Cache то статья начинает сильно уходить от основной темы

Кеш с использованием Caffeine

Импортируем библиотеку:

implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'

Решение:

@Slf4j
@Service
public class CaffeineExampleService {

	private final Cache<String,String> cache = Caffeine.newBuilder()
        .expireAfterAccess(Duration.ofMinutes(1))
        .maximumSize(100)
        .build();

	public String process(String request) {
		String cachedResult = cache.getIfPresent(request);
		if (cachedResult != null) {
            log.info("Возвращаю закешированный результат");

			return cachedResult;
        }

        MentoringUtil.sleep(1000);

		String response = "test response";
        cache.put(request, response);

		return response;
    }
}

Пояснения к коду:

  1. Создаем кеш (данный интерфейс похож на Map), выставляем время жизни и максимальное количество записей в кеше

  2. Реализуем простой метод с кешированием результата выполнения

Благодаря возможности выставлять время жизни и размер кеша, мы предотвращаем возможные утечки памяти

Ожидание асинхронного ответа

Иногда встречается необходимость в ожидании асинхронного ответа от внешнего сервиса, например мы отправляем запрос в один топик kafka, и хотим дождаться ответа из другого топика (прослушивание топика происходит в отдельном потоке)

В качестве решения нам потребуется хранить в контексте callback для каждого отправленного сообщения

Также есть важное условие, мы будем отправлять в запросе уникальный идентификатор, и этот же идентификатор должен быть передан внешним сервисом в ответе, чтобы мы могли сопоставить запрос и ответ

Создадим класс с контекстом:

@Service
public class KafkaMessageContext {

    private final Cache<String, CompletableFuture<KafkaResponse>> messageContext = Caffeine.newBuilder()
        .expireAfterAccess(Duration.ofMinutes(1))
        .maximumSize(1000)
        .build();

    public CompletableFuture<KafkaResponse> createMessageCompletableFuture(String id) {
        CompletableFuture<KafkaResponse> completableFuture = new CompletableFuture<>();
        messageContext.put(id, completableFuture);

        return completableFuture;
    }

    public CompletableFuture<KafkaResponse> findById(String id) {
        return messageContext.getIfPresent(id);
    }

    public void removeById(String id) {
        messageContext.invalidate(id);
    }
}

Пояснения к коду:

  1. Создаем кеш, где в качестве ключа будет выступать уникальный идентификатор, а в качестве значения callback в виде CompletableFuture, также указываем время жизни и размер кеша, чтобы предотвратить возможные утечки памяти

  2. Добавляем метод создания callback’а, и добавления его в контекст, callback создаем через new CompletableFuture<>(), который позволит нам подписаться на него для ожидания ответа

  3. Добавляем методы findById и removeById

Реализуем сервис обмена сообщениями с ожиданием ответа:

@Service
@RequiredArgsConstructor
public class AwaitAsyncRequestService {

    private final KafkaProducer kafkaProducer;
    private final KafkaMessageContext kafkaMessageContext;

    public KafkaResponse sendAndReceive(KafkaRequest request) {
        CompletableFuture<KafkaResponse> responseCompletableFuture = kafkaMessageContext.createMessageCompletableFuture(request.getId());

        kafkaProducer.send(request);

        try {
            return responseCompletableFuture
                .get(5000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException | InterruptedException e) {
            throw new CustomAppException(e);
        } finally {
            kafkaMessageContext.removeById(request.getId());
        }
    }
}

Пояснения к коду:

  1. Вызываем метод создания callback’а

  2. Выполняем отправку в kafka

  3. Ожидаем ответ от callback’а в течение 5 секунд

  4. Обрабатываем возможные исключения, удаляем запись из кеша в блоке finally

Создадим класс слушателя ответного топика:

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {

    private final JsonUtil jsonUtil;
    private final KafkaMessageContext kafkaMessageContext;

    @KafkaListener(topics = "${app.kafka.topicOut}", groupId = "test")
    public void receiveResponse(ConsumerRecord<String, String> consumerRecord) {
        KafkaResponse response = jsonUtil.fromJson(consumerRecord.value(), KafkaResponse.class);

        CompletableFuture<KafkaResponse> responseCompletableFuture = kafkaMessageContext.findById(response.getId());

        if (responseCompletableFuture == null) {
            log.info("Получено сообщение в ответный топик для которого не найден callback: {}", response);
        } else {
            responseCompletableFuture.complete(response);
        }
    }
}

Пояснения к коду:

  1. Слушаем исходящий топик

  2. По идентификатору входящего сообщения пытаемся найти callback в контексте

  3. Если callback найден, то вызываем метод complete, с этого момента продолжается выполнение кода в методе sendAndReceive класса AwaitAsyncRequestService

С помощью данного решения мы можем дождаться любого асинхронного ответа

В случае с kafka есть одна особенность, которую надо обязательно учитывать, если у вашего приложения будет больше одного экземпляра, то необходимо чтобы каждый экземпляр приложения получал все сообщения из ответного топика, поскольку есть вероятность, что ответ будет прочитан другим экземпляром и тогда ваш ответ будет потерян

Итог

В данной статье я постарался показать практические примеры, с которыми сталкивался в работе, не касаясь низкоуровневых инструментов volatile, synchronized, барьеров и др.

Представленные паттерны являются лишь кирпичиками, из которых складывается общая работа приложения

Правильное их применение требует внимательности и тщательного тестирования, обдумывайте и проверяйте возможные корнер кейсы, особенно если ваше приложение работает с повышенными нагрузками

Надеюсь вам неоднократно пригодится материал данной статьи на реальном проекте, спасибо что дочитали

Комментарии (0)