API structured concurrency в Java наконец-то обрёл устойчивость. В новом переводе от команды Spring АйО подробно рассматриваются последние изменения, появившиеся с выходом JEP 505: фабричный метод open(), политики Joiner'ов, улучшенная отмена задач, дедлайны, передача контекста через ScopedValues и строгая защита от ошибок использования. Всё это делает параллельное программирование в Java более безопасным, читаемым и управляемым.


API structured concurrency вновь изменился — после двух инкубационных этапов и четырёх раундов превью.

В идеале такой сценарий кажется неожиданным.

Однако, учитывая статус API как предварительного, подобные изменения возможны — как и произошло в данном случае.

Эти изменения значительно повлияли на зрелость API, и есть надежда, что теперь он стабилизируется и больше не потребует доработок.

Что на самом деле изменилось на этот раз

Когда я впервые начал работать со structured concurrency ещё в инкубационной фазе, меня вдохновила идея более чистого и понятного concurrent кода.

Идея была проста: рассматривать параллельные задачи как структурированный блок, в котором все порождённые (spawned) задачи завершаются до выхода из блока. В теории звучало идеально, но API продолжал меняться, и за этими изменениями было сложно уследить.

Последняя итерация в JEP 505 привнесла важные усовершенствования, которые, на мой взгляд, наконец-то сделали этот функционал устойчивым. Самое заметное изменение — это введение более гибкой обработки задач и лучшая интеграция с виртуальными потоками. В этой статье я расскажу о различиях и поясню, почему они важны.

Основная концепция остаётся неизменной

Прежде чем перейти к изменениям, давайте вспомним, какую проблему пытается решить structured concurrency. В традиционном конкурентном программировании управление тасками часто оказывается разрозненным:

import java.util.Random;
import java.util.concurrent.*;

public class TraditionalConcurrencyExample {
  private static final Random random = new Random();

  private static String fetchUserData(String userId) throws InterruptedException {
    Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds
    if (random.nextBoolean()) {
      throw new RuntimeException("User service unavailable");
    }
    return "UserData[" + userId + "]";
  }

  private static String fetchUserPreferences(String userId) throws InterruptedException {
    Thread.sleep(800 + random.nextInt(1500)); // 0.8-2.3 seconds
    if (random.nextBoolean()) {
      throw new RuntimeException("Preferences service down");
    }
    return "Preferences[" + userId + "]";
  }

  private static String combineUserInfo(String userData, String preferences) {
    return userData + " + " + preferences;
  }

  public static String getUserInfoTraditional(String userId) throws Exception {
    try (ExecutorService executor = Executors.newCachedThreadPool()) {
      Future<String> future1 = executor.submit(() -> fetchUserData(userId));
      Future<String> future2 = executor.submit(() -> fetchUserPreferences(userId));

      try {
        String userData = future1.get();
        String preferences = future2.get();
        return combineUserInfo(userData, preferences);
      } catch (Exception e) {
        // Cleanup is messy - what about the other task?
        System.out.println("Error occurred, attempting cleanup...");
        future1.cancel(true);
        future2.cancel(true);
        throw e;
      }
    }
  }

  void main() {
    for (int i = 0; i < 5; i++) {
      try {
        System.out.println("Attempt " + (i + 1) + ": " +
            getUserInfoTraditional("user123"));
      } catch (Exception e) {
        System.out.println("Attempt " + (i + 1) + " failed: " +
            e.getMessage());
      }
      System.out.println();
    }
  }
}

Когда вы запускаете этот код, обычно проявляется несколько проблем:

  • Сложная обработка ошибок: если одна из задач завершается с ошибкой, приходится вручную отменять вторую. В противном случае она продолжит выполняться, несмотря на то что больше не нужна, что приводит к утечке ресурсов.

  • Управление жизненным циклом потоков: вы сами несёте ответственность за полный жизненный цикл потоков.

  • Передача исключений: проверяемые (checked) исключения часто оборачиваются не самым правильным образом.

  • Отсутствие гарантии очистки: если основной поток завершается неожиданно, задачи могут продолжить выполняться.

Structured concurrency призвана решить эти проблемы.

Главное изменение: статические фабричные методы

Наиболее заметное новшество в JEP 505 — больше не нужно вызывать new StructuredTaskScope<>(). Вместо этого используется метод open():

