imageВ Java 8 появился новый класс CompletableFuture, который позволяет удобно писать асинхронный код.
При использовании CompletableFuture из нескольких потоков я столкнулся с его неочевидным поведением, а именно с тем, что callbacks на нём могут выполнятся совсем не в тех потоках, как ожидалось. Об этом и о том, как мне удалось решить проблему — я и расскажу в этой статье.

Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу, который использовал потоконебезопасные структуры данных. Тесты проходили без проблем, но benchmarks иногда падали c ConcurrentModificationException на внутренних структурах однопоточного клиента.

Асинхронность в клиенте реализовывалась с использованием CompletableFuture, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor).

Фрагмент кода клиента с методом get, который доступен пользователям:

//ожидающие завершения запросы
private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
    
public CompletableFuture<String> get(String key) {
     CompletableFuture<String> future = new CompletableFuture<>();
        
     //передаём задачу на выполнение в поток клиента
     singleThreadExecutor.execute(() -> {
             //добавляем future в список ожидающих завершения
             pendingFutures.add(future);
             future.whenComplete((v, e) -> {
                     //когда future завершится удаляем его из списка ожидающих
                     pendingFutures.remove(future);
             });
             //тут был код передающий запрос на сервер и получающий ответ от сервера
             //в конечном итоге код вызвал future.complete(data); в потоке этого singleThreadExecutor
     });

     return future;
}

Оказалось, что так делать нельзя.

Возможно, я узнал бы об этом раньше, если бы внимательно прочитал javadoc для CompletableFuture.

Посмотреть javadoc
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

При использовании такой архитектуры необходимо, чтобы все callbacks у CompletableFuture вызывались в том же потоке, который делает CompletableFuture.complete.

По указанному выше коду, вроде бы, так и происходит. Но benchmarks иногда завершались с ConcurrentModificationException в коде, который перебирал pendingFutures в том же потоке клиента (singleThreadExecutor).

Дело в том, что callback, передаваемый в future.whenComplete (который вызывает pendingFutures.remove), иногда выполняется совсем в другом потоке. А точнее в потоке приложения, которое пользуется моим клиентом:

Client client = new Client("127.0.0.1", 8080);
CompletableFuture<String> result = client.get(key);
result.thenAccept(data -> {
    System.out.println(data);
});

Вызов result.thenAccept в этом приложении приводит иногда к вызову остальных callbacks на future, которые были добавлены внутри самого кода клиента.

Рассмотрим проблему на простых примерах


Thread mainThread = Thread.currentThread();
CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread);
});
future.complete(null);

Такой код всегда выводит на экран true, так как callback выполняется в том же потоке, что и метод complete.

Но если к CompletableFuture будет хотя бы одно обращение из другого потока, то поведение может измениться:

//основной поток
Thread mainThread = Thread.currentThread();

//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();

CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread)
});

//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
    future.thenRun(() -> {
        //nop
    });
});

//завершаем future
future.complete(null);

Такой код может иногда выдавать false.

Дело в том, что вызов thenRun у того же future, но во втором потоке, может привести к срабатыванию callback в первом thenRun. При этом callback первого thenRun будет вызван во втором потоке.

Это происходит в тот момент, когда future.complete(null) начало выполняться, но ещё не успело вызвать callbacks, а во втором потоке вызвался thenRun, который и выполнит все остальные callbacks на этом future но уже в своём потоке.

Проблем решается просто:

//основной поток
Thread mainThread = Thread.currentThread();

//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();

CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread);
});

//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
    secondThreadFuture.thenRun(() -> {
        //nop
    });
});

//завершаем future
future.complete(null);

Мы просто добавили secondThreadFuture, которая зависит от результата исходной future. И вызов на ней thenRun во втором потоке не приводит к возможному срабатыванию callbacks на исходной future.

Для гарантированного вызова callbacks в заданных пользователем потоках у CompletableFuture существуют async реализации методов, например — thenRunAsync, которым нужно передавать Executor. Но async-версии методов могут работать медленней, чем обычные. Поэтому, я не хотел лишний раз их использовать.

Вывод


