К исследованиям в этой сфере меня вдохновила статья Асинхронность: назад в будущее. В ней автор описывает идею о том, как, используя сопрограммы, можно упростить асинхронный код так, чтобы выглядел он так же, как обычный синхронный, но сохранял плюшки, которые нам даёт применение асинхронных операций. Вкратце, суть подхода такова: если у нас есть механизм, позволяющий сохранять и восстанавливать контекст выполнения (поддержка сопрограмм), то код на цепочках callback'ов

startReadSocket((data) -> {
   startWriteFile(data, (result) -> {
      if (result == ok) ...
   });
});

мы можем переписать так:

data = readSocket();
result = writeFile(data);
if (result == ok) ...

Здесь readSocket() и writeFile() — сопрограммы, в которых асинхронные операции вызываются следующим образом:

byte[] readSocket() {
    byte[] result = null;
    startReadSocket((data) -> {
	    result = data;
	    resume();
    });
    yield();
    return result;
}

Методы yield() и resume() сохраняют и восстанавливают контекст выполнения, со всеми фреймами и локальными переменными. Происходит следующее: при вызове readSocket() мы планируем асинхронную операцию вызовом startReadSocket() и выполняем yield(). Yield() сохраняет контекст выполнения и поток завершается (возвращается в пул). Когда асинхронная операция будет выполнена, мы вызовем resume() перед выходом из callback'a, и тем самым возобновим выполнение кода. Управление снова получит основная функция, которая вызовет writeFile(). writeFile() устроен аналогично, и всё повторится.

Сделав единожды такое преобразование для всех используемых асинхронных операций и поместив полученные функции в библиотеку, мы получаем инструмент, позволяющий нам писать асинхронный код так, как будто это обычный синхронный код. Мы получаем возможность сочетать плюсы синхронного кода (читабельность, удобная обработка ошибок) и асинхронного (производительность). Плата за это удобство — необходимость как-то сохранять и восстанавливать контекст выполнения. В статье автор описывает реализацию на С++, мне же захотелось заиметь что-то такое в Java. Об этом и пойдёт речь.

javaflow


В первую очередь надо было найти реализацию сопрограмм для JVM. Среди нескольких вариантов самой подходящей оказалась библиотека javaflow. Она бы вполне подошла для эксперимента, но, к сожалению, проект давно заброшен. Потыкав палочкой (декомпилятором) в генерируемый ей код, я выяснил, что в javaflow есть несколько серьезных проблем:

  • Совсем не поддерживаются лямбды. Это не удивительно, с учётом того, что последний релиз библиотеки был в 2008 году.
  • Код инструментируется крайне не оптимально — инструментируются все вызовы внутри метода, хотя большинство из них никогда не приведёт к вызову suspend(). В результате байт-код сильно распухает, и в реальной жизни такой подход будет неприемлемо медленно работать.
  • Нет поддержки reflection. Если в процессе исполнения кода какой-то метод может быть вызван через reflection, javaflow не сможет в этом месте сохранить и восстановить контекст выполнения. А это критично при повседневном программировании, и дело даже не в принципиальной возможности, а в том, что почти все используют сейчас DI-контейнеры, которые работают через reflection. Поэтому запрещать reflection нельзя, это слишком сильное ограничение для программистов.

Несмотря на всё это, javaflow помог разобраться в том, как можно реализовать сохранение и восстановление состояния. Далее было 2 варианта: пытаться поддерживать javaflow или написать свою реализацию. По очевидным соображениям (фатальный недостаток) был избран второй способ.

jcoro


Сопрограммы, добавляемые в язык, где их не было, расширяют его. Чтобы писать приложения, которые полностью используют преимущества предлагаемого подхода, и не материться при этом, нужно сделать их удобными. При чтении кода мы должны сразу видеть, что вот эта функция является сопрограммой и выполняет асинхронную операцию, и поэтому её надо запускать в рамках контекста, поддерживающего сохранение и восстановление стека. В языке C# для этого есть ключевые слова async и await. В Java, к сожалению, добавить свои ключевые слова не представляется реальным, но можно воспользоваться аннотациями! Выглядит всё это, конечно, громоздко, но что поделать. Может быть, придумается ещё что-нибудь. А пока так:

