В последнее время я довольно часто сталкиваюсь с оптимизацией процессов загрузки и обработки информации. Периодически вижу не самые оптимальные, а то и вредные для производительности решения причем от разработчиков middle и senior уровня. Поэтому хотелось бы подробнее описать общий подход к написанию правильных и быстрых многопоточных архитектур с максимальной утилизацией ресурсов, но при этом с минимально возможным в каждом отдельном случае количеством потоков.

Что вообще можно ускорять с помощью многопоточной работы?

В первую очередь нужно вообще понять, возможно ли ускорить с помощью Thread Pool конкретно тот кусок кода или системы, с которым вы работаете.

Ответ “нет” если:

  • Процесс уже работает микросекунды/миллисекунды и выполняется быстрее, чем необходимо по требованиям/техзаданию

  • Утилизация основного ресурса, на который опирается алгоритм, уже достигает постоянных 100%

Теперь подробнее по каждому пункту “нет”.

Алгоритм уже укладывается в требования

Само по себе использование Thread Pools ведет к увеличению расхода памяти и ресурсов, поэтому если ваш алгоритм уже укладывается в SLA, то нет смысла все усложнять. Само по себе добавление потоков усложняет понимание кода, а также добавляет сразу пачку неочевидных потенциальных проблем и багов. Тут как никогда отлично применяются принципы KISS (Keep It Simple, Stupid) и YAGNI (You aren’t Gonna Need It).

Утилизация одного из ресурсов уже составляет постоянные 100%

О каких ресурсах идет речь? В первую очередь – CPU, Disk IO, Network. Это именно те ресурсы, которые можно забивать, увеличивая количество потоков вашего процесса. Таким образом, если один из ресурсов уже утилизирован по максимуму, то ничего ускорить мы не сможем. Другое дело, что современное железо сложно забить на 100% в один поток. Если это произошло, то стоит сперва обратить внимание на то, как вы их используете: возможно стоит рассмотреть, например, возможность замены случайного доступа к диску на последовательный. Далее будем говорить о системе в которой стоит CPU с 8 ядрами, для примера. Итак, вопрос. Почему мы говорим о каких-то еще ресурсах, если сами потоки работают на ядрах? Дело в том, что при работе со всеми другими ресурсами, кроме CPU, сами потоки очень часто находятся в состоянии ожидания новых данных, будь то чтение или сохранение данных на диск или загрузка чего-то по сети. Таким образом диспетчер задач операционной системы имеет достаточно времени CPU, чтобы запускать на наших 8 ядрах сотни потоков. 

Теперь возвращаемся к ресурсам и посмотрим пример. Предположим, наш канал 1 Гбит/сек, и мы парсим данные с удаленных серверов, средняя скорость которых 10 мбит.

Решение с одним потоком

//single thread
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  Data data = downloadProcessor.getData();
  process(data);
}

Чтобы забить такой канал на максимум понадобится запустить 100 потоков. 100 * 10 Мбит/сек = 1 Гбит/сек.

Решение в 100 потоков

//100 threads
private static final ExecutorService executorService = Executors.fixedThreadPool(100);
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  executorService.submit(() -> {
    Data data = downloadProcessor.getData();
    process(data);
  });
}

Забили канал. Mission Complete. Теперь посмотрим внимательнее. Решение имеет недостатки. 

В общем случае любое задаче требуются разные ресурсы в разных стадиях. Сначала нужно что-то откуда-то загрузить(нагружаем сеть или диск), потом это обработать (что обычно делается на CPU), а потом сохранить (опять сеть или диск).

