Неблокирующий повтор (retry) в Java и проект Loom


Введение


Повтор (retry) операции является старейшим механизмом обеспечения надежности программного обеспечения. Мы используем повторы при выполнении HTTP запросов, запросов к базам данных, отсылке электронной почты и проч. и проч.


Наивный повтор


Если Вы программировали, то наверняка писали процедуру какого либо повтора. Простейший повтор использует цикл с некоторым ожиданием после неудачной попытки. Примерно вот так.


    static <T> T retry(long maxTries, Duration delay, CheckedSupplier<T> supp) {
        for (int i = 0; i < maxTries; i++) {
            try {
                return supp.get();
            } catch (Exception e) {
                if (i < maxTries - 1) { //not last attempt
                    try {
                        Thread.sleep(delay.toMillis());
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); //Propagate interruption 
                        break;
                    }
                }
            }
        }
        throw new RuntimeException("Retry failed after %d retries".formatted(maxTries)); 
    }

Вот пример использования повтора для получения соединения к базе данных. Мы делаем 10 попыток с задержкой 100 msec.


    Connection retryConnection() {
        return retry(10, Duration.ofMillis(100), () -> DriverManager.getConnection("jdbc:foo"));
    }

Этот код блокирует Ваш поток на одну секунду (точнее 900 milliseconds, мы не ждем после последней попытки) потому что Thread.sleep() является блокирующей операцией. Попробуем оценить производительность метода в терминах количества потоков (threads) и времени. Предположим нам нужно сделать 12 операций повтора. Нам потребуется 12 потоков чтобы выполнить задуманное за минимальное время 1 сек, 6 потоков выполнят повторы на 2 сек, 4 — за три секунды, один поток — за 12 секунд. А что если нам потребуется выполнить 1000 операций повтора? Для быстрого выполнения потребуется 1000 потоков (нет, только не это) или 16 минут в одном потоке. Как мы видим этот метод плохо масштабируется.


Давайте проверим это на тесте


    public void testNaiveRetry() throws Exception {
        ExecutorService ex = Executors.newFixedThreadPool(4);       //4 threads
        Duration dur = TryUtils.measure(() -> {
            IntStream.range(0, 12)                                  //12 retries
            .mapToObj(i -> ex.submit(() -> this.retryConnection())) //submit retry
            .toList()                                               //collect all 
            .forEach(f -> Try.of(()->f.get()));                     //wait all finished
        });
        System.out.println(dur);                                    
    }

Output: PT2.723379388S

Теоретически: 900 ms * 3 = 2.7 sec, хорошее совпадение


Неблокирующий повтор


А можно ли делать повторы не блокируя потоки? Можно, если вместо Thread.sleep() использовать перепланирование потока на некоторый момент в будущем при помощи CompletableFuture.delayedExecutor(). Как это сделать можно посмотреть в моем классе Retry.java. Кстати подобный подход используется в неблокирующем методе delay() в Kotlin.


Retry.java — компактный класс без внешних зависимостей который может делать неблокирующий асинхронный повтор операции ( исходный код, Javadoc ).
Полное описание возможностей Retry с примерами можно посмотреть тут


Так можно сделать повтор, который мы уже делали выше.


    CompletableFuture<Connection> retryConnection(Executor ex) {
        return Retry.of(() -> DriverManager.getConnection("jdbc:foo"))
            .withFixedDelay(Duration.ofMillis(100))
            .withExecutor(ex)
            .retry(10);
    }

Мы видим что здесь возвращается не Connection, а CompletableFuture<Connection>, что говорит об асинхронной природе этого вызова. Давайте попробуем выполнить 1000 повторов в одном потоке при помощи Retry.


    public void testRetry() throws Exception {
        Executor ex = Executors.newSingleThreadExecutor(); //Один поток
        Duration dur = TryUtils.measure(() -> {      //Это удобная утилита для измерения времен
            IntStream.range(0, 1_000)                //1000 раз
            .mapToObj(i -> this.retryConnection(ex)) //запускаем операцию повтора
            .toList()                                //собираем future в список
            .forEach(f -> Try.of(()->f.get()));      //дожидаемся всех результатов
        });
        System.out.println(dur);                     //печатаем прошедшее время
    }