Coro coro = Coro.initSuspended(new ICoroRunnable() {
    @Override
    @Async({@Await("foo")})
    public void run() {
        int i = 5;
        double f = 10;
        final String argStr = foo(i, f, "argStr");
    }

    @Async(@Await("yield"))
    private String foo(int x, double y, String m) {
        Coro c = Coro.get();
        c.yield();
        return "returnedStr";
    }
});
coro.start();
coro.resume();

Наличие аннотации @Async говорит jcoro о том, что нужно инструментировать байткод этого метода, сделав его сопрограммой. Сигнатуры точек восстановления задаются аннотациями @Await. Все вызовы внутри сопрограммы, сигнатуры которых есть в списке @Await-аннотаций, становятся точками восстановления. Сопрограмма в jcoro — это метод, помеченный аннотацией @Async и имеющий хотя бы одну точку восстановления. Если в методе нет ни одной точки восстановления, он не будет инструментирован. Точка восстановления — это вызов Coro.yield() или любой вызов (сопрограммы), который может в конечном счёте привести к вызову Coro.yield().

Что происходит в приведённом примере?
Сначала создаётся экземпляр Coro — это объект, который хранит в себе сохранённое состояние сопрограммы и может её запускать, сохранять и восстанавливать. Изначально сопрограмма только инициализируется, но не запускается. При вызове start() управление получает метод run(), который первым делом проверяет, не нужно ли восстанавливать состояние. Пока мы только запустили сопрограмму, и run() просто начинает выполнять свой код. Метод выполняет код, вызывает foo(). Внутри foo() выполняется такая же проверка — не нужно ли восстанавливать состояние? Ответ отрицательный, и, аналогично, код метода начинает выполняться с начала. А вот при вызове yield() происходит следующее. Сам вызов yield() только устанавливает флаг «isYielding» и больше ничего не делает, но код после вызова, увидев этот флаг, не продолжает выполнение, а сохраняет своё состояние и сразу же завершается, возвращая null. То же самое происходит уровнем выше. И далее метод start() возвращает управление. Что мы имеем на этот момент? Код до вызова yield() выполнен, состояние выполнения сохранено в экземпляре Coro. Далее мы зовём resume(). Это приводит к повторному вызову метода run(). И, как и в первый раз, метод проверяет, не нужно ли восстановить состояние. На этот раз это действительно нужно сделать, и метод, вспомнив, что остановился он на вызове foo(), восстанавливает свои локальные переменные и стек и переходит прямо к вызову foo(), не выполняя кода, который был перед ним. В методе foo() происходит то же самое — он восстанавливает стек и локальные переменные, а потом переходит сразу к вызову yield(). Вызов yield() сам по себе ничего не делает, кроме сброса внутреннего флага. После него метод foo() завершает выполнение, возвращая строку «returnedStr». Остаётся метод run(), который так же благополучно завершается, возвращая управление коду, вызывающему resume(). На выходе мы имеем полностью отработавшую сопрограмму, выполнение которой мы разбили на две части.

Как это поможет нам в написании асинхронных приложений?


Предположим, что нам нужно написать серверное приложение, которое в ответ на запрос обращается к базе данных, потом что-то делает с данными, далее применяет их к шаблону, и возвращает кусочек разметки. Классическое серверное веб-приложение. Почти на всех этапах мы можем использовать асинхронные операции. Установка соединения, чтение данных из сокета при получении запроса, все сетевые операции с базой данных, чтение файла при загрузке шаблона, отправка результата в сокет. CPU в таком сценарии должен быть занят только планированием асинхронных операций, логикой препроцессинга данных и шаблонизацией. В остальное время процессор может отдохнуть. Давайте попробуем прикинуть, как это можно было бы организовать в коде. Набросаем сервер:

public static void main(String[] args) {
    Coro.initSuspended(new ICoroRunnable() {
        @Async({@Await("accept")})
        public void run() {
            final AsynchronousServerSocketChannel listener = bind(new InetSocketAddress(5000));

            // Чтобы использовать процессор по максимуму, создаём пул потоков по количеству ядер
            ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

            while (true) {
                AsynchronousSocketChannel channel = accept(listener); // Асинхронная операция
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        Coro.initSuspended(new ICoroRunnable() {
                            @Async({@Await("handle")})
                            public void run() {
                                // Код обработки запроса - в нём тоже можно вызывать асинхронные операции,
                                // так как он работает в сохраняемом контексте
                                handle(channel);
                            }
                        }).start();
                    }
                });
            }
        }
    }).start();
}

@Async({@Await("read"), @Await("write")})
public static void handle(AsynchronousSocketChannel channel) {
    ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);

    Integer read = read(channel, buffer); // Асинхронная операция
    write(channel, outBuffer); // Асинхронная операция

    channel.close();
}

В коде опущены некоторые блоки try-catch, необходимые для корректной компиляции, это сделано чтобы было легче читать код.

Внутри handle можно добавить любую логику. Например, определение «контроллера» и вызов его через reflection, внедрение зависимостей. Но нужно быть аккуратным с вызовами кода, содержащего точки восстановления, через reflection или неинструментированные библиотеки. Об этом чуть ниже.

С точки зрения утилизации потоков это работает следующим образом. Есть пул рабочих потоков, а есть системный пул потоков, который JVM резервирует для выполнения callback-ов асинхронных операций. Когда какая-то асинхронная операция завершается, один из потоков начинает выполнять callback. В нём сначала восстанавливается состояние сопрограммы, потом сопрограмма продолжает выполнение, либо доходя до завершения, либо до следующей асинхронной операции. После того как сопрограмма завершается (или приостанавливает выполнение после планирования очередной асинхронной операции), поток возвращается в пул. Таким образом, один запрос по очереди может обрабатываться разными потоками, и это накладывает некоторые ограничения на наш код. Так, например, мы не можем пользоваться thread local-переменными, если нет уверенности, что между put и get выполнение сопрограммы не будет прервано. С другой стороны, схема выглядит близкой к оптимальной, и обещает хорошую производительность.

Реализация


В отличие от javaflow, jcoro не инструментирует все методы и все вызовы внутри них. Инструментированию подлежат только сопрограммы — те методы, в которых есть хотя бы одна точка восстановления. Точка восстановления — это вызов, который при выполнении может привести в конечном итоге к вызову yield(). То есть это не обязательно должно происходить при каждом вызове, достаточно теоретической возможности. Как вообще инструментируется код? Как можно сохранить и восстановить состояние выполнения целого потока? Оказывается, это совсем не трудно. Достаточно каждый метод, который претендует на гордое звание сопрограммы, превратить в маленькую state-машину. Для этого в начале метода дописывается байт-код, который ничего не делает, если восстанавливаться не нужно, а если нужно, то выполняет switch(state) и по значению состояния переходит на вызов точки восстановления, на которой выполнение было приостановлено. Этого достаточно, потому что сохранение состояния может произойти только в момент вызова точки восстановления (и сам вызов yield() тоже является точкой восстановления). Ну и плюс к этому нужно не забыть восстановить локальные переменные и стек фрейма. Так как в JVM состояние фрейма однозначно идентифицируется этим набором (состояние стека, локальных переменных и текущая инструкция), то после этого можно утверждать, что всё у нас работает правильно. Аналогичным образом отрабатывает сохранение-восстановление на всём стеке выполнения.

Возвращаясь к нашему примеру, давайте посмотрим на то, во что он превратится:

@Async(@Await("yield"))
private String foo(int a, double b, String c) {
    Coro c = Coro.get();
    c.yield();
    return "returnedStr";
}

Эта сопрограмма не делает ничего полезного, а только приостанавливает свою работу, а после возвращает значение. В байт-коде это выглядит так:

private java.lang.String foo(int, double, java.lang.String);
    descriptor: (IDLjava/lang/String;)Ljava/lang/String;
    flags: ACC_PRIVATE
    Code:
      stack=1, locals=6, args_size=4
      0000: invokestatic  org/jcoro/Coro.get:()Lorg/jcoro/Coro;
      0003: astore        5
      0005: aload         5
      0007: invokevirtual org/jcoro/Coro.yield:()V
      0010: ldc           "returnedStr"
      0012: areturn

После инструментирования мы увидим вот что (результат приведён в виде unified diff, к сожалению, хабр не поддерживает подсвечивания строк):

private java.lang.String foo(int, double, java.lang.String);
     descriptor: (IDLjava/lang/String;)Ljava/lang/String;
     flags: ACC_PRIVATE
     Code:
-      stack=1, locals=6, args_size=4
+      stack=2, locals=6, args_size=4
+         0: invokestatic  org/jcoro/Coro.getSafe:()Lorg/jcoro/Coro;     // Получаем текущую сопрограмму
+         3: ifnull        0000                                          // Если её нет - переходим к началу метода
+         6: invokestatic  org/jcoro/Coro.popState:()Ljava/lang/Integer; // popState() вернёт нам не null, если есть сохранённое состояние
+         9: dup
+        10: ifnull        32                                            // Восстанавливать ничего не надо - переходим на начало метода
+        13: invokestatic  org/jcoro/Coro.isUnpatchableCall:()Z          // Это для поддержки неинструментируемых вызовов
+        16: ifeq          23
+        19: invokestatic  org/jcoro/Coro.popRef:()Ljava/lang/Object;
+        22: pop
+        23: ldc           0
+        25: invokestatic  org/jcoro/Coro.setUnpatchableCall:(Z)V
+        28: pop
+        29: goto          43
+        32: pop
       0000: invokestatic  org/jcoro/Coro.get:()Lorg/jcoro/Coro;         // Тело метода до первой точки восстановления
       0003: astore        5
       0005: aload         5
+        40: goto          0007                                          // Далее - код восстановления состояния перед первой точкой восстановления
+        43: invokestatic  org/jcoro/Coro.popRef:()Ljava/lang/Object;
+        46: checkcast     "Lorg/jcoro/Coro;"
+        49: astore        5
+        51: invokestatic  org/jcoro/Coro.popRef:()Ljava/lang/Object;
+        54: checkcast     "Ljava/lang/String;"
+        57: astore        4
+        59: invokestatic  org/jcoro/Coro.popDouble:()D
+        62: dstore_2
+        63: invokestatic  org/jcoro/Coro.popInt:()I
+        66: istore_1
+        67: invokestatic  org/jcoro/Coro.popRef:()Ljava/lang/Object;
+        70: checkcast     "Lorg/jcoro/tests/SimpleTest$1;"
+        73: astore_0
+        74: invokestatic  org/jcoro/Coro.popRef:()Ljava/lang/Object;
+        77: checkcast     "Lorg/jcoro/Coro;"
       0007: invokevirtual org/jcoro/Coro.yield:()V                      // Точка восстановления
+        83: invokestatic  org/jcoro/Coro.isYielding:()Z                 // Далее - код сохранения состояния
+        86: ifeq          0010
+        89: aload_0
+        90: invokestatic  org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+        93: iload_1
+        94: invokestatic  org/jcoro/Coro.pushInt:(I)V
+        97: dload_2
+        98: invokestatic  org/jcoro/Coro.pushDouble:(D)V
+       101: aload         4
+       103: invokestatic  org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+       106: aload         5
+       108: invokestatic  org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+       111: aload_0
+       112: invokestatic  org/jcoro/Coro.pushRef:(Ljava/lang/Object;)V
+       115: ldc           0
+       117: invokestatic  org/jcoro/Coro.pushState:(I)V
+       120: aconst_null                                                 // Возвращаем null, если сопрограмма не завершена
+       121: areturn
       0010: ldc           "returnedStr"                                 // Код завершения метода
       0012: areturn