try (var scope = StructuredTaskScope.open()) {
  // ...
}

Метод open() без аргументов возвращает область (scope), которая ожидает успешного завершения всех подзадач либо сбоя хотя бы одной — это политика по умолчанию «все-или-ошибка» (“all-or-fail”). Если требуется более гибкое поведение, используйте перегруженный вариант open(joiner) и передайте пользовательскую политику завершения через Joiner (об этом чуть позже). Почему используется фабрика? Она предоставляет вменяемые значения по умолчанию и, что особенно важно, допускает развитие имплементации без нарушения совместимости с вашим кодом. Я считаю это изменение удачным: использование одного ключевого метода делает код более лаконичным и снижает вероятность ошибок.

Теперь перепишем предыдущий пример с использованием нового API:

public static String getUserInfoTraditional(String userId) throws Exception {
  try (var scope = StructuredTaskScope.open()) {
    StructuredTaskScope.Subtask<String> task1 = scope.fork(() -> fetchUserData(userId));
    StructuredTaskScope.Subtask<String> task2 = scope.fork(() -> fetchUserPreferences(userId));

    scope.join();

    String userData = task1.get();
    String preferences = task2.get();

    return combineUserInfo(userData, preferences);
  }
}

Разница колоссальна. С использованием structured concurrency очистка выполняется автоматически и гарантированно. Если какая-либо из задач завершается с ошибкой, все остальные задачи в области отменяются. Если область завершается (нормально или с исключением), все ресурсы освобождаются. Это сопоставимо с механизмом try-with-resources — но для параллельных задач.

Такой подход даёт несколько очевидных преимуществ, которые я особенно ценю:

  • Гарантированная очистка: задачи не могут пережить свою область.

  • Явное владение: задачи принадлежат конкретной области.

  • Безопасность при исключениях: сбои обрабатываются последовательно.

  • Управление ресурсами: управление пулами потоков не требуется.

  • Композиционность: scope-ы/области можно складывать и комбинировать.

Joiner'ы: задаём политику успеха

Joiner перехватывает события завершения задач и определяет:

  1. следует ли отменять одноуровневые (соседние) задачи,

  2. что должен возвращать метод join().

JDK включает несколько фабричных помощников для таких случаев:

«Побеждает первая» (также известен как гонка реплик).

try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
    urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
    return scope.join();             // returns first successful String
}

«Все должны завершиться успешно, и мне нужны их результаты»

try (var scope = StructuredTaskScope.open(Joiner.<Result>allSuccessfulOrThrow())) {
    tasks.forEach(scope::fork);
    return scope.join()              // Stream<Subtask<Result>>
                 .map(Subtask::get)
                 .toList();
}

Эти небольшие помощники упрощают распространённые шаблоны — «гонка» (“race”), «сбор» (“gather”), «ожидание всех» (“wait-for-all”).

Создание собственного Joiner'а

Иногда требуется своя политика. Предположим, я хочу собрать все успешные подзадачи, проигнорировав сбои:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

void main() {

  List<String> urls = List.of("https://bazlur.ca", "https://foojay.io", "https://github.com");

  try (var scope = StructuredTaskScope.open(new MyCollectingJoiner<String>())) {
    urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
    List<String> fetchedContent = scope.join().toList();

    System.out.println("Total fetched content: " + fetchedContent.size());
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }

}

private String fetchFrom(String url) {
  return "fetched from " + url + "";
}

class MyCollectingJoiner<T> implements StructuredTaskScope.Joiner<T, Stream<T>> {
  private final Queue<T> results = new ConcurrentLinkedQueue<>();

  @Override
  public boolean onComplete(StructuredTaskScope.Subtask<? extends T> st) {
    if (st.state() == StructuredTaskScope.Subtask.State.SUCCESS)
      results.add(st.get());
    return false;
  }

  @Override
  public Stream<T> result() {
    return results.stream();
  }
}

Интерфейс минимален — onFork, onComplete и result() — но при этом достаточно мощный для большинства пользовательских сценариев. Чтобы запустить этот код, потребуется JDK 25, и его можно выполнить из командной строки с помощью следующей команды:

java --enable-preview CollectingJoiner.java
Комментарий от эксперта команды Spring АйО, Михаила Поливахи