Output: PT1.065544748S    

Как мы видим, Retry не блокируется и выполняет 1000 повторов в одном потоке чуть-чуть более одной секунды. Ура, мы можем эффективно делать повторы.


Причем здесь проект Loom?


Проект Loom добавляет в JDK 19 виртуальные потоки (пока в стадии preview). Цель введения виртуальных потоков лучше всего описана в JEP 425 и я рекомендую это к прочтению.


Возвращаясь к нашей теме повтора операций, коротко скажу что Thread.sleep() более не является блокирующей операцией будучи выполняемой в виртуальном потоке. Точнее говоря, sleep() приостановит (suspend) виртуальный поток, давая возможность системному потоку (carrier thread) переключиться на выполнение других виртуальных потоков. После истечения срока сна, виртуальный поток возобновит (resume) свою работу. Давайте запустим наивный алгоритм повтора в виртуальных потоках.


    var dur =TryUtils.measure(() -> {
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        IntStream.range(0, 1_000)
        .mapToObj(i -> executor.submit(() -> retryConnection()))
        .toList()
        .forEach(f -> Try.of(() -> f.get()));
    });
    System.out.println(dur);

Output: PT1.010342284S

Поразительно, имеем чуть более одной секунды на 1000 повторов, как и при использовании Retry.


Проект Loom принесет кардинальные изменения в экосистему Java.


  • Стиль виртуальный поток на запрос (thread per request) масштабируется с почти оптимальным использованием процессорных ядер. Этот стиль становится рекомендуемым.


  • Виртуальные потоки не усложняют модель программирования и не требуют изучения новых концепций, автор JEP 425 говорит что скорее нужно будет забыть старые привычки ("it may require unlearning habits developed to cope with today's high cost of threads.")


  • Многие стандартные библиотеки модифицированы для совместной работы с виртуальными потоками.Так например, блокирующие операции чтения из сокета станут неблокирующими в виртуальном потоке.


  • Реактивный/асинхронный стиль программирования становится практически не нужным.



Нас ждут интересные времена в ближайшем будущем. Я очень надеюсь что виртуальные потоки станут стандартной частью JDK в очередной LTS версии Java.


Сергей Копылов
skopylov@gmail.com
Резюме
Смотреть код на Github

Комментарии (7)


  1. ggo
    01.12.2022 10:13

    Реактивный/асинхронный стиль программирования становится практически не нужным.

    Если так, то как реализовать логику: после того как выполнилось это, выполни вот то в неблокирующем стиле?


    1. Vamp
      01.12.2022 11:04
      +3

      это();
      то();


      1. sergeykopylov Автор
        01.12.2022 16:36
        +1

        Совершенно верно. Для виртуальных потоков нужно писать прямой последовательный код без всякой реактивщины. И этот факт вызывает у меня восхищение.


        1. ggo
          02.12.2022 10:23
          -1

          ну справедливости ради, реактивщина используется для экономии ресурсов, а под ресурсами может быть не только треды, но и что угодно - сетевые пулы, файловые пулы и т.д.


      1. ggo
        02.12.2022 10:13
        +1

        до чего техника дошла... (с)


  1. KivApple
    01.12.2022 15:04
    +1

    По факту это и есть та же самая асинхронность, только с поддержкой в языке вместо построения абстракций только на уровне библиотек. В том плане, что можно вообразить гипотетический транслятор такого кода в классический асинхронный код и его логика работы будет весьма однозначной.


  1. ra314ra
    02.12.2022 13:47

    Поглядел в реализацию CompletableFuture.delayedExecutor() - думал увижу что оно неблокирующе работает через механизмы epoll ОС, наподобие тому как оно устроено с сокетами/фаловыми опреациями...

    Но оказалось все банально: всего лишь скедулим ретрай на экзекуторе как runnable, типа как executor.submit(...) и естественно, имея под экзекутором очередь. Так что да - неблокирующе, но никакой магии господа, расходимся... я такие ретраи и раньше сам писал, до того как появился CompletableFuture