Например, парсинг результатов загрузки занимает 1 секунду процессорного времени. В нашем случае произойдет следующее: в первой части программы весь executorService забьется каким-то количеством задач на загрузку. Если все они требуют примерно одинакового времени для работы, то через некоторый промежуток времени мы получим 100 потоков, в каждом из которых алгоритм дойдет до выполнения метода process. Итого: 100 потоков, которые хотят получить 1 секунду процессорного времени, а это значит 100 задач будут выполняться на 100 потоках уже после выполненной загрузки данных,100 / 8 ~= 13 секунд. И все эти 13 секунд сеть будет простаивать, так как все работающие потоки будут заниматься обработкой загруженных данных. В более сложном случае сами задачи будут требовать разного количества ресурсов. Часть задач могут загружаться 100 секунд и обрабатываться 10 мсек, другая часть задач может загружаться 10 мсек и требовать 100 сек работы ядра.

Конечно, в любом случае выигрыш в скорости работы перед обычным однопоточным приложением будет существенный, но ресурсы будут забиваться неравномерно и, в среднем, не на 100%. 

Что можно сделать? Разобьем нашу задачу на 2 подзадачи, которые утилизируют разные ресурсы, и добавим еще один пул потоков, который отдельно будет утилизировать CPU, не мешая при этом потокам занимающимся загрузкой данных. На 8 ядер нам понадобится ровно 8 CPU-heavy потоков чтоб утилизировать их на 100%.

Конвейер в 100+8 потоков

//100+8 threads
private static final ExecutorService downloadExecutorService = Executors.fixedThreadPool(100);
private static final ExecutorService processExecutorService = Executors.fixedThreadPool(8);
List<Future<Data>> futureDataResults = new ArrayList<>();
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  futureDataResults.add(downloadExecutorService.submit(() -> downloadProcessor.getData()));
}

for (Future<Data> futureData : futureDataResults) {
  processExecutorService.submit(() -> process(futureData.get()));
}

Выглядит это лучше, хотя все еще не идеально. У такого решения есть один главный недостаток (хотя иногда может и преимущество): мы не можем запускать в обработку результат третьей загрузки, пока не запустим результат второй, и если вторая загрузка будет загружаться час, то второй пул будет простаивать очень долго без работы. Из плюсов: простой, почти последовательный алгоритм, который не требует особых знаний о том, как организовывать многопоточную работу и читается очень легко(опять же почти последовательный). И он еще лучше будет читаться если, например, вам нужно загружать данные из разных мест, а потом обрабатывать их вместе. Тогда по двум массивам можно будет пробежаться и взять элементы по одному и тому же индексу. Здесь есть несколько вариантов сделать это хотя бы так, чтоб кровь из глаз не пошла. С использованием очередей, CompletableFuture или с использованием паттерна Observer, когда одни задачи будут уведомлять другие о том, что результаты можно брать в обработку. В любом случае, все это подвиды колбэков, которые мы можем отдавать в нашу задачу, чтоб она выполняла переданный ей кусок кода, которому передаст результат выполнения. Этот кусок кода просто должен добавлять следующий этап задачи в следующий Thread Pool.

Конвейер в 100+8 потоков с коллбэками

while (isNextDownloadTaskAvailable()) {
   DownloadProcessor downloadProcessor = downloadService.createProcessor();
   DownloadTask downloadTask = new DownloadTask(downloadTask, data -> {
       CalculateProcessor calculateProcessor = calculateService.createProcessor(data);
       CalculateTask calculateTask = new CalculateTask(calculateProcessor, results::add);
       calculateExecutorService.submit(new CalculateTaskCallbackProxy(calculateTask));
   });
   downloadExecutorService.submit(downloadTaskCallbackProxy);
}

Конвейер в 100+8 потоков на CompletableFuture

while (isNextDownloadTaskAvailable()) {
   DownloadProcessor downloadProcessor = downloadService.createProcessor();
   DownloadTask downloadTask = new DownloadTask(downloadProcessor);
   CompletableFuture<DownloadData> downloadResultFuture = CompletableFuture
           .supplyAsync(downloadTask::call, downloadExecutorService);
   CompletableFuture<Long> calcResultFuture = downloadResultFuture
           .thenApplyAsync(data -> {
               CalculateProcessor calculateProcessor = calculateService.createProcessor(data);
               CalculateTask calculateTask = new CalculateTask(calculateProcessor);
               return calculateTask.call();
           }, calculateExecutorService);
}