Java 25 GA на момент публикации статьи ещё официально не вышла. Выход 25-ой Java планируется на Сентябрь 2025. Пока есть только early access билды

Улучшенная отмена и дедлайны

Правила отмены по сути не изменились, но API стал строже. Если поток-владелец прерывается до или во время вызова join(), область автоматически отменяет все незавершённые подзадачи. Подзадачи должны корректно обрабатывать InterruptedException; в противном случае close() будет блокироваться в ожидании их завершения. (Если вы используете блокирующий ввод-вывод — всё в порядке; если используете polling — не забудьте проверять Thread.currentThread().isInterrupted().)

Нужен дедлайн? Передайте конфигурационную лямбду:

try (var scope = StructuredTaskScope.open(
         Joiner.<String>anySuccessfulResultOrThrow(),
         cfg -> cfg.withTimeout(Duration.ofSeconds(2)))) {
    // ...
}

Если срабатывает таймаут, scope отменяется, а join() выбрасывает TimeoutException. На практике я привязываю таймаут к каждому внешнему вызову, чтобы держать неуправляемые задачи под контролем.

Также можно заменить фабрику виртуальных потоков по умолчанию на ту, которая задаёт имена или устанавливает thread-local переменные:

ThreadFactory tagged = Thread.ofVirtual().name("api-%d").factory();

try (var scope = StructuredTaskScope.open(
         Joiner.<Integer>allSuccessfulOrThrow(),
         cfg -> cfg.withThreadFactory(tagged))) {
    // ...
}

Одни только имена потоков делают дампы значительно читаемее.

ScopedValues передаются автоматически

Все подзадачи наследуют привязки ScopedValues, установленные в родительском потоке. Это означает, что можно передавать контекст запроса, учетные данные безопасности или информацию MDC без необходимости вручную упаковывать их в каждую лямбду. Опробовав эту возможность, к ThreadLocal возвращаться уже не хочется.

Комментарий от команды Spring АйО

Про Scoped Values мы как-то уже рассказывали. Вот первая и вторая части.

Защита от неправильного использования

StructuredTaskScope строго соблюдает структуру. Если вызвать fork() из потока, отличного от владельца, будет выброшено исключение StructureViolationException. Забыли использовать try-with-resources и позволили scope выйти за пределы метода? Результат тот же. Подход строгий, но эффективно предотвращает случайное истощение ресурсов (по аналогии с «fork-бомбами»).

Комментарий от Михаила Поливахи