В начале метода добавился код, определяющий точку восстановления, а перед и после точки восстановления — код для восстановления и сохранения. Если бы точек восстановление было больше, то в начале вместо простого перехода мы бы увидели switch. Есть ещё один нюанс. Раз уж мы пользуемся параллельными стеками для сохранения-восстановления фреймов, то мы должны соблюдать порядок добавления и получения объектов. Если мы сначала кладём на стек объект А, а потом Б, то получать мы их должны в обратном порядке. Поэтому если мы сохраняем сначала локальные переменные, а потом стек фрейма, то восстановление мы должны выполнять наоборот. И плюс сюда отлично вписывается обработка ссылки на объект вызова (this). При сохранении он кладётся на стек крайним, а при восстановлении забирается первым (если, конечно, точка восстановления — нестатический метод). В приведённом примере локальных переменных нет, но с ними код был бы почти такой же.

Unpatchable-код


К сожалению, описанная стратегия сохранения и восстановления стека работает только если есть возможность инструментировать все сопрограммы. Если какой-то метод, который содержит в себе точку восстановления, мы не можем инструментировать, эта стратегия не сработает. Такое возможно, если мы зовём код посредством рефлекшена или же библиотеки, которую невозможно инструментировать. И если с библиотеками ещё можно что-то придумать, то без рефлекшена ну никак нельзя. Все программисты хотят использовать DI-контейнеры, прокси и AOP. Однако, можно заметить, что чаще всего такого рода вызовы — полностью stateless, то есть сколько их не вызывай, они по сути ничего не делают, кроме передачи управления дальше. И при возобновлении сопрограммы можно вызвать такой метод вторично, просто передав в него те же самые аргументы. А уже в коде, который позовёт он, продолжить восстанавливать состояние. И для поддержки этого механизма нужна лишь вторая стратегия сохранения состояния, при которой аргументы сохраняются перед вызовом, а не после. Эта стратегия сейчас поддерживается в jcoro, а для использования нужно всего лишь помечать точки восстановления как @Await(patchable = false).

Информацию о том, во что превращается вызов метода с использованием каждой из стратегий, можно найти на вики.

Поддержка лямбд


Лямбды поддерживаются, но кривовато. Есть две проблемы. Одна из них заключается в том, что в java сложно повесить аннотации на лямбду, и ещё сложнее их прочитать. Единственное найденное мной решение основано на появившихся недавно Type Annotations и выглядит следующим образом:

Coro coro = Coro.initSuspended((@Async({@Await(value = "yield")}) ICoroRunnable) () -> {
    Coro.get().yield();
});

Компилятор, когда видит такое, добавляет в class-файл аннотацию и связывает её с инструкцией invokedynamic. И это работает, но, к сожалению, не всегда. Иногда компилятор связывает такую аннотацию не с этой инструкцией, а с предыдущей (скорее всего, это баг), а иногда — вообще не записывает аннотации в class-файл. Например, это происходит при компиляции такого кода:

    public static void main(String[] args) {
        Runnable one = (@TypeAnn("1") Runnable) () -> {
            Runnable two = (@TypeAnn("2") Runnable) () -> {
                Runnable three = (@TypeAnn("3") Runnable) () -> {
                    Runnable four = (@TypeAnn("4") Runnable) () -> {
                    };
                };
            };
        };
    }

В class-файле окажутся аннотированными только инструкции invokedynamic для внешних двух лямбд. А аннотации для внутренних двух лямбд компилятор проигнорирует. Это тоже скорее всего баг, я отправил его в Oracle, но подтверждения пока не получил. Буду надеяться, что с этим получится разобраться.

