CompletableFuture
, который позволяет удобно писать асинхронный код.При использовании
CompletableFuture
из нескольких потоков я столкнулся с его неочевидным поведением, а именно с тем, что callbacks на нём могут выполнятся совсем не в тех потоках, как ожидалось. Об этом и о том, как мне удалось решить проблему — я и расскажу в этой статье.Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу, который использовал потоконебезопасные структуры данных. Тесты проходили без проблем, но benchmarks иногда падали c
ConcurrentModificationException
на внутренних структурах однопоточного клиента.Асинхронность в клиенте реализовывалась с использованием
CompletableFuture
, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor
).Фрагмент кода клиента с методом
get
, который доступен пользователям://ожидающие завершения запросы
private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
public CompletableFuture<String> get(String key) {
CompletableFuture<String> future = new CompletableFuture<>();
//передаём задачу на выполнение в поток клиента
singleThreadExecutor.execute(() -> {
//добавляем future в список ожидающих завершения
pendingFutures.add(future);
future.whenComplete((v, e) -> {
//когда future завершится удаляем его из списка ожидающих
pendingFutures.remove(future);
});
//тут был код передающий запрос на сервер и получающий ответ от сервера
//в конечном итоге код вызвал future.complete(data); в потоке этого singleThreadExecutor
});
return future;
}
Оказалось, что так делать нельзя.
Возможно, я узнал бы об этом раньше, если бы внимательно прочитал javadoc для
CompletableFuture
.При использовании такой архитектуры необходимо, чтобы все callbacks у
CompletableFuture
вызывались в том же потоке, который делает CompletableFuture.complete
.По указанному выше коду, вроде бы, так и происходит. Но benchmarks иногда завершались с
ConcurrentModificationException
в коде, который перебирал pendingFutures
в том же потоке клиента (singleThreadExecutor
).Дело в том, что callback, передаваемый в
future.whenComplete
(который вызывает pendingFutures.remove
), иногда выполняется совсем в другом потоке. А точнее в потоке приложения, которое пользуется моим клиентом:Client client = new Client("127.0.0.1", 8080);
CompletableFuture<String> result = client.get(key);
result.thenAccept(data -> {
System.out.println(data);
});
Вызов
result.thenAccept
в этом приложении приводит иногда к вызову остальных callbacks на future, которые были добавлены внутри самого кода клиента.Рассмотрим проблему на простых примерах
Thread mainThread = Thread.currentThread();
CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
future.complete(null);
Такой код всегда выводит на экран
true
, так как callback выполняется в том же потоке, что и метод complete.Но если к
CompletableFuture
будет хотя бы одно обращение из другого потока, то поведение может измениться://основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread)
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
future.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
Такой код может иногда выдавать
false
.Дело в том, что вызов
thenRun
у того же future, но во втором потоке, может привести к срабатыванию callback в первом thenRun
. При этом callback первого thenRun
будет вызван во втором потоке.Это происходит в тот момент, когда
future.complete(null)
начало выполняться, но ещё не успело вызвать callbacks, а во втором потоке вызвался thenRun
, который и выполнит все остальные callbacks на этом future но уже в своём потоке.Проблем решается просто:
//основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
secondThreadFuture.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
Мы просто добавили secondThreadFuture, которая зависит от результата исходной future. И вызов на ней
thenRun
во втором потоке не приводит к возможному срабатыванию callbacks на исходной future.Для гарантированного вызова callbacks в заданных пользователем потоках у
CompletableFuture
существуют async реализации методов, например — thenRunAsync
, которым нужно передавать Executor. Но async-версии методов могут работать медленней, чем обычные. Поэтому, я не хотел лишний раз их использовать.Вывод
Вывод, который я сделал для себя: не использовать один объект
CompletableFuture
в нескольких потоках, если необходимо быть уверенным, что все callbacks на нём выполняются в заданном потоке. А если использовать несколько потоков с одним CompletableFuture необходимо — то достаточно передавать в другой поток не оригинальный CompletableFuture
, а новый, который будет зависеть от исходного. Например, вот так:CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
//nop
});
Комментарии (53)
linfro
09.04.2017 00:06Используйте ListenableFuture из guava и забудьте об этих проблемах. Там можно явно указать executor, в котором должен выполняться listener. Да и в целом у ListenableFuture лучше продуман публичный API.
sshikov
09.04.2017 21:52+1С какой стати забудьте? Указание executor вовсе не избавляет от потенциальных проблем. Оно просто гарантирует (а точнее позволяет гарантировать), в каком потоке вызовется listener. Кстати его и тут можно указать, о чем в посте было написано — и в комментах уже кажется много раз.
Налажать при этом все равно никто не помешает — и это явно написано в javadoc у ListenableFuture в одном из первых абзацев.
linfro
09.04.2017 23:07+1Я ровно об этом и пытаюсь сказать. Вот цитата автора:
"
Асинхронность в клиенте реализовывалась с использованием CompletableFuture, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor)."
Автор гарантирует отсутствие гонок в неком потоконебезопасном коде выполняя его в single thread executor, но получает гонки, обращайтесь к этому самому небезопасному коду из колбэков фучи. Для решения этой проблемы существует стандартный механизм в ListenableFuture (и, как тут правильно заметили в CallableFuture и в Scala) — при добавлении колбэка указать executor, в котором он будет выполнен. Т.е. для решения проблемы автора ему было достаточно при добавлении колбэка указать тот самый single thread executor, в котором он выполняет свой небезопасный код.sshikov
10.04.2017 09:45+1Да, в такой формулировке я не спорю, согласен.
Просто вы так выразились "забудьте об этих проблемах", очень уж обобщенно ))) С многопоточностью вообще не стоит так обобщать. Где не гонки — там дедлоки )))
linfro
10.04.2017 11:05Да, дедлок тут очень легко схлопотать. Достаточно в том же самом single thread executor сказать future.get();
Googolplex
09.04.2017 01:19+2Интересно, что в Future'ах скалы это проблема решается тем, что все методы, которые принимают коллбэки, также принимают параметром executor, в котором коллбэк должен выполниться, поэтому всегда можно указать нужный executor и сериализовать обращения к мутабельному состоянию. Хотя для этого лучше конечно использовать библиотеки вроде Akka.
Chemist
09.04.2017 09:39В методы CompletableFuture тоже можно передавать executor, в конце статьи это описано.
osmanpasha
09.04.2017 09:34+1Я правильно понимаю, что ваша изначальная проблема решилась бы
- либо синхронизацией доступа к pendingFutures и другим внутренним объектам библиоткеи,
- либо переносом строчки pendingFutures.remove(future); из колбэка сразу за строчку future.complete(data);? Тогда добавление и удаление точно происходили бы в singleThreadExecutor.
Lailore
09.04.2017 17:30+1Эм, в java сложно сделать полноценные async/await как в c#?
sshikov
09.04.2017 20:36А чем вам этот вариант CompletableFuture неполноценный?
Lailore
09.04.2017 22:30+1Хм, тем что код имеет лишную вложенность. И как например с помощью java запустить сразу несколько задач в разных потоках и работать дальше с самым быстрым результатом, но обработать ошибки всех задач?
sshikov
10.04.2017 09:49Да ладно, где тут лишняя?
Сразу несколько задач в разных потоках — никаких проблем. ExecutorService это ровно для этого. Только надо выбрать многопоточный. Я только вот что хочу сказать — async/await в C# — это синтаксический сахар, на уровне языка, такого вы конечно в Java самостоятельно не сделаете. А во всем остальном то что есть сейчас — вполне адекватно задачам. Местами может быть есть оверинжиниринг, не без этого, но все совершенно полноценное.
Lailore
10.04.2017 10:06Так тут это новшество в языке, как я понял. Просто у меня вопрос почему в новой версии языка не сделать сразу хорошо и с сахором?
sshikov
10.04.2017 19:21Новшество конечно… в 8-ке. Только там помимо этого были еще и лямбды и стримы, коотрые сами по себе такое большое новшество, что релиз откладывали несколько раз. Я думаю просто не успели.
Можно кстати хоть на код этого CF посмотреть — на мой взгляд, там такой ад и ужас, в отличие от многих других классов. Наводит на мысли, что торопились слегка.
А насчет сразу хорошо… ну не знаю, я бы не хотел, чтобы синтаксис языка меняли из-за того, что можно сделать библиотекой. В большинстве случаев это получается более гибко. И сначала надо понять, какие кейсы все-таки больше нужны, и начинать делать с них. Вот лямбды, несмотря на все споры вокруг них, активно пошли в народ, и просто невооруженным взглядом видно, насколько стал меняться стиль во многих фреймворках, в сторону упрощения причем.
mayorovp
10.04.2017 17:13Вот тут лишняя:
pendingFutures.add(future); future.whenComplete((v, e) -> { pendingFutures.remove(future); });
Сравните:
pendingFutures.add(future); await future; pendingFutures.remove(future);
sshikov
10.04.2017 19:15Это не накладные расходы нифига. Это последствия того, что у CF много разных методов. Для разных же целей. Сделать обертку, которая тупо ждет, и ничего не делает — раз плюнуть. И будет одинаково.
Lailore
10.04.2017 20:44sshikov
10.04.2017 21:58Эта статься — очень сильное упрощение. Ну вот например:
Всё дело в том, что не нужно писать .then, создавать анонимную функцию для обработки ответа, или включать в код переменную с именем data, которая нам, по сути, не нужна.
Это чепуха. Не полная, но в значительной степени. Что значит нам не нужна переменная data? Во-первых, промисы можно сцеплять в цепочки, и обрабатывать ответ много раз (например, для разных потребителей). При этом каждый обработчик будет получать результат работы предыдущего. Тот сценарий, когда можно без этого — это лишь один частный случай. И пусть он даже часто встречается — но есть и другие.
И потом, это все написано для javascript. А там все сильно иначе, чем в java, как в node, так и в вебе.
mayorovp
10.04.2017 20:45+1А кто говорил про накладные расходы-то? Речь шла про лишнюю вложенность.
sshikov
10.04.2017 21:51-1Я именно про эти накладные расходы — в виде лишних строчек. Их нет, потому что ваш пример с await — не их той оперы. Напомню:
CF — это A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.
Если вам не нужен обработчик завершения (dependent functions and actions), который и есть вложенный вызов — то зачем вас сдался CompletableFuture? Возьмите обычный, и сделайте get() — и будет вам щастье.
mayorovp
10.04.2017 21:54Кажется, вы не поняли. Приведенные мною два куска псевдокода делают одно и то же. В смысле, могли бы делать если бы async/await были включены в язык.
sshikov
11.04.2017 09:18-1Это вы меня не поняли. Если вам не нужен callback, который будет асинхронно вызван по завершению — то зачем вы берете CompletableFuture? Возмите просто Future, вызовите .get(), и будет у вас ровно тоже число строк — одна.
А если callback нужен — то пример с await неполный, и его надо переписать, и тогда он может будет проще и короче, а может и не будет.
Вообще вся вот эта вот фигня со списками и remove — она ровно для того, чтобы отслеживать, какие процессы завершились. А если у вас асинхронные callbacks — то вы и так знаете, когда и что завершилось, и эти же вещи удобнее делать не так. Изначальный код — он вообще странен, он от обычный Futures, так не пишут.
Lailore
11.04.2017 10:04pendingFutures.add(future); await future; pendingFutures.remove(future); <- Это и есть call back. Все что после await асинхронно вызовется после завершения
Хотя пример и правда странный
Сколько у future будет лишней вложенности?
await pendingFutures.addAsync(future); var result = await future; var foo = result.SomeField ?? await resut.DoSecondRequestAsync() await pendingFutures.removeAsync(future); foo.StartMainCircle().FireAndFoget()
Lailore
11.04.2017 10:12И опять забыл главное. Обработка ошибок
try { await pendingFutures.addAsync(future); var result = await future; var foo = result.SomeField ?? await resut.DoSecondRequestAsync() await pendingFutures.removeAsync(future); foo.StartMainCircle().FireAndFoget() } catch(Exeption ex){ //work with ex throw ex }
sshikov
11.04.2017 10:21-2Я вам именно про это и толкую. Если у вас такая логика — вам прямая дорога к использованию просто Future, и просто .get(). У Future вообще нет callbacks, если они вам не нужны — зачем вы берете CompletableFuture, чье основное назначение — именно наличие callbacks?
mayorovp
11.04.2017 10:21+1Чтобы не блокировать поток!
Вы вообще пост-то читали — или сразу в комментарии полезли?
Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу
Нельзя в таком сервере использовать метод .get(), вот никак нельзя!
sshikov
11.04.2017 10:23-2Оператор await применяется к задаче в асинхронном методе для приостановки выполнения метода до завершения выполнения ожидаемой задачи. Задача представляет выполняющуюся работу.
Это я цитирую MSDN, если что.
По-вашему, приостановка потока — это не равно "блокирование"?
mayorovp
11.04.2017 10:25+1Приостановка выполнения метода — это не то же самое что приостановка выполнения потока!
sshikov
11.04.2017 12:16-1В java — одно и тоже.
mayorovp
11.04.2017 12:22+1Вы спросили: "А чем вам этот вариант CompletableFuture неполноценный?"
Вам ответили: тем, что в текущей реализации используется на 1 уровень вложенности больше, чем было бы, если бы в Java был оператор await с поведением аналогичным тому как он себя ведет в C#.
Вы понимаете что такое сослагательное наклонение?
Lailore
11.04.2017 13:41Если бы только одна вложенность, то это не было бы по сути проблемой. Но если есть логика обработки ответа, которая тоже запускает асинхронные методы, то… Вложенности растут! А асинхронный код крайне заразный
sshikov
11.04.2017 12:15Блин. При чем тут пост? Я лично давно уже говорю совершенно о другом. await приостанавливает метод до завершения — тоже самое делает .get(). Хотите callback — берите CF, не хотите — get() это ровно тоже самое. Если вы не вызываете .get() — ваш текущий метод и поток (в java это одно и тоже) продолжает выполняться параллельно с тем потоком, в котором работает Future.
То что в C# из кода async метода генерируется де-факто callback — это совершенно другая история.
mayorovp
11.04.2017 12:24А что если я хочу поведение как при использовании callback — но с синтаксисом похожим на get()?
Хотите callback — берите CF, не хотите — get() это ровно тоже самое.
Нет, get() это совсем не то же самое! Он останавливает поток, в отличии от await.
mayorovp
11.04.2017 10:08Вообще-то вызов
.get()
заблокирует поток, его нельзя использовать вместо (гипотетического) оператора await.Lailore
11.04.2017 10:14Хм, тогда этот get плохой в принципе. Вся польза же от того, что потоки не блокируются
sshikov
11.04.2017 10:22Хм. Вот посмотрите внимательно на свой код с await, и расскажите мне, что делает ваш поток, если асинхронная операция на момент await еще не завершилась? Какой код он выполняет?
mayorovp
11.04.2017 10:24-1Он выполняет, внезапно, вызов
future.whenComplete
. Говорю же, оба приведенных мною кода — полностью эквивалентны.
Lailore
11.04.2017 16:26Он ничего не выполняет, а отдает тред в тредпул, или если это например ui тред, то освобождает тред для дальнейшей работы с ui
А вот когда операция завершиться, то если нет контекста треда, то возмется любой тред из пула, а если это например ui тред, то в него запущится операция в очереедь на выполнение
Lailore
09.04.2017 22:45+1Не написал гоавного, с подходом прописов или future большая проблема в том, когда у на есть ветвление в зависимости от результата асинхронной операции. Либо ненужное дробление кода, либо растут скобочки
sshikov
10.04.2017 09:51Хм. Имелось в виде промисов? Ну не без этого конечно, и я бы сказал что CF — это по сути и есть промис, только в профиль.
Но по-моему во всех остальных случаях этого ветвления и скобочек еще больше.
mayorovp
10.04.2017 09:13Для этого для начала нужен способ определить Executor, привязанный к текущему потоку — нужно какое-нибудь свойство, привязанное к потоку.
Причем модифицировать текущие реализации этого интерфейса чтобы они назначали себя созданным ими же потокам — нельзя, потому что это нарушит инкапсуляцию существующих библиотек, которые создают себе внутри приватный Executor...
Ryppka
А вызывать продолжения на та же экзекуторе не пробывали?
Chemist
В конце статьи написал, что можно использовать async-варианты методов.
Если вы имеете в виду callbacks, которые устанавливает пользовательское приложение, то оно не имеет доступа к потоку клиентской библиотеки. Поэтому добавляться callbacks могут из неизвестных заранее потоков.
Ryppka
А типа синглтон, которым by its very nature является единственный поток религия не велит?
Chemist
Я не могу контролировать из каких потоков будут обращаться к моей клиентской библиотеки. Она однопоточная только внутри себя.
Ryppka
Каюсь, не вник… Но идея библиотеки, завязанной на выполнение в одном потоке и не контролирующей это кажется странной. Мне кажется, что функциональность можно было бы оформить так, чтобы вызывающий код предоставил поток, в котором все будет выполнятся. Короче, если вызывается обычное продолжение, то, если склероз мне не изменяет, он будет выполнятся в некоем произвольном потоке на дефолтном тред-пуле. Если это так, то чему удивляться?
Chemist
Вы не поверите, но такие проблемы иногда встречаются в других клиентских либах.
Например: https://github.com/atomix/copycat/issues/75
Дело в том, что это общепринятая практика, что клиентская либа поддерживает какой-то внутренний пул потоков, в которых производит сетевые операции. Например java-клиент Kafka так работает.
Конечно, если внимательно читать документацию, то многих проблем бы удалось избежать.
Ryppka
Верю, но это кажется мне просчетом дизайна(
osmanpasha
А вообще, библиотеке действительно обязательно быть однопоточной? Если в приложении будут запросы, которые обрабатываются долго (медленный сервер? много данных?), они будут блокировать этот один поток и другие, быстрые запросы.
Chemist
Библиотека использует NIO, т.е. поток не блокируется.