Вывод, который я сделал для себя: не использовать один объект CompletableFuture в нескольких потоках, если необходимо быть уверенным, что все callbacks на нём выполняются в заданном потоке. А если использовать несколько потоков с одним CompletableFuture необходимо — то достаточно передавать в другой поток не оригинальный CompletableFuture, а новый, который будет зависеть от исходного. Например, вот так:

CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
    //nop
});
Поделиться с друзьями
-->

Комментарии (53)


  1. Ryppka
    08.04.2017 19:47

    А вызывать продолжения на та же экзекуторе не пробывали?


    1. Chemist
      08.04.2017 19:51

      В конце статьи написал, что можно использовать async-варианты методов.
      Если вы имеете в виду callbacks, которые устанавливает пользовательское приложение, то оно не имеет доступа к потоку клиентской библиотеки. Поэтому добавляться callbacks могут из неизвестных заранее потоков.


      1. Ryppka
        08.04.2017 20:01

        А типа синглтон, которым by its very nature является единственный поток религия не велит?


        1. Chemist
          08.04.2017 20:02

          Я не могу контролировать из каких потоков будут обращаться к моей клиентской библиотеки. Она однопоточная только внутри себя.


          1. Ryppka
            08.04.2017 20:07
            +3

            Каюсь, не вник… Но идея библиотеки, завязанной на выполнение в одном потоке и не контролирующей это кажется странной. Мне кажется, что функциональность можно было бы оформить так, чтобы вызывающий код предоставил поток, в котором все будет выполнятся. Короче, если вызывается обычное продолжение, то, если склероз мне не изменяет, он будет выполнятся в некоем произвольном потоке на дефолтном тред-пуле. Если это так, то чему удивляться?


            1. Chemist
              08.04.2017 20:35

              Вы не поверите, но такие проблемы иногда встречаются в других клиентских либах.
              Например: https://github.com/atomix/copycat/issues/75
              Дело в том, что это общепринятая практика, что клиентская либа поддерживает какой-то внутренний пул потоков, в которых производит сетевые операции. Например java-клиент Kafka так работает.
              Конечно, если внимательно читать документацию, то многих проблем бы удалось избежать.


              1. Ryppka
                08.04.2017 21:57
                +3

                Верю, но это кажется мне просчетом дизайна(


          1. osmanpasha
            09.04.2017 16:55

            А вообще, библиотеке действительно обязательно быть однопоточной? Если в приложении будут запросы, которые обрабатываются долго (медленный сервер? много данных?), они будут блокировать этот один поток и другие, быстрые запросы.


            1. Chemist
              09.04.2017 16:57

              Библиотека использует NIO, т.е. поток не блокируется.


  1. linfro
    09.04.2017 00:06

    Используйте ListenableFuture из guava и забудьте об этих проблемах. Там можно явно указать executor, в котором должен выполняться listener. Да и в целом у ListenableFuture лучше продуман публичный API.


    1. sshikov
      09.04.2017 21:52
      +1

      С какой стати забудьте? Указание executor вовсе не избавляет от потенциальных проблем. Оно просто гарантирует (а точнее позволяет гарантировать), в каком потоке вызовется listener. Кстати его и тут можно указать, о чем в посте было написано — и в комментах уже кажется много раз.


      Налажать при этом все равно никто не помешает — и это явно написано в javadoc у ListenableFuture в одном из первых абзацев.


      1. linfro
        09.04.2017 23:07
        +1

        Я ровно об этом и пытаюсь сказать. Вот цитата автора:
        "
        Асинхронность в клиенте реализовывалась с использованием CompletableFuture, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor)."
        Автор гарантирует отсутствие гонок в неком потоконебезопасном коде выполняя его в single thread executor, но получает гонки, обращайтесь к этому самому небезопасному коду из колбэков фучи. Для решения этой проблемы существует стандартный механизм в ListenableFuture (и, как тут правильно заметили в CallableFuture и в Scala) — при добавлении колбэка указать executor, в котором он будет выполнен. Т.е. для решения проблемы автора ему было достаточно при добавлении колбэка указать тот самый single thread executor, в котором он выполняет свой небезопасный код.


        1. sshikov
          10.04.2017 09:45
          +1

          Да, в такой формулировке я не спорю, согласен.


          Просто вы так выразились "забудьте об этих проблемах", очень уж обобщенно ))) С многопоточностью вообще не стоит так обобщать. Где не гонки — там дедлоки )))


          1. linfro
            10.04.2017 11:05

            Да, дедлок тут очень легко схлопотать. Достаточно в том же самом single thread executor сказать future.get();


  1. Googolplex
    09.04.2017 01:19
    +2

    Интересно, что в Future'ах скалы это проблема решается тем, что все методы, которые принимают коллбэки, также принимают параметром executor, в котором коллбэк должен выполниться, поэтому всегда можно указать нужный executor и сериализовать обращения к мутабельному состоянию. Хотя для этого лучше конечно использовать библиотеки вроде Akka.


    1. Chemist
      09.04.2017 09:39

      В методы CompletableFuture тоже можно передавать executor, в конце статьи это описано.


  1. osmanpasha
    09.04.2017 09:34
    +1

    Я правильно понимаю, что ваша изначальная проблема решилась бы

    • либо синхронизацией доступа к pendingFutures и другим внутренним объектам библиоткеи,
    • либо переносом строчки pendingFutures.remove(future); из колбэка сразу за строчку future.complete(data);? Тогда добавление и удаление точно происходили бы в singleThreadExecutor.


    1. Chemist
      09.04.2017 09:37
      -1

      Да, если сделать так, как вы описали, проблема бы не возникла.


  1. Lailore
    09.04.2017 17:30
    +1

    Эм, в java сложно сделать полноценные async/await как в c#?


    1. sshikov
      09.04.2017 20:36

      А чем вам этот вариант CompletableFuture неполноценный?


      1. Lailore
        09.04.2017 22:30
        +1

        Хм, тем что код имеет лишную вложенность. И как например с помощью java запустить сразу несколько задач в разных потоках и работать дальше с самым быстрым результатом, но обработать ошибки всех задач?


        1. sshikov
          10.04.2017 09:49

          Да ладно, где тут лишняя?


          Сразу несколько задач в разных потоках — никаких проблем. ExecutorService это ровно для этого. Только надо выбрать многопоточный. Я только вот что хочу сказать — async/await в C# — это синтаксический сахар, на уровне языка, такого вы конечно в Java самостоятельно не сделаете. А во всем остальном то что есть сейчас — вполне адекватно задачам. Местами может быть есть оверинжиниринг, не без этого, но все совершенно полноценное.


          1. Lailore
            10.04.2017 10:06

            Так тут это новшество в языке, как я понял. Просто у меня вопрос почему в новой версии языка не сделать сразу хорошо и с сахором?


            1. sshikov
              10.04.2017 19:21

              Новшество конечно… в 8-ке. Только там помимо этого были еще и лямбды и стримы, коотрые сами по себе такое большое новшество, что релиз откладывали несколько раз. Я думаю просто не успели.


              Можно кстати хоть на код этого CF посмотреть — на мой взгляд, там такой ад и ужас, в отличие от многих других классов. Наводит на мысли, что торопились слегка.


              А насчет сразу хорошо… ну не знаю, я бы не хотел, чтобы синтаксис языка меняли из-за того, что можно сделать библиотекой. В большинстве случаев это получается более гибко. И сначала надо понять, какие кейсы все-таки больше нужны, и начинать делать с них. Вот лямбды, несмотря на все споры вокруг них, активно пошли в народ, и просто невооруженным взглядом видно, насколько стал меняться стиль во многих фреймворках, в сторону упрощения причем.


          1. mayorovp
            10.04.2017 17:13

            Вот тут лишняя:


            pendingFutures.add(future);
            future.whenComplete((v, e) -> {
                pendingFutures.remove(future);
            });

            Сравните:


            pendingFutures.add(future);
            await future;
            pendingFutures.remove(future);


            1. sshikov
              10.04.2017 19:15

              Это не накладные расходы нифига. Это последствия того, что у CF много разных методов. Для разных же целей. Сделать обертку, которая тупо ждет, и ничего не делает — раз плюнуть. И будет одинаково.


              1. Lailore
                10.04.2017 20:44

                1. sshikov
                  10.04.2017 21:58

                  Эта статься — очень сильное упрощение. Ну вот например:


                  Всё дело в том, что не нужно писать .then, создавать анонимную функцию для обработки ответа, или включать в код переменную с именем data, которая нам, по сути, не нужна.

                  Это чепуха. Не полная, но в значительной степени. Что значит нам не нужна переменная data? Во-первых, промисы можно сцеплять в цепочки, и обрабатывать ответ много раз (например, для разных потребителей). При этом каждый обработчик будет получать результат работы предыдущего. Тот сценарий, когда можно без этого — это лишь один частный случай. И пусть он даже часто встречается — но есть и другие.


                  И потом, это все написано для javascript. А там все сильно иначе, чем в java, как в node, так и в вебе.


              1. mayorovp
                10.04.2017 20:45
                +1

                А кто говорил про накладные расходы-то? Речь шла про лишнюю вложенность.


                1. sshikov
                  10.04.2017 21:51
                  -1

                  Я именно про эти накладные расходы — в виде лишних строчек. Их нет, потому что ваш пример с await — не их той оперы. Напомню:


                  CF — это A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.


                  Если вам не нужен обработчик завершения (dependent functions and actions), который и есть вложенный вызов — то зачем вас сдался CompletableFuture? Возьмите обычный, и сделайте get() — и будет вам щастье.


                  1. mayorovp
                    10.04.2017 21:54

                    Кажется, вы не поняли. Приведенные мною два куска псевдокода делают одно и то же. В смысле, могли бы делать если бы async/await были включены в язык.


                    1. sshikov
                      11.04.2017 09:18
                      -1

                      Это вы меня не поняли. Если вам не нужен callback, который будет асинхронно вызван по завершению — то зачем вы берете CompletableFuture? Возмите просто Future, вызовите .get(), и будет у вас ровно тоже число строк — одна.


                      А если callback нужен — то пример с await неполный, и его надо переписать, и тогда он может будет проще и короче, а может и не будет.


                      Вообще вся вот эта вот фигня со списками и remove — она ровно для того, чтобы отслеживать, какие процессы завершились. А если у вас асинхронные callbacks — то вы и так знаете, когда и что завершилось, и эти же вещи удобнее делать не так. Изначальный код — он вообще странен, он от обычный Futures, так не пишут.


                      1. Lailore
                        11.04.2017 10:04

                        pendingFutures.add(future);
                        await future;
                        pendingFutures.remove(future); <- Это и есть call back. Все что после  await асинхронно вызовется после завершения
                        


                        Хотя пример и правда странный

                        Сколько у future будет лишней вложенности?
                        await pendingFutures.addAsync(future);
                        var result = await future;
                        var foo = result.SomeField ?? await resut.DoSecondRequestAsync()
                        await pendingFutures.removeAsync(future);
                        foo.StartMainCircle().FireAndFoget()
                        


                        1. Lailore
                          11.04.2017 10:12

                          И опять забыл главное. Обработка ошибок

                          try {
                              await pendingFutures.addAsync(future);
                              var result = await future;
                              var foo = result.SomeField ?? await resut.DoSecondRequestAsync()
                              await pendingFutures.removeAsync(future);
                              foo.StartMainCircle().FireAndFoget()
                          } catch(Exeption ex){
                              //work with ex
                              throw ex
                          }
                          


                        1. sshikov
                          11.04.2017 10:21
                          -2

                          Я вам именно про это и толкую. Если у вас такая логика — вам прямая дорога к использованию просто Future, и просто .get(). У Future вообще нет callbacks, если они вам не нужны — зачем вы берете CompletableFuture, чье основное назначение — именно наличие callbacks?


                          1. mayorovp
                            11.04.2017 10:21
                            +1

                            Чтобы не блокировать поток!


                            Вы вообще пост-то читали — или сразу в комментарии полезли?


                            Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу

                            Нельзя в таком сервере использовать метод .get(), вот никак нельзя!


                            1. sshikov
                              11.04.2017 10:23
                              -2

                              Оператор await применяется к задаче в асинхронном методе для приостановки выполнения метода до завершения выполнения ожидаемой задачи. Задача представляет выполняющуюся работу.


                              Это я цитирую MSDN, если что.


                              По-вашему, приостановка потока — это не равно "блокирование"?


                              1. mayorovp
                                11.04.2017 10:25
                                +1

                                Приостановка выполнения метода — это не то же самое что приостановка выполнения потока!


                                1. sshikov
                                  11.04.2017 12:16
                                  -1

                                  В java — одно и тоже.


                                  1. mayorovp
                                    11.04.2017 12:22
                                    +1

                                    Вы спросили: "А чем вам этот вариант CompletableFuture неполноценный?"


                                    Вам ответили: тем, что в текущей реализации используется на 1 уровень вложенности больше, чем было бы, если бы в Java был оператор await с поведением аналогичным тому как он себя ведет в C#.


                                    Вы понимаете что такое сослагательное наклонение?


                                    1. Lailore
                                      11.04.2017 13:41

                                      Если бы только одна вложенность, то это не было бы по сути проблемой. Но если есть логика обработки ответа, которая тоже запускает асинхронные методы, то… Вложенности растут! А асинхронный код крайне заразный


                            1. sshikov
                              11.04.2017 12:15

                              Блин. При чем тут пост? Я лично давно уже говорю совершенно о другом. await приостанавливает метод до завершения — тоже самое делает .get(). Хотите callback — берите CF, не хотите — get() это ровно тоже самое. Если вы не вызываете .get() — ваш текущий метод и поток (в java это одно и тоже) продолжает выполняться параллельно с тем потоком, в котором работает Future.


                              То что в C# из кода async метода генерируется де-факто callback — это совершенно другая история.


                              1. mayorovp
                                11.04.2017 12:24

                                А что если я хочу поведение как при использовании callback — но с синтаксисом похожим на get()?


                                Хотите callback — берите CF, не хотите — get() это ровно тоже самое.

                                Нет, get() это совсем не то же самое! Он останавливает поток, в отличии от await.


                      1. mayorovp
                        11.04.2017 10:08

                        Вообще-то вызов .get() заблокирует поток, его нельзя использовать вместо (гипотетического) оператора await.


                        1. Lailore
                          11.04.2017 10:14

                          Хм, тогда этот get плохой в принципе. Вся польза же от того, что потоки не блокируются


                        1. sshikov
                          11.04.2017 10:22

                          Хм. Вот посмотрите внимательно на свой код с await, и расскажите мне, что делает ваш поток, если асинхронная операция на момент await еще не завершилась? Какой код он выполняет?


                          1. mayorovp
                            11.04.2017 10:24
                            -1

                            Он выполняет, внезапно, вызов future.whenComplete. Говорю же, оба приведенных мною кода — полностью эквивалентны.


                            1. sshikov
                              11.04.2017 12:17
                              -1

                              Вы мои ответы изволите читать, или просто пишете, что в голову пришло? Там где у вас await — там нет никакого whenComplete, не путайтесь в своем же примере.


                              1. mayorovp
                                11.04.2017 12:25

                                Почему вы считаете, что его там нет?


                          1. Lailore
                            11.04.2017 16:26

                            Он ничего не выполняет, а отдает тред в тредпул, или если это например ui тред, то освобождает тред для дальнейшей работы с ui

                            А вот когда операция завершиться, то если нет контекста треда, то возмется любой тред из пула, а если это например ui тред, то в него запущится операция в очереедь на выполнение


      1. Lailore
        09.04.2017 22:45
        +1

        Не написал гоавного, с подходом прописов или future большая проблема в том, когда у на есть ветвление в зависимости от результата асинхронной операции. Либо ненужное дробление кода, либо растут скобочки


        1. sshikov
          10.04.2017 09:51

          Хм. Имелось в виде промисов? Ну не без этого конечно, и я бы сказал что CF — это по сути и есть промис, только в профиль.


          Но по-моему во всех остальных случаях этого ветвления и скобочек еще больше.


    1. mayorovp
      10.04.2017 09:13

      Для этого для начала нужен способ определить Executor, привязанный к текущему потоку — нужно какое-нибудь свойство, привязанное к потоку.


      Причем модифицировать текущие реализации этого интерфейса чтобы они назначали себя созданным ими же потокам — нельзя, потому что это нарушит инкапсуляцию существующих библиотек, которые создают себе внутри приватный Executor...