API structured concurrency в Java наконец-то обрёл устойчивость. В новом переводе от команды Spring АйО подробно рассматриваются последние изменения, появившиеся с выходом JEP 505: фабричный метод open(), политики Joiner'ов, улучшенная отмена задач, дедлайны, передача контекста через ScopedValues и строгая защита от ошибок использования. Всё это делает параллельное программирование в Java более безопасным, читаемым и управляемым.
API structured concurrency вновь изменился — после двух инкубационных этапов и четырёх раундов превью.
В идеале такой сценарий кажется неожиданным.
Однако, учитывая статус API как предварительного, подобные изменения возможны — как и произошло в данном случае.
Эти изменения значительно повлияли на зрелость API, и есть надежда, что теперь он стабилизируется и больше не потребует доработок.
Что на самом деле изменилось на этот раз
Когда я впервые начал работать со structured concurrency ещё в инкубационной фазе, меня вдохновила идея более чистого и понятного concurrent кода.
Идея была проста: рассматривать параллельные задачи как структурированный блок, в котором все порождённые (spawned) задачи завершаются до выхода из блока. В теории звучало идеально, но API продолжал меняться, и за этими изменениями было сложно уследить.
Последняя итерация в JEP 505 привнесла важные усовершенствования, которые, на мой взгляд, наконец-то сделали этот функционал устойчивым. Самое заметное изменение — это введение более гибкой обработки задач и лучшая интеграция с виртуальными потоками. В этой статье я расскажу о различиях и поясню, почему они важны.
Основная концепция остаётся неизменной
Прежде чем перейти к изменениям, давайте вспомним, какую проблему пытается решить structured concurrency. В традиционном конкурентном программировании управление тасками часто оказывается разрозненным:
import java.util.Random;
import java.util.concurrent.*;
public class TraditionalConcurrencyExample {
private static final Random random = new Random();
private static String fetchUserData(String userId) throws InterruptedException {
Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds
if (random.nextBoolean()) {
throw new RuntimeException("User service unavailable");
}
return "UserData[" + userId + "]";
}
private static String fetchUserPreferences(String userId) throws InterruptedException {
Thread.sleep(800 + random.nextInt(1500)); // 0.8-2.3 seconds
if (random.nextBoolean()) {
throw new RuntimeException("Preferences service down");
}
return "Preferences[" + userId + "]";
}
private static String combineUserInfo(String userData, String preferences) {
return userData + " + " + preferences;
}
public static String getUserInfoTraditional(String userId) throws Exception {
try (ExecutorService executor = Executors.newCachedThreadPool()) {
Future<String> future1 = executor.submit(() -> fetchUserData(userId));
Future<String> future2 = executor.submit(() -> fetchUserPreferences(userId));
try {
String userData = future1.get();
String preferences = future2.get();
return combineUserInfo(userData, preferences);
} catch (Exception e) {
// Cleanup is messy - what about the other task?
System.out.println("Error occurred, attempting cleanup...");
future1.cancel(true);
future2.cancel(true);
throw e;
}
}
}
void main() {
for (int i = 0; i < 5; i++) {
try {
System.out.println("Attempt " + (i + 1) + ": " +
getUserInfoTraditional("user123"));
} catch (Exception e) {
System.out.println("Attempt " + (i + 1) + " failed: " +
e.getMessage());
}
System.out.println();
}
}
}
Когда вы запускаете этот код, обычно проявляется несколько проблем:
Сложная обработка ошибок: если одна из задач завершается с ошибкой, приходится вручную отменять вторую. В противном случае она продолжит выполняться, несмотря на то что больше не нужна, что приводит к утечке ресурсов.
Управление жизненным циклом потоков: вы сами несёте ответственность за полный жизненный цикл потоков.
Передача исключений: проверяемые (checked) исключения часто оборачиваются не самым правильным образом.
Отсутствие гарантии очистки: если основной поток завершается неожиданно, задачи могут продолжить выполняться.
Structured concurrency призвана решить эти проблемы.
Главное изменение: статические фабричные методы
Наиболее заметное новшество в JEP 505 — больше не нужно вызывать new StructuredTaskScope<>(). Вместо этого используется метод open():
try (var scope = StructuredTaskScope.open()) {
// ...
}
Метод open() без аргументов возвращает область (scope), которая ожидает успешного завершения всех подзадач либо сбоя хотя бы одной — это политика по умолчанию «все-или-ошибка» (“all-or-fail”). Если требуется более гибкое поведение, используйте перегруженный вариант open(joiner) и передайте пользовательскую политику завершения через Joiner (об этом чуть позже). Почему используется фабрика? Она предоставляет вменяемые значения по умолчанию и, что особенно важно, допускает развитие имплементации без нарушения совместимости с вашим кодом. Я считаю это изменение удачным: использование одного ключевого метода делает код более лаконичным и снижает вероятность ошибок.
Теперь перепишем предыдущий пример с использованием нового API:
public static String getUserInfoTraditional(String userId) throws Exception {
try (var scope = StructuredTaskScope.open()) {
StructuredTaskScope.Subtask<String> task1 = scope.fork(() -> fetchUserData(userId));
StructuredTaskScope.Subtask<String> task2 = scope.fork(() -> fetchUserPreferences(userId));
scope.join();
String userData = task1.get();
String preferences = task2.get();
return combineUserInfo(userData, preferences);
}
}
Разница колоссальна. С использованием structured concurrency очистка выполняется автоматически и гарантированно. Если какая-либо из задач завершается с ошибкой, все остальные задачи в области отменяются. Если область завершается (нормально или с исключением), все ресурсы освобождаются. Это сопоставимо с механизмом try-with-resources — но для параллельных задач.
Такой подход даёт несколько очевидных преимуществ, которые я особенно ценю:
Гарантированная очистка: задачи не могут пережить свою область.
Явное владение: задачи принадлежат конкретной области.
Безопасность при исключениях: сбои обрабатываются последовательно.
Управление ресурсами: управление пулами потоков не требуется.
Композиционность: scope-ы/области можно складывать и комбинировать.
Joiner'ы: задаём политику успеха
Joiner перехватывает события завершения задач и определяет:
следует ли отменять одноуровневые (соседние) задачи,
что должен возвращать метод
join().
JDK включает несколько фабричных помощников для таких случаев:
«Побеждает первая» (также известен как гонка реплик).
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
return scope.join(); // returns first successful String
}
«Все должны завершиться успешно, и мне нужны их результаты»
try (var scope = StructuredTaskScope.open(Joiner.<Result>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join() // Stream<Subtask<Result>>
.map(Subtask::get)
.toList();
}
Эти небольшие помощники упрощают распространённые шаблоны — «гонка» (“race”), «сбор» (“gather”), «ожидание всех» (“wait-for-all”).
Создание собственного Joiner'а
Иногда требуется своя политика. Предположим, я хочу собрать все успешные подзадачи, проигнорировав сбои:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;
void main() {
List<String> urls = List.of("https://bazlur.ca", "https://foojay.io", "https://github.com");
try (var scope = StructuredTaskScope.open(new MyCollectingJoiner<String>())) {
urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
List<String> fetchedContent = scope.join().toList();
System.out.println("Total fetched content: " + fetchedContent.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private String fetchFrom(String url) {
return "fetched from " + url + "";
}
class MyCollectingJoiner<T> implements StructuredTaskScope.Joiner<T, Stream<T>> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();
@Override
public boolean onComplete(StructuredTaskScope.Subtask<? extends T> st) {
if (st.state() == StructuredTaskScope.Subtask.State.SUCCESS)
results.add(st.get());
return false;
}
@Override
public Stream<T> result() {
return results.stream();
}
}
Интерфейс минимален — onFork, onComplete и result() — но при этом достаточно мощный для большинства пользовательских сценариев. Чтобы запустить этот код, потребуется JDK 25, и его можно выполнить из командной строки с помощью следующей команды:
java --enable-preview CollectingJoiner.java
Комментарий от эксперта команды Spring АйО, Михаила Поливахи
Java 25 GA на момент публикации статьи ещё официально не вышла. Выход 25-ой Java планируется на Сентябрь 2025. Пока есть только early access билды
Улучшенная отмена и дедлайны
Правила отмены по сути не изменились, но API стал строже. Если поток-владелец прерывается до или во время вызова join(), область автоматически отменяет все незавершённые подзадачи. Подзадачи должны корректно обрабатывать InterruptedException; в противном случае close() будет блокироваться в ожидании их завершения. (Если вы используете блокирующий ввод-вывод — всё в порядке; если используете polling — не забудьте проверять Thread.currentThread().isInterrupted().)
Нужен дедлайн? Передайте конфигурационную лямбду:
try (var scope = StructuredTaskScope.open(
Joiner.<String>anySuccessfulResultOrThrow(),
cfg -> cfg.withTimeout(Duration.ofSeconds(2)))) {
// ...
}
Если срабатывает таймаут, scope отменяется, а join() выбрасывает TimeoutException. На практике я привязываю таймаут к каждому внешнему вызову, чтобы держать неуправляемые задачи под контролем.
Также можно заменить фабрику виртуальных потоков по умолчанию на ту, которая задаёт имена или устанавливает thread-local переменные:
ThreadFactory tagged = Thread.ofVirtual().name("api-%d").factory();
try (var scope = StructuredTaskScope.open(
Joiner.<Integer>allSuccessfulOrThrow(),
cfg -> cfg.withThreadFactory(tagged))) {
// ...
}
Одни только имена потоков делают дампы значительно читаемее.
ScopedValues передаются автоматически
Все подзадачи наследуют привязки ScopedValues, установленные в родительском потоке. Это означает, что можно передавать контекст запроса, учетные данные безопасности или информацию MDC без необходимости вручную упаковывать их в каждую лямбду. Опробовав эту возможность, к ThreadLocal возвращаться уже не хочется.
Комментарий от команды Spring АйО
Защита от неправильного использования
StructuredTaskScope строго соблюдает структуру. Если вызвать fork() из потока, отличного от владельца, будет выброшено исключение StructureViolationException. Забыли использовать try-with-resources и позволили scope выйти за пределы метода? Результат тот же. Подход строгий, но эффективно предотвращает случайное истощение ресурсов (по аналогии с «fork-бомбами»).
Комментарий от Михаила Поливахи
Речь про fork-bomb уязвимость, суть которой заключается в том, что вредоносный процесс fork`ается бесконтрольно, потребляя при этом все ресурсы сервера. https://en.wikipedia.org/wiki/Fork_bomb
Улучшения в Observability
Теперь thread дампы включают дерево скоупов, так что инструменты могут напрямую отображать отношения родитель–потомок. Когда я запускаю jcmd <pid> Thread.dump_to_file -format=json, каждый scope отображается со своими forked потоками, вложенными под владельцем. Найти зависшую задачу, блокирующую виртуальный пул, теперь занимает пару секунд с grep — вместо получасового разбора.
Ещё несколько примеров для практики
Пример 1 — 360°-обзор продукта (сбор–затем–ошибка)
Классический endpoint в e-commerce: один HTTP-запрос должен агрегировать основную информацию о продукте, данные о наличии в реальном времени и персонализированную цену. Каждый подсервис вызывается параллельно внутри StructuredTaskScope, где применяется политика «всё или ничего»: любая ошибка или превышение односекундного дедлайна отменяет всю группу и возвращает ошибку вызывающему.
Таймаут области, пользовательские имена потоков и joiner allSuccessfulOrThrow() позволяют выразить то, что обычно требует сложной конфигурации CompletableFuture, всего в трёх декларативных строках.
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ThreadFactory;
public class ThreeSixtyProductView {
record Product(long id, String name) {}
record Stock(long productId, int quantity) {}
record Price(long productId, double amount) {}
record ProductPayload(Product core, Stock stock, Price price) {}
private static Product coreApi(long id) throws InterruptedException {
Thread.sleep(100); // simulate latency
return new Product(id, "Gadget‑" + id);
}
private static Stock stockApi(long id) throws InterruptedException {
Thread.sleep(120);
return new Stock(id, new Random().nextInt(100));
}
private static Price priceApi(long id) throws InterruptedException {
Thread.sleep(150);
return new Price(id, 99.99);
}
static ProductPayload fetchProduct(long id) throws Exception {
ThreadFactory named = Thread.ofVirtual().name("prod-%d", 1).factory();
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<Object>allSuccessfulOrThrow(),
cfg -> cfg.withTimeout(Duration.ofSeconds(1))
.withThreadFactory(named))) {
StructuredTaskScope.Subtask<Product> core = scope.fork(() -> coreApi(id));
StructuredTaskScope.Subtask<Stock> stock = scope.fork(() -> stockApi(id));
StructuredTaskScope.Subtask<Price> price = scope.fork(() -> priceApi(id));
scope.join(); // throws on first failure / timeout
return new ProductPayload(core.get(), stock.get(), price.get());
}
}
void main() throws Exception {
ProductPayload productPayload = fetchProduct(1L);
System.out.println(productPayload);
}
}
Пример 2 — «Гонка зеркал» для загрузки файлов
Крупные бинарные файлы размещаются на нескольких зеркалах CDN. Поскольку задержки различаются, запросы отправляются на все зеркала одновременно, а Joiner.anySuccessfulResultOrThrow() используется для выбора первого успешно возвращённого InputStream, при этом все остальные задачи отменяются.
Пропускная способность и соединения освобождаются мгновенно, а пользователи получают максимально быструю загрузку — без необходимости вручную реализовывать логику отмены.
import java.io.*;
import java.net.URI;
import java.nio.file.*;
import java.util.List;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;
public class MirrorDownloaderDemo {
void main() throws Exception {
List<URI> mirrors = List.of(
URI.create("https://mirror‑a.example.com"),
URI.create("https://mirror‑b.example.com"),
URI.create("https://mirror‑c.example.com"));
Path target = Files.createFile(Path.of("download1.txt"));
download(target, mirrors);
System.out.println("Saved to " + target.toAbsolutePath());
}
static Path download(Path target, List<URI> mirrors) throws Exception {
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<InputStream>anySuccessfulResultOrThrow())) {
mirrors.forEach(uri -> scope.fork(() -> fetchFromMirror(uri)));
try (InputStream in = scope.join()) {
Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
}
return target;
}
}
private static InputStream fetchFromMirror(URI uri) throws InterruptedException {
Thread.sleep(50 + new Random().nextInt(300));
String data = "Downloaded from " + uri + "\n";
return new ByteArrayInputStream(data.getBytes());
}
}
Пример 3 — Пакетный генератор миниатюр с вложенными областями
Этап медиапайплайна получает каталог изображений. Внешний скоуп перебирает файлы, а внутренний — для каждого изображения — распараллеливает внутри ещё на три задачи ресайза (маленький, средний и большой размеры). Внутренний скоуп работает по принципу «быстрый отказ»: если один из ресайзов завершается с ошибкой, изображение пропускается, но внешний скуоп и его батчёвая обработка продолжается без прерывания.
Вложенные скоупы позволяют разделить консистентность на уровне отдельного элемента и общую производительность партии — при минимуме кода.
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.StructuredTaskScope;
public class ThumbnailBatchDemo {
enum Size {SMALL, MEDIUM, LARGE}
void main() throws Exception {
Path tmpDir = Files.createTempDirectory("images");
for (int i = 0; i < 3; i++) Files.createTempFile(tmpDir, "img" + i, ".jpg");
processBatch(tmpDir);
}
static void processBatch(Path dir) throws IOException, InterruptedException {
try (var batch = StructuredTaskScope.open()) {
try (var files = Files.list(dir)) {
files.filter(Files::isRegularFile)
.forEach(img -> batch.fork(() -> handleOne(img)));
}
batch.join();
}
}
private static void handleOne(Path image) {
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<Void>allSuccessfulOrThrow())) {
scope.fork(() -> resizeAndUpload(image, Size.SMALL));
scope.fork(() -> resizeAndUpload(image, Size.MEDIUM));
scope.fork(() -> resizeAndUpload(image, Size.LARGE));
scope.join();
} catch (Exception ex) {
System.err.println("Skipping " + image.getFileName() + ": " + ex);
}
}
private static Void resizeAndUpload(Path image, Size size) throws InterruptedException {
Thread.sleep(80); // simulate resize
Thread.sleep(40); // simulate upload
System.out.println("Uploaded " + image.getFileName() + " [" + size + "]");
return null;
}
Пример 4 — Служба котировок в реальном времени с резервным механизмом по таймауту
Трейдинговый UI требует котировку не позднее чем через 30 мс. Пользовательский joiner захватывает первую успешную цену из основного рыночного источника, с таймаутом области в 30 мс. Если источник зависает, scope.join() возвращает пустой результат, и сервис мгновенно переключается на вчерашнюю кэшированную цену закрытия.
Вызывающие стороны всегда получают значение вовремя, а вся логика обработки таймаута выражена одной декларативной строкой.
import java.time.Duration;
import java.util.*;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class QuoteServiceDemo {
void main() throws Exception {
double q = quote("ACME");
System.out.printf("Quote for ACME: %.2f%n", q);
}
static double quote(String symbol) throws InterruptedException {
var firstSuccess = new StructuredTaskScope.Joiner<Double, Optional<Double>>() {
private volatile Double value;
public boolean onComplete(Subtask<? extends Double> st) {
if (st.state() == Subtask.State.SUCCESS) value = st.get();
return value != null; // stop when we have one
}
public Optional<Double> result() {
return Optional.ofNullable(value);
}
};
try (var scope = StructuredTaskScope.open(firstSuccess,
cfg -> cfg.withTimeout(Duration.ofMillis(30)))) {
scope.fork(() -> marketFeed(symbol));
Optional<Double> latest = scope.join();
return latest.orElseGet(() -> cache(symbol));
}
}
private static double marketFeed(String symbol) throws InterruptedException {
long delay = new Random().nextBoolean() ? 20 : 60; // 50 % chance timeout
Thread.sleep(delay);
return 100 + new Random().nextDouble();
}
//for demo purposes only
private static double cache(String symbol) {
return 95.00;
}
}
Заключительные мысли
Эти изменения представляют значительный этап в развитии API structured concurrency.
Сегодняшняя версия API structured concurrency значительно лучше той, с которой всё начиналось, и я уверен, что она станет прочной основой для многопоточного программирования на Java в будущем.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано
Комментарии (4)

ris58h
25.07.2025 04:18Выглядит интересно. Хотелось бы, чтобы API был бы более foolproof. Например, пользователь может вызвать
forkпослеjoinили не вызватьjoinвообще - в этих случаях будет ошибка. Да - сам виноват, но может быть можно было бы защититься и от этого.
Artyomcool
25.07.2025 04:18По моим ощущениям, баланс сейчас идеален. Ещё более foolproof реализовывать разумно на стороне фреймворков и библиотек.
medvedmike
Насчёт MDC там не так все радужно, на самом деле. API ScopedValue подразумевает, что хранимое значение неизменяемо и может быть только переопределено во вложенных скоупах.
А MDC внутри содержит HashMap, соответственно там между сабтасками может быть неразбериха (если они пишут в MDC).
Я как-то экспериментировал и писал адаптеры для MDC и SpringSecurity для поддержки StructuredScope и ScopedValues (чтобы код оставался таким же простым как в примерах из JEP) и (не знаю как сейчас, может за год изменилось что) там все равно нужно было писать обёртку, чтобы каждая подзадача получила свою копию MDC, либо гарантировать что в подзадаче никогда не будет вызываться MDC.put, а только та его версия, что использует стек внутри.
Artyomcool
Со временем все адаптируются, а пока да, будет большое количество краевых случаев.