Речь про fork-bomb уязвимость, суть которой заключается в том, что вредоносный процесс fork`ается бесконтрольно, потребляя при этом все ресурсы сервера. https://en.wikipedia.org/wiki/Fork_bomb

Улучшения в Observability

Теперь thread дампы включают дерево скоупов, так что инструменты могут напрямую отображать отношения родитель–потомок. Когда я запускаю jcmd <pid> Thread.dump_to_file -format=json, каждый scope отображается со своими forked потоками, вложенными под владельцем. Найти зависшую задачу, блокирующую виртуальный пул, теперь занимает пару секунд с grep — вместо получасового разбора.

Ещё несколько примеров для практики

Пример 1 — 360°-обзор продукта (сбор–затем–ошибка)

Классический endpoint в e-commerce: один HTTP-запрос должен агрегировать основную информацию о продукте, данные о наличии в реальном времени и персонализированную цену. Каждый подсервис вызывается параллельно внутри StructuredTaskScope, где применяется политика «всё или ничего»: любая ошибка или превышение односекундного дедлайна отменяет всю группу и возвращает ошибку вызывающему.

Таймаут области, пользовательские имена потоков и joiner allSuccessfulOrThrow() позволяют выразить то, что обычно требует сложной конфигурации CompletableFuture, всего в трёх декларативных строках.

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ThreadFactory;

public class ThreeSixtyProductView {
  record Product(long id, String name) {}
  record Stock(long productId, int quantity) {}
  record Price(long productId, double amount) {}
  record ProductPayload(Product core, Stock stock, Price price) {}

  private static Product coreApi(long id) throws InterruptedException {
    Thread.sleep(100); // simulate latency
    return new Product(id, "Gadget‑" + id);
  }

  private static Stock stockApi(long id) throws InterruptedException {
    Thread.sleep(120);
    return new Stock(id, new Random().nextInt(100));
  }

  private static Price priceApi(long id) throws InterruptedException {
    Thread.sleep(150);
    return new Price(id, 99.99);
  }

  static ProductPayload fetchProduct(long id) throws Exception {
    ThreadFactory named = Thread.ofVirtual().name("prod-%d", 1).factory();

    try (var scope = StructuredTaskScope.open(
        StructuredTaskScope.Joiner.<Object>allSuccessfulOrThrow(),
        cfg -> cfg.withTimeout(Duration.ofSeconds(1))
            .withThreadFactory(named))) {

      StructuredTaskScope.Subtask<Product> core = scope.fork(() -> coreApi(id));
      StructuredTaskScope.Subtask<Stock> stock = scope.fork(() -> stockApi(id));
      StructuredTaskScope.Subtask<Price> price = scope.fork(() -> priceApi(id));

      scope.join(); // throws on first failure / timeout
      return new ProductPayload(core.get(), stock.get(), price.get());
    }
  }

  void main() throws Exception {
    ProductPayload productPayload = fetchProduct(1L);
    System.out.println(productPayload);
  }
}

Пример 2 — «Гонка зеркал» для загрузки файлов

Крупные бинарные файлы размещаются на нескольких зеркалах CDN. Поскольку задержки различаются, запросы отправляются на все зеркала одновременно, а Joiner.anySuccessfulResultOrThrow() используется для выбора первого успешно возвращённого InputStream, при этом все остальные задачи отменяются.

Пропускная способность и соединения освобождаются мгновенно, а пользователи получают максимально быструю загрузку — без необходимости вручную реализовывать логику отмены.

import java.io.*;
import java.net.URI;
import java.nio.file.*;
import java.util.List;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;

public class MirrorDownloaderDemo {
  void main() throws Exception {
    List<URI> mirrors = List.of(
        URI.create("https://mirror‑a.example.com"),
        URI.create("https://mirror‑b.example.com"),
        URI.create("https://mirror‑c.example.com"));

    Path target = Files.createFile(Path.of("download1.txt"));
    download(target, mirrors);
    System.out.println("Saved to " + target.toAbsolutePath());
  }

  static Path download(Path target, List<URI> mirrors) throws Exception {
    try (var scope = StructuredTaskScope.open(
        StructuredTaskScope.Joiner.<InputStream>anySuccessfulResultOrThrow())) {

      mirrors.forEach(uri -> scope.fork(() -> fetchFromMirror(uri)));
      try (InputStream in = scope.join()) {
        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
      }
      return target;
    }
  }

  private static InputStream fetchFromMirror(URI uri) throws InterruptedException {
    Thread.sleep(50 + new Random().nextInt(300));
    String data = "Downloaded from " + uri + "\n";
    return new ByteArrayInputStream(data.getBytes());
  }
}

Пример 3 — Пакетный генератор миниатюр с вложенными областями

Этап медиапайплайна получает каталог изображений. Внешний скоуп перебирает файлы, а внутренний — для каждого изображения — распараллеливает внутри ещё на три задачи ресайза (маленький, средний и большой размеры). Внутренний скоуп работает по принципу «быстрый отказ»: если один из ресайзов завершается с ошибкой, изображение пропускается, но внешний скуоп и его батчёвая обработка продолжается без прерывания.

Вложенные скоупы позволяют разделить консистентность на уровне отдельного элемента и общую производительность партии — при минимуме кода.

import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.StructuredTaskScope;

public class ThumbnailBatchDemo {
  enum Size {SMALL, MEDIUM, LARGE}

  void main() throws Exception {
    Path tmpDir = Files.createTempDirectory("images");
    for (int i = 0; i < 3; i++) Files.createTempFile(tmpDir, "img" + i, ".jpg");
    processBatch(tmpDir);
  }

  static void processBatch(Path dir) throws IOException, InterruptedException {
    try (var batch = StructuredTaskScope.open()) {
      try (var files = Files.list(dir)) {
        files.filter(Files::isRegularFile)
            .forEach(img -> batch.fork(() -> handleOne(img)));
      }
      batch.join();
    }
  }

  private static void handleOne(Path image) {
    try (var scope = StructuredTaskScope.open(
        StructuredTaskScope.Joiner.<Void>allSuccessfulOrThrow())) {
      scope.fork(() -> resizeAndUpload(image, Size.SMALL));
      scope.fork(() -> resizeAndUpload(image, Size.MEDIUM));
      scope.fork(() -> resizeAndUpload(image, Size.LARGE));
      scope.join();
    } catch (Exception ex) {
      System.err.println("Skipping " + image.getFileName() + ": " + ex);
    }
  }

  private static Void resizeAndUpload(Path image, Size size) throws InterruptedException {
    Thread.sleep(80); // simulate resize
    Thread.sleep(40); // simulate upload
    System.out.println("Uploaded " + image.getFileName() + " [" + size + "]");
    return null;
  }

Пример 4 — Служба котировок в реальном времени с резервным механизмом по таймауту

Трейдинговый UI требует котировку не позднее чем через 30 мс. Пользовательский joiner захватывает первую успешную цену из основного рыночного источника, с таймаутом области в 30 мс. Если источник зависает, scope.join() возвращает пустой результат, и сервис мгновенно переключается на вчерашнюю кэшированную цену закрытия.

Вызывающие стороны всегда получают значение вовремя, а вся логика обработки таймаута выражена одной декларативной строкой.

import java.time.Duration;
import java.util.*;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class QuoteServiceDemo {
  void main() throws Exception {
    double q = quote("ACME");
    System.out.printf("Quote for ACME: %.2f%n", q);
  }

  static double quote(String symbol) throws InterruptedException {
    var firstSuccess = new StructuredTaskScope.Joiner<Double, Optional<Double>>() {
      private volatile Double value;

      public boolean onComplete(Subtask<? extends Double> st) {
        if (st.state() == Subtask.State.SUCCESS) value = st.get();
        return value != null;           // stop when we have one
      }

      public Optional<Double> result() {
        return Optional.ofNullable(value);
      }
    };

    try (var scope = StructuredTaskScope.open(firstSuccess,
        cfg -> cfg.withTimeout(Duration.ofMillis(30)))) {
      scope.fork(() -> marketFeed(symbol));
      Optional<Double> latest = scope.join();
      return latest.orElseGet(() -> cache(symbol));
    }
  }

  private static double marketFeed(String symbol) throws InterruptedException {
    long delay = new Random().nextBoolean() ? 20 : 60; // 50 % chance timeout
    Thread.sleep(delay);
    return 100 + new Random().nextDouble();
  }

  //for demo purposes only
  private static double cache(String symbol) {
    return 95.00;
  }
}

Заключительные мысли

Эти изменения представляют значительный этап в развитии API structured concurrency.

Сегодняшняя версия API structured concurrency значительно лучше той, с которой всё начиналось, и я уверен, что она станет прочной основой для многопоточного программирования на Java в будущем.


Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано

Комментарии (4)


  1. medvedmike
    25.07.2025 04:18

    Насчёт MDC там не так все радужно, на самом деле. API ScopedValue подразумевает, что хранимое значение неизменяемо и может быть только переопределено во вложенных скоупах.

    А MDC внутри содержит HashMap, соответственно там между сабтасками может быть неразбериха (если они пишут в MDC).

    Я как-то экспериментировал и писал адаптеры для MDC и SpringSecurity для поддержки StructuredScope и ScopedValues (чтобы код оставался таким же простым как в примерах из JEP) и (не знаю как сейчас, может за год изменилось что) там все равно нужно было писать обёртку, чтобы каждая подзадача получила свою копию MDC, либо гарантировать что в подзадаче никогда не будет вызываться MDC.put, а только та его версия, что использует стек внутри.


    1. Artyomcool
      25.07.2025 04:18

      Со временем все адаптируются, а пока да, будет большое количество краевых случаев.


  1. ris58h
    25.07.2025 04:18

    Выглядит интересно. Хотелось бы, чтобы API был бы более foolproof. Например, пользователь может вызвать fork после join или не вызвать join вообще - в этих случаях будет ошибка. Да - сам виноват, но может быть можно было бы защититься и от этого.


    1. Artyomcool
      25.07.2025 04:18

      По моим ощущениям, баланс сейчас идеален. Ещё более foolproof реализовывать разумно на стороне фреймворков и библиотек.