Вторая же проблема связана с тем, что лямбды — довольно странные создания в мире Java. Вызываются они как экземплярные методы, но на самом деле представляют собой методы статические. И этот корпускулярно-волновой дуализм создаёт концептуальную проблему для механизма сохранения-восстановления. Дело в том, что для оптимальной стратегии восстановления мы в теле экземплярного метода должны сохранить this (см схему). Но ссылка на экземпляр функционального интерфейса есть только у вызывающего кода! В конечном счёте, мы приходим к необходимости использовать сохранение аргументов перед выполнением лямбды, то есть, всё тот же вариант с patchable=false (который предназначался для обхода проблем с рефлекшеном). А он работает медленнее. Хотя, быть может, это и не критично по сравнению с неудобствами, которые доставляет необходимость прописывать patchable=false на каждой лямбде-сопрограмме.

Суммируя эти две проблемы, можно сделать неутешительный вывод: лямбды-сопрограммы использовать пока не рекомендуется.

Текущее состояние и планы


Проект доступен по адресу https://github.com/elw00d/jcoro. Сейчас доступен движок, набор тестов к нему и несколько примеров. Для доведения технологии до ума необходимо сделать следующее:

  1. В рамках доработок движка — оптимизировать генерацию stack map frames и порешать проблемы с лямбдами
  2. Написать maven и gradle плагины для инструментирования указанных jar-ников или наборов class-файлов
  3. Провести тестирование производительности, написав 3 сервера с одинаковой функциональностью. Один будет использовать блокирующую модель, второй — асинхронную на коллбеках (обычное nio, без jcoro), и третий — асинхронный с использованием jcoro. Нужно оценить, сколько кушает сохранение-восстановление контекста по сравнению с кодом, который этого не делает. Очень надеюсь, что это будет не слишком много.
  4. Разработка окружения. Самая важная часть. Нужно сделать врапперы и аналоги для самых важных библиотек. В первую очередь это, конечно, jdbc. Нужно придумать какой-то «стандарт» для асинхронного jdbc, а потом сделать для него реализации на самые популярные базы данных — mysql, postgresql, mssql. И затем — враппер для jcoro, который бы заворачивал асинхронные операции в сопрограммы. Сюда же — реализация какого-то примера каркаса для написания веб-приложений.
  5. Написать плагин к IntelliJ IDEA, который бы помогал в написании сопрограмм. Сопрограммы и точки восстановления бы как-то выделялись визуально, а ошибки при написании кода (нет аннотации @Await на точке восстановления, нет аннотации @Async) подсказывались при анализе исходника.
  6. Оформить документацию, написать внятный User Guide итд.

Если у вас появится желание помочь или попробовать jcoro в деле, welcome! Для публичной коммуникации, наверное, проще всего будет использовать Github Issues.

