API structured concurrency в Java наконец-то обрёл устойчивость. В новом переводе от команды Spring АйО подробно рассматриваются последние изменения, появившиеся с выходом JEP 505: фабричный метод open()
, политики Joiner'ов, улучшенная отмена задач, дедлайны, передача контекста через ScopedValues
и строгая защита от ошибок использования. Всё это делает параллельное программирование в Java более безопасным, читаемым и управляемым.
API structured concurrency вновь изменился — после двух инкубационных этапов и четырёх раундов превью.
В идеале такой сценарий кажется неожиданным.
Однако, учитывая статус API как предварительного, подобные изменения возможны — как и произошло в данном случае.
Эти изменения значительно повлияли на зрелость API, и есть надежда, что теперь он стабилизируется и больше не потребует доработок.
Что на самом деле изменилось на этот раз
Когда я впервые начал работать со structured concurrency ещё в инкубационной фазе, меня вдохновила идея более чистого и понятного concurrent кода.
Идея была проста: рассматривать параллельные задачи как структурированный блок, в котором все порождённые (spawned) задачи завершаются до выхода из блока. В теории звучало идеально, но API продолжал меняться, и за этими изменениями было сложно уследить.
Последняя итерация в JEP 505 привнесла важные усовершенствования, которые, на мой взгляд, наконец-то сделали этот функционал устойчивым. Самое заметное изменение — это введение более гибкой обработки задач и лучшая интеграция с виртуальными потоками. В этой статье я расскажу о различиях и поясню, почему они важны.
Основная концепция остаётся неизменной
Прежде чем перейти к изменениям, давайте вспомним, какую проблему пытается решить structured concurrency. В традиционном конкурентном программировании управление тасками часто оказывается разрозненным:
import java.util.Random;
import java.util.concurrent.*;
public class TraditionalConcurrencyExample {
private static final Random random = new Random();
private static String fetchUserData(String userId) throws InterruptedException {
Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds
if (random.nextBoolean()) {
throw new RuntimeException("User service unavailable");
}
return "UserData[" + userId + "]";
}
private static String fetchUserPreferences(String userId) throws InterruptedException {
Thread.sleep(800 + random.nextInt(1500)); // 0.8-2.3 seconds
if (random.nextBoolean()) {
throw new RuntimeException("Preferences service down");
}
return "Preferences[" + userId + "]";
}
private static String combineUserInfo(String userData, String preferences) {
return userData + " + " + preferences;
}
public static String getUserInfoTraditional(String userId) throws Exception {
try (ExecutorService executor = Executors.newCachedThreadPool()) {
Future<String> future1 = executor.submit(() -> fetchUserData(userId));
Future<String> future2 = executor.submit(() -> fetchUserPreferences(userId));
try {
String userData = future1.get();
String preferences = future2.get();
return combineUserInfo(userData, preferences);
} catch (Exception e) {
// Cleanup is messy - what about the other task?
System.out.println("Error occurred, attempting cleanup...");
future1.cancel(true);
future2.cancel(true);
throw e;
}
}
}
void main() {
for (int i = 0; i < 5; i++) {
try {
System.out.println("Attempt " + (i + 1) + ": " +
getUserInfoTraditional("user123"));
} catch (Exception e) {
System.out.println("Attempt " + (i + 1) + " failed: " +
e.getMessage());
}
System.out.println();
}
}
}
Когда вы запускаете этот код, обычно проявляется несколько проблем:
Сложная обработка ошибок: если одна из задач завершается с ошибкой, приходится вручную отменять вторую. В противном случае она продолжит выполняться, несмотря на то что больше не нужна, что приводит к утечке ресурсов.
Управление жизненным циклом потоков: вы сами несёте ответственность за полный жизненный цикл потоков.
Передача исключений: проверяемые (checked) исключения часто оборачиваются не самым правильным образом.
Отсутствие гарантии очистки: если основной поток завершается неожиданно, задачи могут продолжить выполняться.
Structured concurrency призвана решить эти проблемы.
Главное изменение: статические фабричные методы
Наиболее заметное новшество в JEP 505 — больше не нужно вызывать new StructuredTaskScope<>()
. Вместо этого используется метод open()
:
try (var scope = StructuredTaskScope.open()) {
// ...
}
Метод open()
без аргументов возвращает область (scope), которая ожидает успешного завершения всех подзадач либо сбоя хотя бы одной — это политика по умолчанию «все-или-ошибка» (“all-or-fail”). Если требуется более гибкое поведение, используйте перегруженный вариант open(joiner) и передайте пользовательскую политику завершения через Joiner (об этом чуть позже). Почему используется фабрика? Она предоставляет вменяемые значения по умолчанию и, что особенно важно, допускает развитие имплементации без нарушения совместимости с вашим кодом. Я считаю это изменение удачным: использование одного ключевого метода делает код более лаконичным и снижает вероятность ошибок.
Теперь перепишем предыдущий пример с использованием нового API:
public static String getUserInfoTraditional(String userId) throws Exception {
try (var scope = StructuredTaskScope.open()) {
StructuredTaskScope.Subtask<String> task1 = scope.fork(() -> fetchUserData(userId));
StructuredTaskScope.Subtask<String> task2 = scope.fork(() -> fetchUserPreferences(userId));
scope.join();
String userData = task1.get();
String preferences = task2.get();
return combineUserInfo(userData, preferences);
}
}
Разница колоссальна. С использованием structured concurrency очистка выполняется автоматически и гарантированно. Если какая-либо из задач завершается с ошибкой, все остальные задачи в области отменяются. Если область завершается (нормально или с исключением), все ресурсы освобождаются. Это сопоставимо с механизмом try-with-resources — но для параллельных задач.
Такой подход даёт несколько очевидных преимуществ, которые я особенно ценю:
Гарантированная очистка: задачи не могут пережить свою область.
Явное владение: задачи принадлежат конкретной области.
Безопасность при исключениях: сбои обрабатываются последовательно.
Управление ресурсами: управление пулами потоков не требуется.
Композиционность: scope-ы/области можно складывать и комбинировать.
Joiner'ы: задаём политику успеха
Joiner перехватывает события завершения задач и определяет:
следует ли отменять одноуровневые (соседние) задачи,
что должен возвращать метод
join()
.
JDK включает несколько фабричных помощников для таких случаев:
«Побеждает первая» (также известен как гонка реплик).
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
return scope.join(); // returns first successful String
}
«Все должны завершиться успешно, и мне нужны их результаты»
try (var scope = StructuredTaskScope.open(Joiner.<Result>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join() // Stream<Subtask<Result>>
.map(Subtask::get)
.toList();
}
Эти небольшие помощники упрощают распространённые шаблоны — «гонка» (“race”), «сбор» (“gather”), «ожидание всех» (“wait-for-all”).
Создание собственного Joiner'а
Иногда требуется своя политика. Предположим, я хочу собрать все успешные подзадачи, проигнорировав сбои:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;
void main() {
List<String> urls = List.of("https://bazlur.ca", "https://foojay.io", "https://github.com");
try (var scope = StructuredTaskScope.open(new MyCollectingJoiner<String>())) {
urls.forEach(url -> scope.fork(() -> fetchFrom(url)));
List<String> fetchedContent = scope.join().toList();
System.out.println("Total fetched content: " + fetchedContent.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private String fetchFrom(String url) {
return "fetched from " + url + "";
}
class MyCollectingJoiner<T> implements StructuredTaskScope.Joiner<T, Stream<T>> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();
@Override
public boolean onComplete(StructuredTaskScope.Subtask<? extends T> st) {
if (st.state() == StructuredTaskScope.Subtask.State.SUCCESS)
results.add(st.get());
return false;
}
@Override
public Stream<T> result() {
return results.stream();
}
}
Интерфейс минимален — onFork
, onComplete
и result()
— но при этом достаточно мощный для большинства пользовательских сценариев. Чтобы запустить этот код, потребуется JDK 25, и его можно выполнить из командной строки с помощью следующей команды:
java --enable-preview CollectingJoiner.java
Комментарий от эксперта команды Spring АйО, Михаила Поливахи
Java 25 GA на момент публикации статьи ещё официально не вышла. Выход 25-ой Java планируется на Сентябрь 2025. Пока есть только early access билды
Улучшенная отмена и дедлайны
Правила отмены по сути не изменились, но API стал строже. Если поток-владелец прерывается до или во время вызова join()
, область автоматически отменяет все незавершённые подзадачи. Подзадачи должны корректно обрабатывать InterruptedException;
в противном случае close()
будет блокироваться в ожидании их завершения. (Если вы используете блокирующий ввод-вывод — всё в порядке; если используете polling — не забудьте проверять Thread.currentThread().isInterrupted().
)
Нужен дедлайн? Передайте конфигурационную лямбду:
try (var scope = StructuredTaskScope.open(
Joiner.<String>anySuccessfulResultOrThrow(),
cfg -> cfg.withTimeout(Duration.ofSeconds(2)))) {
// ...
}
Если срабатывает таймаут, scope отменяется, а join()
выбрасывает TimeoutException
. На практике я привязываю таймаут к каждому внешнему вызову, чтобы держать неуправляемые задачи под контролем.
Также можно заменить фабрику виртуальных потоков по умолчанию на ту, которая задаёт имена или устанавливает thread-local переменные:
ThreadFactory tagged = Thread.ofVirtual().name("api-%d").factory();
try (var scope = StructuredTaskScope.open(
Joiner.<Integer>allSuccessfulOrThrow(),
cfg -> cfg.withThreadFactory(tagged))) {
// ...
}
Одни только имена потоков делают дампы значительно читаемее.
ScopedValues передаются автоматически
Все подзадачи наследуют привязки ScopedValues
, установленные в родительском потоке. Это означает, что можно передавать контекст запроса, учетные данные безопасности или информацию MDC без необходимости вручную упаковывать их в каждую лямбду. Опробовав эту возможность, к ThreadLocal
возвращаться уже не хочется.
Комментарий от команды Spring АйО
Защита от неправильного использования
StructuredTaskScope
строго соблюдает структуру. Если вызвать fork()
из потока, отличного от владельца, будет выброшено исключение StructureViolationException
. Забыли использовать try-with-resources
и позволили scope выйти за пределы метода? Результат тот же. Подход строгий, но эффективно предотвращает случайное истощение ресурсов (по аналогии с «fork-бомбами»).
Комментарий от Михаила Поливахи
Речь про fork-bomb уязвимость, суть которой заключается в том, что вредоносный процесс fork`ается бесконтрольно, потребляя при этом все ресурсы сервера. https://en.wikipedia.org/wiki/Fork_bomb
Улучшения в Observability
Теперь thread дампы включают дерево скоупов, так что инструменты могут напрямую отображать отношения родитель–потомок. Когда я запускаю jcmd <pid> Thread.dump_to_file -format=json
, каждый scope отображается со своими forked потоками, вложенными под владельцем. Найти зависшую задачу, блокирующую виртуальный пул, теперь занимает пару секунд с grep — вместо получасового разбора.
Ещё несколько примеров для практики
Пример 1 — 360°-обзор продукта (сбор–затем–ошибка)
Классический endpoint в e-commerce: один HTTP-запрос должен агрегировать основную информацию о продукте, данные о наличии в реальном времени и персонализированную цену. Каждый подсервис вызывается параллельно внутри StructuredTaskScope
, где применяется политика «всё или ничего»: любая ошибка или превышение односекундного дедлайна отменяет всю группу и возвращает ошибку вызывающему.
Таймаут области, пользовательские имена потоков и joiner allSuccessfulOrThrow()
позволяют выразить то, что обычно требует сложной конфигурации CompletableFuture
, всего в трёх декларативных строках.
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ThreadFactory;
public class ThreeSixtyProductView {
record Product(long id, String name) {}
record Stock(long productId, int quantity) {}
record Price(long productId, double amount) {}
record ProductPayload(Product core, Stock stock, Price price) {}
private static Product coreApi(long id) throws InterruptedException {
Thread.sleep(100); // simulate latency
return new Product(id, "Gadget‑" + id);
}
private static Stock stockApi(long id) throws InterruptedException {
Thread.sleep(120);
return new Stock(id, new Random().nextInt(100));
}
private static Price priceApi(long id) throws InterruptedException {
Thread.sleep(150);
return new Price(id, 99.99);
}
static ProductPayload fetchProduct(long id) throws Exception {
ThreadFactory named = Thread.ofVirtual().name("prod-%d", 1).factory();
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<Object>allSuccessfulOrThrow(),
cfg -> cfg.withTimeout(Duration.ofSeconds(1))
.withThreadFactory(named))) {
StructuredTaskScope.Subtask<Product> core = scope.fork(() -> coreApi(id));
StructuredTaskScope.Subtask<Stock> stock = scope.fork(() -> stockApi(id));
StructuredTaskScope.Subtask<Price> price = scope.fork(() -> priceApi(id));
scope.join(); // throws on first failure / timeout
return new ProductPayload(core.get(), stock.get(), price.get());
}
}
void main() throws Exception {
ProductPayload productPayload = fetchProduct(1L);
System.out.println(productPayload);
}
}
Пример 2 — «Гонка зеркал» для загрузки файлов
Крупные бинарные файлы размещаются на нескольких зеркалах CDN. Поскольку задержки различаются, запросы отправляются на все зеркала одновременно, а Joiner.anySuccessfulResultOrThrow()
используется для выбора первого успешно возвращённого InputStream
, при этом все остальные задачи отменяются.
Пропускная способность и соединения освобождаются мгновенно, а пользователи получают максимально быструю загрузку — без необходимости вручную реализовывать логику отмены.
import java.io.*;
import java.net.URI;
import java.nio.file.*;
import java.util.List;
import java.util.Random;
import java.util.concurrent.StructuredTaskScope;
public class MirrorDownloaderDemo {
void main() throws Exception {
List<URI> mirrors = List.of(
URI.create("https://mirror‑a.example.com"),
URI.create("https://mirror‑b.example.com"),
URI.create("https://mirror‑c.example.com"));
Path target = Files.createFile(Path.of("download1.txt"));
download(target, mirrors);
System.out.println("Saved to " + target.toAbsolutePath());
}
static Path download(Path target, List<URI> mirrors) throws Exception {
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<InputStream>anySuccessfulResultOrThrow())) {
mirrors.forEach(uri -> scope.fork(() -> fetchFromMirror(uri)));
try (InputStream in = scope.join()) {
Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
}
return target;
}
}
private static InputStream fetchFromMirror(URI uri) throws InterruptedException {
Thread.sleep(50 + new Random().nextInt(300));
String data = "Downloaded from " + uri + "\n";
return new ByteArrayInputStream(data.getBytes());
}
}
Пример 3 — Пакетный генератор миниатюр с вложенными областями
Этап медиапайплайна получает каталог изображений. Внешний скоуп перебирает файлы, а внутренний — для каждого изображения — распараллеливает внутри ещё на три задачи ресайза (маленький, средний и большой размеры). Внутренний скоуп работает по принципу «быстрый отказ»: если один из ресайзов завершается с ошибкой, изображение пропускается, но внешний скуоп и его батчёвая обработка продолжается без прерывания.
Вложенные скоупы позволяют разделить консистентность на уровне отдельного элемента и общую производительность партии — при минимуме кода.
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.StructuredTaskScope;
public class ThumbnailBatchDemo {
enum Size {SMALL, MEDIUM, LARGE}
void main() throws Exception {
Path tmpDir = Files.createTempDirectory("images");
for (int i = 0; i < 3; i++) Files.createTempFile(tmpDir, "img" + i, ".jpg");
processBatch(tmpDir);
}
static void processBatch(Path dir) throws IOException, InterruptedException {
try (var batch = StructuredTaskScope.open()) {
try (var files = Files.list(dir)) {
files.filter(Files::isRegularFile)
.forEach(img -> batch.fork(() -> handleOne(img)));
}
batch.join();
}
}
private static void handleOne(Path image) {
try (var scope = StructuredTaskScope.open(
StructuredTaskScope.Joiner.<Void>allSuccessfulOrThrow())) {
scope.fork(() -> resizeAndUpload(image, Size.SMALL));
scope.fork(() -> resizeAndUpload(image, Size.MEDIUM));
scope.fork(() -> resizeAndUpload(image, Size.LARGE));
scope.join();
} catch (Exception ex) {
System.err.println("Skipping " + image.getFileName() + ": " + ex);
}
}
private static Void resizeAndUpload(Path image, Size size) throws InterruptedException {
Thread.sleep(80); // simulate resize
Thread.sleep(40); // simulate upload
System.out.println("Uploaded " + image.getFileName() + " [" + size + "]");
return null;
}
Пример 4 — Служба котировок в реальном времени с резервным механизмом по таймауту
Трейдинговый UI требует котировку не позднее чем через 30 мс. Пользовательский joiner захватывает первую успешную цену из основного рыночного источника, с таймаутом области в 30 мс. Если источник зависает, scope.join()
возвращает пустой результат, и сервис мгновенно переключается на вчерашнюю кэшированную цену закрытия.
Вызывающие стороны всегда получают значение вовремя, а вся логика обработки таймаута выражена одной декларативной строкой.
import java.time.Duration;
import java.util.*;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class QuoteServiceDemo {
void main() throws Exception {
double q = quote("ACME");
System.out.printf("Quote for ACME: %.2f%n", q);
}
static double quote(String symbol) throws InterruptedException {
var firstSuccess = new StructuredTaskScope.Joiner<Double, Optional<Double>>() {
private volatile Double value;
public boolean onComplete(Subtask<? extends Double> st) {
if (st.state() == Subtask.State.SUCCESS) value = st.get();
return value != null; // stop when we have one
}
public Optional<Double> result() {
return Optional.ofNullable(value);
}
};
try (var scope = StructuredTaskScope.open(firstSuccess,
cfg -> cfg.withTimeout(Duration.ofMillis(30)))) {
scope.fork(() -> marketFeed(symbol));
Optional<Double> latest = scope.join();
return latest.orElseGet(() -> cache(symbol));
}
}
private static double marketFeed(String symbol) throws InterruptedException {
long delay = new Random().nextBoolean() ? 20 : 60; // 50 % chance timeout
Thread.sleep(delay);
return 100 + new Random().nextDouble();
}
//for demo purposes only
private static double cache(String symbol) {
return 95.00;
}
}
Заключительные мысли
Эти изменения представляют значительный этап в развитии API structured concurrency.
Сегодняшняя версия API structured concurrency значительно лучше той, с которой всё начиналось, и я уверен, что она станет прочной основой для многопоточного программирования на Java в будущем.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано
Комментарии (4)
ris58h
25.07.2025 04:18Выглядит интересно. Хотелось бы, чтобы API был бы более foolproof. Например, пользователь может вызвать
fork
послеjoin
или не вызватьjoin
вообще - в этих случаях будет ошибка. Да - сам виноват, но может быть можно было бы защититься и от этого.Artyomcool
25.07.2025 04:18По моим ощущениям, баланс сейчас идеален. Ещё более foolproof реализовывать разумно на стороне фреймворков и библиотек.
medvedmike
Насчёт MDC там не так все радужно, на самом деле. API ScopedValue подразумевает, что хранимое значение неизменяемо и может быть только переопределено во вложенных скоупах.
А MDC внутри содержит HashMap, соответственно там между сабтасками может быть неразбериха (если они пишут в MDC).
Я как-то экспериментировал и писал адаптеры для MDC и SpringSecurity для поддержки StructuredScope и ScopedValues (чтобы код оставался таким же простым как в примерах из JEP) и (не знаю как сейчас, может за год изменилось что) там все равно нужно было писать обёртку, чтобы каждая подзадача получила свою копию MDC, либо гарантировать что в подзадаче никогда не будет вызываться MDC.put, а только та его версия, что использует стек внутри.
Artyomcool
Со временем все адаптируются, а пока да, будет большое количество краевых случаев.