Около года назад наша команда переписала бэкенд одного малоизвестного приложения с 5 млн. пользователей с использованием «latency and fault tolerance» Hystrix. Это позволило значительно повысить надежность приложения при падении или задержках в нижестоящих системах (их около 10, что для серьезной системы не много), предоставило замечательный инструмент (Hystrix Dashboard) мониторинга нагрузки внешних систем (теперь мы знаем кто тормоз), позволило оптимизировать размеры различных пулов в приложении. Однако, осталась проблема длительной обработки отдельных тяжелых запросов, решению которой и посвящена эта статья.

Почему вставить хистрикс было легко


Все плюшки были достигнуты минимальными усилиями и очень небольшими изменениями кода — Hystrix предоставляет синхронную модель программирования, в результате только небольшой код вызова http-клиентов был перенесен из потоков приложения в пулы потоков hystrix с минимально возможным количеством проблем (я не верю, что этих проблем можно избежать полностью).

Рассмотрим простой пример — команду CommandHelloWorld, которая с задержкой извлекает очень важные данные и клиентский код, который ее использует.

public class CommandHelloWorld extends HystrixCommand<String> {  
    @Override
    protected String run() throws InterruptedException {       
        // вот тут происходит очень важное обращение во внешнюю систему
        // все это работает в пуле потоков хистрикса
        Thread.sleep(random.nextInt(500));
        return "Hello " + name + "!";
    }
}

// клиентский код
String executeCommand(String str) {
        // вызов зависает на время, не большее чем установленный таймаут.
        return new CommandHelloWorld(str).execute();        
    }

Получилось очень простое решение с контролируемым временем выполнения. Конечно, со временем любое начальное преимущество обернется недостатками. Появились клиенты, которым нужно предоставить список из 200 элементов, для формирования каждого элемента надо один раз (или не один) обратиться во внешнюю систему (иногда не одну). Не все внешние системы поддерживают батчи, не любой код так уж легко распараллелить без ошибок — затратив 20 мс на один поход во внешнюю систему получаем увесистые 4 секунды на весь запрос, в течении которых пользователь страдал, а пул потоков томката не получал свой поток назад, при этом пул потоков хистрикса практически не был заполнен запросами. Пришло время это исправить.

Как нам обустроить хистрикс


Практически все приложение было построено по следующему паттерну:

  • получаем список с исходными данными
  • формируем из него stream
  • фильтруем/маппируем, с хистриксом в том числе
  • коллекционируем из stream список

Конечно, теоретически, в java stream есть операция parallel, но по факту в ней крайне плохое управление пулами потоков, в которых будет производиться работа, что ее использования было решено отказаться. Нужно было бы какое-то понятное решение, которое бы не сломало исходный Паттерн и не сломало моск команде. В результате было предложено 2 работающих решения — на основе reactivex.io и надстройкой над java stream.

Демонстрационный пример можно найти на гитхабе, все работающие примеры помещены в тесты.

Вариант 1. Добавим реактивности


Нужно заметить, что внутри хистрикс использует реактивную (это термин, не факт что она быстрая) библиотеку " асинхронных источников данных" reactivex.io. Статья не является руководством по этой не объятной теме, но будет показано одно элегантное решение. К несчастью, я не знаком с устоявшимся русским переводом термина observable, потому буду называть его источником. И так, желая не сломать Паттерн, мы будем действовать таким образом:

  • получаем список с исходными данными
  • формируем из него источник
  • фильтруем/маппируем источник, с хистриксом в том числе
  • коллекционируем из источника список

С одной стороны последняя операция не так проста, как кажется, но раз мы знаем, что все внешние связи контролируются хистриксом, а потому строго ограничены во времени, а собственный код приложения ничего толком не делает — мы можем считать, что источник данных в течении указанного таймаута обязан поднимать значение или же исключение будет выкинуто хистриксом до истечения таймаута (1 секунда). Потому будем использовать простую функцию-коллектор:

static List<String> toList(Observable<String> observable) {
        return observable.timeout(1, TimeUnit.SECONDS).toList().toBlocking().single();
}


Создавать команды и источники будем двумя функциями, пользуясь родным API:

     /**
     * создает горячую хистриксную команду-источник    
     */
    static Observable<String> executeCommand(String str) {
        LOG.info("Hot Hystrix command created: {}", str);
        return new CommandHelloWorld(str).observe();
    }

    /**
     * создает холодную хистриксную команду-источник    
     */
    static Observable<String> executeCommandDelayed(String str) {
        LOG.info("Cold Hystrix command created: {}", str);
        return new CommandHelloWorld(str).toObservable();
    }

И так, рассмотрим простой случай обработки списка их 6 элементов:

    public void testNaive() {
        List<Integer> source = IntStream.range(1, 7).boxed().collect(Collectors.toList());
        Observable<String> observable = Observable.from(source)
                .flatMap(elem -> executeCommand(elem.toString()));
        toList(observable).forEach(el ->LOG.info("List element: {}", el));
    }

Все замечательно параллельно отрабатывает приблизительно за 500мс, логи программы подтверждают одновременное использование потоков хистрикса. Как побочный эффект — элементы располагаются в списке в случайном порядке. Такова цена реактивности.

Попробуем увеличить размер списка до 49 — и получим закономерный облом:

    public void testStupid() {
        List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList());

        Observable<String> observable = Observable.from(source)
                .flatMap(elem -> executeCommand(elem.toString()));

        toList(observable).forEach(el ->LOG.info("List element: {}", el));
    }

Вот такой смешной выхлоп в лог:

[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 1
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 2
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 3
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 4
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 5
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 6
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 7
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 8
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 9
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 10
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 11
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-10] INFO org.silentpom.CommandHelloWorld - Command start: 10
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[main] ERROR org.silentpom.RxHystrixTest - Ooops
com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.

Дело в том, что по дефолту хистрикс создает пул из 10 потоков с нулевой очередью. При подписке на источник все его элементы очень быстро эмитируются, мгновенно переполняя весь пул, не помогло даже создание холодного источника. Нам такой хистрикс не нужен. Требуется разумно ограничить жадность одного клиента.

Решение обнаружилось достаточно простое:

  1. Для начала мы не будет использовать flatMap, чтобы подписка на источник не вызывала создание всех команд. А создадим двойной источник методом map.
  2. сгруппируем эти источники методом window — получим тройной источник!
  3. пришло время строго упорядочить тройные источники — выпускаем их один за одним методом concatMap
  4. каждый двойной источник параллельно вычислим методом flatMap

Код оказался удивительно компактным, хотя на понимание его работы ушло много времени:

public void testWindow() {
        List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList());

        Observable<String> observable =  Observable.from(source)
                        .map(elem -> executeCommandDelayed(elem.toString()))
                        .window(7)
                        .concatMap(window -> window.flatMap(x -> x));

        toList(observable).forEach(el ->LOG.info("List element: {}", el));
    }

Посмотрим фрагмент логов:

[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 11
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 12

Из логов приложения видно, что процесс запуска новых команд приостановился после запуска 7й команды. Получилась очень простая модификация кода, которая позволяет запускать запросы к внешним системам с нужной степенью параллельности, не забивая собой весь пул.

Вариант 2. Нафиг реактивность! У нас есть екзекуторы


Редкий человек понимаем rx. Если даже он говорит обратное — попросите написать код выше своими словами. Но ведь в java 8 и так есть stream и future, хистрикс вроде умеет работать с future как с родной, давайте попробуем запустить параллельную обработку с их помощью. Создавать future будем так:

  // просим хистрикс запустить команду и вернуть фьючу
  static Future<String> executeCommandDelayed(String str) {
        LOG.info("Direct Hystrix command created: {}", str);
        return new CommandHelloWorld(str).queue();
    }

  // по старинке синхронно вызываем хистрикс, все в ручную
  static String executeCommand(String str) {
        LOG.info("Direct Hystrix command created: {}", str);
        return new CommandHelloWorld(str).execute();
    }

Пробуем обработать список из 49 элементов:
public void testStupid() {
            IntStream.range(1, 50).boxed().map(
                    value -> executeCommandDelayed(value.toString())
            ).collect(Collectors.toList())
                    .forEach(el -> LOG.info("List element (FUTURE): {}", el.toString()));
}

и снова знакомая проблема.

[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 2
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 3
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 4
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 5
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 6
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 7
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 8
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 9
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 10
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 11
[main] ERROR org.silentpom.stream.ParallelAsyncServiceTest - Ooops
com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.

В данном случае порядком создания команд управляет сплиттератор, то есть вообще совершенно непрозрачно и опасно. Постараемся решить проблему в 2 этапа, сохраняя достаточно привычную технику работы со стримом:

1) замапим стрим исходных данных в стрим фьюч. Причем с контролем реально запущенных на исполнение задач. Для этого нам потребуется промежуточный экзекьютор с нужной нам степенью параллельности
2) превратим стрим фьюч в стрим значений. держа в уме, что каждая фьюча в конечном итоге исполняется хистриксом и имеет гарантированное время выполнения.

Для реализации первого шага сделаем отдельный оператор parallelWarp по преобразованию пользовательской функции, для второго шага придется написать функцию waitStream, принимающую и возвращающую стримы:

  public void testSmart() {
        service.waitStream(
                IntStream.range(1, 50).boxed().map(
                        service.parallelWarp(
                                value -> executeCommand(value.toString())
                        )
                )
        ).collect(Collectors.toList())
                .forEach(el -> LOG.info("List element: {}", el));
    }

получилась почти привычная для пользователей стримов запись. Посмотрим что под капотом, это последний фрагмент кода на сегодня:
// по традиции threadSize = 7
 public ParallelAsyncService(int threadSize) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("parallel-async-thread-%d").build();
// создаем промежуточный экзекутор, который будет создавать команды хистрикса
        executorService = Executors.newFixedThreadPool(threadSize, namedThreadFactory);
    }

    /**
     * Maps user function T -> Ret to function T -> Future<Ret>. Adds task to executor service
     * @param mapper user function
     * @param <T> user function argument
     * @param <Ret> user function result
     * @return function to future
     */
    public <T, Ret> Function<T, Future<Ret>> parallelWarp(Function<T, Ret> mapper) {
        return (T t) -> {
            LOG.info("Submitting task to inner executor");
            Future<Ret> future = executorService.submit(() -> {
                LOG.info("Sending task to hystrix");
                return mapper.apply(t);
            });
            return future;
        };
    }

    /**
     * waits all futures in stream and rethrow exception if occured
     * @param futureStream stream of futures
     * @param <T> type
     * @return stream of results
     */
    public <T> Stream<T> waitStream(Stream<Future<T>> futureStream) {
        List<Future<T>> futures = futureStream.collect(Collectors.toList());

        // wait all futures one by one.
        for (Future<T> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();

                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new RuntimeException(e);
            }
        }

        // all futures have completed, it is safe to call get
        return futures.stream().map(
                future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null; // не должен вызываться вообще
                    }
                }
        );

Метод waitStream очень прост, только обработка ошибок его испортила. Оператор parallelWarp крайне прост и наверняка имеет специальное название у адептов функционального программирования. Новые хистрикс команды создаются только внутренним экзекутором, который имеет нужную нам степень параллельности. Пруфлинк:

main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 18
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 19
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 11
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 12
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 13
[hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9

При данном подходе нам потребовался дополнительный пул потоков на каждый пул потоков хистрикса, зато список на выходе сохранил порядок. Какой из подходов выиграет в приложении — время покажет.

Повторюсь, что все примеры можно посмотреть в тестах на гитхабе. Буду рад знаменитой хаброкритике.

Комментарии (0)