Все запускаемые примеры, с которыми можно поиграться, доступны в репозитории.

Всякие подобные заметки я периодически записываю у себя в телеграм канале.

Комментарии (12)


  1. BugM
    15.11.2021 02:34
    +6

    Как же вы переусложнили простейший код. И насажали багов с гонками. Гонка будет в некоторых ваших вариантах междуisNextDownloadTaskAvailable и nextTask

    У вас есть два бизнес процесса котрые требуют ресурсы. Загрузка даных и обработка того что загрузили. И некая внешняя функция которая дает задания.

    Ну так сделайте простейшие очереди. И два пула размеры которых можно покрутить и тонко все настроить.

    Есть задача на загрузку? Ставим в очередь.

    Есть загруженная задача? Обрабатываем.

    Загрузили слишком много и не успеваем обработать? Ждем пока обработается.

    Основной код

    while(isNextDownloadTaskAvailable()) {
        DownloadTask downloadTask = nextTask();
        downloadExecutorService.submit(() -> {
            DownloadProcessor downloadProcesor = downloadService.createProcessor(downloadTask);
            Data data = downloadProcessor.getData();
            processExecutorService.submit(() -> {
                process(data);
            });
        });
    }

    Объявление экзекуторов. У меня копипаста с SO. В любом живом проекте такое куда-то в общие библиотеки всегда вытащено. Всем нужны блокирующие экзекуторы.

    int DOWNLOAD_TREADS_COUNT = 10;
    int PROCESS_TREADS_COUNT = 10;
    
    BlockingQueue<Runnable> downloadQueue = new ArrayBlockingQueue<>(DOWNLOAD_TREADS_COUNT);
    ThreadPoolExecutor downloadExecutorService =
            new ThreadPoolExecutor(1, DOWNLOAD_TREADS_COUNT, 30, TimeUnit.SECONDS, downloadQueue);
    downloadExecutorService.setRejectedExecutionHandler((r, executor) -> {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    });
    
    BlockingQueue<Runnable> processQueue = new ArrayBlockingQueue<>(PROCESS_TREADS_COUNT);
    ThreadPoolExecutor processExecutorService =
            new ThreadPoolExecutor(1, PROCESS_TREADS_COUNT, 30, TimeUnit.SECONDS, processQueue);
    processExecutorService.setRejectedExecutionHandler((r, executor) -> {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    });

    Катим в прод или на нагрузочное тестирование и крутим размеры пулов так чтобы результат нас устраивал.

    Загруженное, но необработанное надо где-то хранить... А память не резиновая. Поэтому схема именно такая.


  1. pin2t
    15.11.2021 08:47
    +1

    Мне кажется минимизация количества потоков уже давно не актуальная задача. С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.

    И вся эта возня с Future, CompletableFuture, пайплайнами callback-ов, тред пулами и перекладыванием данных из очереди в очередь ушла в прошлое.

    Сейчас вполне можно просто создать по потоку на загрузку и пусть они выполняют все этапы последовательно

    ExecutorService pool = Executors.newCachedThreadPool();
    for (Task t : tasks) {
    		pool.submit(() -> {
        		Data data = Data.parse(t.download());
            data.process();
        });
    }

    А если хочется чего-то ограничить, то можно ипользовать старый добрый семафор

    Semaphore processors = new Semaphore(Runtime.getAvailableProcessors());
    ExecutorService pool = Executors.newCachedThreadPool();
    for (Task t : tasks) {
        pool.submit(() -> {
           String raw = t.download();
           processors.acquire();
           try {
           		Data data = Data.parse(raw);
              data.process();
           } finally {
           		processors.release();
           }
        });
    }

    Перекладывание из очереди в очередь имеет смысл когда загрузка и обработка происходят на разных машинах, у которых нет общего локального стека.

    А на одной машине все эти очереди, явные и неявные, через .then конструкции, просто все значительно усложняют без реальной на то причины.


    1. lair
      15.11.2021 09:44

      С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.

      Это зависит от того, какие у у потока накладные расходы. Например, .NET (по крайней мере, раньше) аллоцировал на каждый поток 4Mb под стек. Сотни тысяч потоков дают вам сотни гигабайт аллокации — не каждый "современный ноутбук" такое выдержит.


      1. pin2t
        15.11.2021 09:49

        Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.

        В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать.

        Но в 64-битных системах с этим нет проблем, адресное пространство большое.


        1. lair
          15.11.2021 09:55
          +1

          Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.

          Не в случае .NET. Там память именно "реально выделялась". Поэтому нехватка памяти когда потоков слишком много была реальной опасностью.


          В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать. [...] Но в 64-битных системах с этим нет проблем, адресное пространство большое.

          На 32-битных системах .NET выделял по 1Мб на поток.


    1. asundukov Автор
      15.11.2021 13:21
      +1

      Если наш канал 1 Гбит и мы поставим на загрузку одновременно 10к задач в 10к потоков - то сначала мы задушим свой же CPU, RAM и сеть т.к. забьем память бесполезными тредами с высокими IO и будем загружать со средней скоростью в <100 кбит. А если больше 60к потоков будет - то еще и порты на хост машине забьем. Какие сотни тысяч потоков?


      1. pin2t
        15.11.2021 14:05
        +1

        Если есть ограничения на количество соединений - нужно их ограничивать с помощью семафора в функции download.

        В случае с пайплайнами и тредпулами основной высокоуровневый алгоритм, бизнес логика так сказать

        • скачать данные

        • распарсить

        • что-то обработать

        оказывается разбит на мелкие мелкие кусочки, а чтобы связать эти кусочки воедино нужны промежуточные структуры Future, очереди и коллбэки. А из-за того что кусочки мелкие и находятся в разных местах кода - весь алгоритм целиком очень сложно понять, что-то в нем изменить, исправить, и очень легко всовершить ошибку. Как раз при передачи чего-то во вспомогательной структуре Future

        В случае же если создавать под каждую задачу отдельный поток, общий алгоритм, бизнес логика, написан в одной функции и выполняется последовательно. Его проще понять и меньше шанс совершить ошибку. То что создается много потоков, это не проблема для современных машин. А количество сокетов и одновременных вычислений надо ограничивать, да.


  1. dyadyaSerezha
    15.11.2021 19:18

    Сдаётся мне, что первый алгоритм с названием "Конвейер в 100+8 потоков" логически неверный. Там сначала стартуем загрузку всех задач, а потом уже их обработку. Обычно мы не знаем, сколько задач и просто загружаем, пока не какой-то маркер <eof>. То есть, либо загрузку и обработку надо делать в том же цикле, либо через очередь - 100 потоков грузят и пишут задачи в очередь, а другие 8 потоков берут загруженные задачи из очередити обрабатывают их. И как уже писали выше, обычно размер очереди ограничивают.


    1. asundukov Автор
      15.11.2021 22:18

      там сначала наполняется очередь, а потом разгребаются результаты, конечно - это палец, высосанный из примера, и реальные кейсы могут немного отличаться по эндпоинту поступления задач и по характеру агрегации результата. И, конечно, там есть минусы и они сразу после куска кода подписаны :)


  1. Artyomcool
    15.11.2021 22:32

    Если действительно потребовалось выдавливать предельную многопоточную производительность, то возможно стоит посмотреть в сторону https://openjdk.java.net/projects/loom/ .

    Если это кажется слишком сложным для вашей проблемы, то возможно она не стоит и предложенных усилий, и оригинальный вариант вполне неплох.


  1. akpaevj
    16.11.2021 19:38

    Для pipe в .net есть прекрасная библиотека TPL, с помощью которой можно довольно гибко регулировать нагрузку на железо.


  1. therealalexz
    20.11.2021 22:31
    -1

    если всего 8 ядер на тачке, зачем 100 тредов? они же друг у друга будут ресурсы забирать и потеряете на их частые переключения