Комментарии (25)


  1. edolganov
    20.10.2015 11:40
    +3

    Академически — круто:
    показано, что можно прервать программу на середине, а потом восстановить выполнение, подсунув тот же стек.

    Практически — пока не видно удобства:
    вместо обещанного

    data = readSocket();
    

    мы получаем код, который визуально не проще работы с колбеками:
    Coro coro = Coro.initSuspended((@Async({@Await(value = "yield")}) ICoroRunnable) () -> {
    Coro.get().yield();
    });
    


    Спасибо автору за расширение моего кругозора, но видимо следующим шагом надо писать свой новый язык JVM, где можно использовать свои собственные ключевые слова async и await.


    1. elw00d
      20.10.2015 11:58

      Если писать сервер, то этот ужасный код будет только в самом верху стека обработки запроса, далее писать код можно обычными методами, только следя за аннотациями на точках восстановления. А все асинхронные операции достаточно один раз обернуть в сопрограммы и положить в утилитный класс, и больше его не трогать, а пользоваться ими как синхронными вызовами. См пример


    1. grossws
      20.10.2015 14:31
      +2

      следующим шагом надо писать свой новый язык JVM, где можно использовать свои собственные ключевые слова async и await
      Для желающих они реализованы в стандартной scala-library, на базе Future (в java8 запилили аналогичный, хотя и более простой, CompletableFuture).


  1. HDDimon
    20.10.2015 12:54
    +1

    Подскажите, а в чем корневое отличие данного подхода от Future и CompletableFuture?


    1. elw00d
      20.10.2015 13:08

      В том, что не нужно использовать promises и коллбеки, а можно просто писать код, как если бы он был синхронным.


  1. aparamonov
    20.10.2015 13:05
    +2

    По-моему слишком много магии.
    Akka и rx frameworks выглядят гораздо приятнее и надежнее.


    1. elw00d
      20.10.2015 14:02

      Конечно, надёжнее, ведь там не инструментируется байт-код :) но, я надеюсь, что и этот механизм будет доведён до должного уровня доверия


  1. ikirin
    20.10.2015 13:17

    А чем плох vert.x toolkit?


    1. elw00d
      20.10.2015 13:23

      Там нужно писать коллбеки, именно этого и хочется избежать.


      1. ikirin
        20.10.2015 13:39

        По-моему проще написать обертку для vert.x, чтобы избежать callback. Разве нет?


        1. ikirin
          20.10.2015 13:52

          Из документации vert.x 3: RxJava style APIs if you don't like callbacks.

          Думаю это как раз то что нужно чтобы избежать коллбеки.


          1. elw00d
            20.10.2015 14:00

            Может быть, я не туда смотрю, но тут всё на коллбеках: github.com/vert-x3/vertx-rx/blob/master/rx-java/src/main/asciidoc/java/index.adoc#async-result-support


            1. ikirin
              20.10.2015 16:28

              Я если правильно понимаю так:

              Action1<HttpServer> onNext = httpServer -> {};
              Action1<Throwable> onError = httpServer -> {};
              Action0 onComplete = () -> {};
              
              Handler<AsyncResult<HttpServer>> handler1 = RxHelper.toFuture(onNext);
              Handler<AsyncResult<HttpServer>> handler2 = RxHelper.toFuture(onNext, onError);
              Handler<AsyncResult<HttpServer>> handler3 = RxHelper.toFuture(onNext, onError, onComplete);
              


              1. elw00d
                20.10.2015 16:51

                Кажется, это те же коллбеки, записанные иначе (заранее сохранённые в отдельную переменную и потом использованные). Но дело даже не в том, что коллбеки плохи сами по себе. Проблема в неудобствах, которые возникают, когда нам нужно результат одной операции передать в следующую. Плюс, надо как-то уметь обрабатывать ошибки. А если писать синхронный код, таких проблем не возникает — всё просто и понятно. В этом суть подхода.


                1. ikirin
                  20.10.2015 17:20

                  Как же данная проблема решается в том же vert.x?


                  1. elw00d
                    20.10.2015 18:01

                    Предполагаю, что написанием этого неудобного кода :) не знаю, как решить эту задачу удобно и без сопрограмм.


                    1. ikirin
                      22.10.2015 12:53

                      Немного изучил тему. И опять же нашел вариант у то во же vert.x. Для задач, где необходимо выполнить методы последовательно и для проблемы с обработкой ошибок в частности предлагается использовать Vertx-Sync. Они так и пишут:

                      The non blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms including callback style, promises or Rx-style. Vert.x uses callback style in most places (although it also supports Rx).

                      In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do in sequence. Also error propagation is often more complex when using asynchronous APIs.

                      Vertx-sync allows you to work with asynchronous APIs, but using a direct synchronous style that you’re already familiar with.


                      Не знаю насколько это удобно, тут важно понять окупаются ли все эти неудобства с той мощностью что дает vert.x из коробки.


                      1. elw00d
                        22.10.2015 20:11

                        Посмотрел на Vertx-Sync, там написано, что оно работает на базе Quasar, который, как я понимаю, использует аналогичные jcoro механизмы. Ну то есть там тоже есть восстанавливаемый контекст выполнения и инструментирование байт-кода. Так что даже не знаю, что проще — взять одну jcoro или пробовать vertx-sync, который зависит от quasar и vertx, которые сами по себе не маленькие :)


  1. dtx12
    20.10.2015 13:17

    Вот, кстати, еще один живой проект github.com/puniverse/quasar, имеющий цели схожие с вашими. Без поддержки на уровне языка, все это все же выглядит не слишком удобно, как мне кажется.


    1. elw00d
      20.10.2015 13:22

      Согласен, поддержки на уровне языка очень не хватает…


      1. gridem
        20.10.2015 17:59

        В этом аспекте С++ выглядит интереснее: поддержки на уровне языка — нет, а, тем не менее, использовать корутины сильно проще.


  1. konsoletyper
    20.10.2015 15:43
    +2

    В своём проекте TeaVM я тоже сделал сопрограммы, вдохновившись опытом javaflow. Более того, я даже сделал эмуляцию тредов на них (ну а как ещё сделать треды в JS?), вот работающий пример.

    В отличие от javaflow код сильно не разбухает, т.к. я произвожу статический анализ всего кода, и могу определить, какие методы никогда не будут вызывать, прямо или косвенно, сопрограммы, и таким образом не транформировать их. Кроме того, я не добавляю код по сохранению состояния метода после каждого invoke'а, а просто вставляю goto на общий код (таким образом получается некоторый оверхед по размеру сохранённого состояния). Так же поддерживаются лямбды, но не поддерживается reflection, потому что сам TeaVM не поддерживает reflection.

    Большой проблемой было то, что вставляя развесистый tableswitch в начало метода, мы получаем чаще всего неприводимый граф потока, который нельзя перевести в структурные операторы JavaScript. Эту проблему я решил частично разрезанием графа, частично node splitting'ом.

    Вот пример того, как создаются сопрограммы:

        @Async static native String get(String url) throws IOException;
    
        static void get(final String url, final AsyncCallback<String> callback) {
            XMLHttpRequest xhr = XMLHttpRequest.create();
            xhr.overrideMimeType("text/plain; charset=x-user-defined");
            xhr.onComplete(() -> {
                if (xhr.getStatus() != 200) {
                    callback.error(new IOException("Error loading remote resource " + url + ". Status: " +
                            xhr.getStatus() + " " + xhr.getStatusText()));
                    return;
                }
                callback.complete(xhr.getResponseText());
            });
            xhr.open("get", url);
            xhr.send();
        }
    


    Ну а пример использования такого метода:

    System.out.println("Fetching data...");
    System.out.println(get("http://localhost:8080"));
    System.out.println("Complete");
    


    Соответственно, Complete будет напечатан после содержимого.

    К сожалению, всё очень сильно завязано на инфраструктуру TeaVM, и для просто Java работать не будет.


    1. elw00d
      20.10.2015 16:10

      Круто! Я не решился на статический анализ, решил для начала размечать аннотациями вручную. Статический анализ действительно хорошо работает в этом месте?


      1. konsoletyper
        20.10.2015 16:16

        К сожалению, статический анализ не будет работать в JVM. В TeaVM я сознательно с самого начало убил reflection ради возможности строить method call graph, и исходя из моих целей это было оправдано. Когда потом уже понадобилось делать сопрограммы, method call graph оказался очень кстати.


        1. TimReset
          21.10.2015 09:08
          +1

          Периодически наблюдаю за вашей TeaVM — с того момента, как Вы статью на хабре написали. Сам писал на GWT, т.ч. интерес был не праздный. Но статей уже давно небыло. Не планируете написать ещё стаью о текущем состоянии дел с TeaVM? На github находил статьи на английском, но может тут напишете?