Многопоточная гидра больно кусается
Многопоточная гидра больно кусается

Продолжаем серию статей, посвященных разным прикладным концептуальным решениям, которые могут существенно "прокачать" производительность вашего Node.js-приложения.

В прошлой статье мы рассмотрели реализацию эффективной очереди на основе "эластичного" кольцевого буфера, а в этой попробуем разобраться с особенностями использования модуля Worker threads в Node.js - какие проблемы внедрения многопоточности будут нас ждать при попытках сделать код более производительным, и узнаем, как их можно обойти, применяя типовые концепты.

Начнем с достаточно типовой задачи: мы получаем некоторые сообщения, и нам их надо как-то обработать. В качестве тестового примера сгенерируем эти сообщения самостоятельно, и посмотрим, за какое минимальное время мы сможем вычислить SHA-256-хэш для каждого из них.

Возьмем совсем простой пример: 4096 случайно сгенерированных "сообщения" объемом 64KB каждое (все программисты любят красивые числа) прохэшируем прямо в основном потоке:

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const tsg = hrtime();
const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
console.log('generated:', Number(hrtime() - tsg) / 1e6 | 0, 'ms');

const tsh = hrtime();
const hashes = messages.map(data => createHash('sha256').update(data).digest('hex'));
console.log('hashed:   ', Number(hrtime() - tsh) / 1e6 | 0, 'ms');

В данном случае нам интересны не результаты "в абсолюте", а хотя бы порядок значений, от которых мы сможем оттолкнуться при дальнейших модификациях:

generated: 279 ms
hashed:    914 ms

Вспомогательный поток

Обычно первая попытка внедрения многопоточности начинается с "я слышал, что CPU-нагруженные задачи можно эффективно распараллелить с помощью потоков!"

Конечно, полностью цитата из документации выглядит чуть иначе:

Workers (threads) are useful for performing CPU-intensive JavaScript operations. They do not help much with I/O-intensive work. The Node.js built-in asynchronous I/O operations are more efficient than Workers can be.

В том смысле, что для CPU-нагруженных задач потоки могут помочь (а могут ведь и не помочь, поскольку вовсе не обязаны), а для I/O-нагруженных можете даже и не пытаться.

Но у нас-то как раз в чистом виде CPU-intensive, так что мы все-таки попытаемся. И начнем с элементарного, срисованного с мануала, варианта с одним вспомогательным потоком:

const {
  Worker
, isMainThread
, parentPort
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

if (isMainThread) {
  // это основной поток
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
  console.log('generated:', Number(hrtime() - tsg) / 1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined); // готовим пустой массив для результатов
  let remain = hashes.length; // счетчик ожидающихся результатов

  let tsh;
  
  const worker = new Worker(__filename); // тут мы порождаем поток на основе текущего файла
  worker
    .on('online', () => {     // поток готов начинать работу
      tsh = hrtime();         // начинаем замер времени
      messages.forEach((data, id) => worker.postMessage({id, data})); // передаем ID вместе с сообщением
    })
    .on('message', ({id, hash}) => {                                  // принимаем ID вместе с результатом
      hashes[id] = hash;
      if (!--remain) {        // счетчик кончился, получены все результаты
        console.log('hashed:   ', Number(hrtime() - tsh) / 1e6 | 0, 'ms');
        worker.unref();       // отвязываем вспомогательный поток, позволяя основному завершиться
      }
    });
}
else {
  // это вспомогательный поток
  parentPort.on('message', ({id, data}) => {                          // принимаем ID вместе с сообщением
    parentPort.postMessage({                                          // передаем ID вместе с результатом
      id
    , hash : createHash('sha256').update(data).digest('hex')
    });
  });
}

Тут мы знакомимся с первыми концептами:

  • если вам необходимо как-то увязать переданную в поток задачу и ответ на нее, то передача связующего ID туда-обратно - на вашей совести.

  • все асинхронно, поэтому нет понятия порядка операций, откуда следует, что ...

  • вам необходимо считать количество ответов, чтобы узнать, когда наступит момент "все готово".

Если схематично изобразить алгоритм работы этого кода, то получится что-то такое:

Схема работы со вспомогательным потоком
Схема работы со вспомогательным потоком

Но если верить этой схеме, время обработки может даже увеличиться за счет всех этих пересылок. Давайте же запустим наш код, и... время обработки выросло на 22.5%!

generated: 276 ms
hashed:    1121 ms

Так что тут нас настигает следующий момент:

  • издержки на общение с потоком могут свести на нет весь выигрыш от многопоточности.

Слишком много потоков

Но мы тут все говорим про многопоточность, а поток-то у нас пока всего один! Наверное, нам их просто не хватило, чтобы стало быстрее. Давайте будем создавать на каждую микрозадачу по потоку, передавая ее сразу через workerData - заодно и сэкономим в основном потоке на необходимости дожидаться события 'online' и вызывать worker.postMessage.

const {
  Worker
, isMainThread
, parentPort
, workerData // для приема стартовой информации в потоке
} = require('node:worker_threads');

// ...

if (isMainThread) {
  // ...
  
  const tsh = hrtime();
  messages.forEach((data, id) => {
    const worker = new Worker(__filename, { // каждой задаче - свой worker
      workerData : {id, data} // передаем задание сразу при старте
    });
    worker
      .on('message', ({id, hash}) => {
        hashes[id] = hash;
        if (!--remain) {
          console.log('hashed:   ', Number(hrtime() - tsh) / 1e6 | 0, 'ms');
        }
      });
  });
}
else {
  const {id, data} = workerData; // переданное при старте потока
  parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
  process.exit(); // сразу завершаем поток
}
Каждой задаче - по потоку
Каждой задаче - по потоку

Ура, теперь процесс загружает все 4 ядра CPU на 100%! Вот только...

generated: 290 ms
hashed:    95161 ms

Сколько-сколько?!.. Да, по времени обработки теперь мы проиграли в 100 раз от первоначального не-многопоточного результата, зато узнали такой принцип:

  • слишком много одновременно активных потоков жестко конфликтуют за ресурсы (производительность CPU, пропускная способность памяти) и сообща работают намного дольше.

Несколько потоков, Round-robin

То есть потоков нам надо больше одного, но меньше, чем "непонятно сколько". Их количество должно быть таким, чтобы они не конфликтовали за ресурсы, но при этом максимально эффективно использовали их.

Кроме того, раз у нас теперь на каждый поток должна попасть только часть задач (а не все и не одна) ...

  • необходим алгоритм распределения задач между потоками.

Возьмем в качестве самого простого такого алгоритма Round-robin, когда задачи выдаются потокам последовательно "по кругу".

Распределение задач между потоками по Round-robin
Распределение задач между потоками по Round-robin

Поскольку основным используемым ресурсом в нашей задаче у нас является CPU, то сделаем ровно столько вычислительных потоков, сколько у нас CPU-ядер, чтобы в пределе их можно было все загрузить:

// ...

if (isMainThread) {
  // ...
  
  let tsh;

  const n = require('node:os').cpus().length; // потоков по числу ядер CPU
  Promise.all(
    Array(n).fill().map(_ => { // запускаем N потоков через Promise
      return new Promise((resolve, reject) => {
        const worker = new Worker(__filename);
        worker
          .on('online', () => resolve(worker))
          .on('message', ({id, hash}) => {
            hashes[id] = hash;
            if (!--remain) {
              console.log('hashed:   ', Number(hrtime() - tsh) / 1e6 | 0, 'ms');
              process.exit();
            }
          });
      });    
    })
  )
    .then((workers) => { // все потоки готовы
      const ln = workers.length;
      tsh = hrtime();
      messages.forEach((data, id) => {
        const worker = workers[id % ln]; // поочередно перебираем потоки "по кругу"
        worker.postMessage({id, data});
      });
    });
}
else {
  parentPort.on('message', ({id, data}) => {
    parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
  });
}
generated: 279 ms
hashed:    375 ms

Наконец-то мы получили от многопоточности хоть какой-то профит, причем сразу весьма неплохой - на 4-ядерном CPU в 2.5 раза быстрее, чем исходная версия.

"Сколько вешать в граммах?"

Но кто сказал, что активных потоков должно быть ровно столько - не больше и не меньше? Давайте чуть модифицируем наш тест и убедимся сами, заодно получив нагрузку каждого из потоков с помощью worker.eventLoopUtilization:

Полный код и результаты теста с разным количеством потоков
const {
  Worker
, isMainThread
, parentPort
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

if (isMainThread) {
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = []; // полный набор потоков
  let active = 1;     // текущее кол-во активных потоков
  let tsh;

  process
    .on('test:start', () => {
      hashes.fill();
      remain = hashes.length;

      // фиксируем состояние нагрузки на начало теста
      workers.forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
      tsh = hrtime();
      messages.forEach((data, id) => {
        const worker = workers[id % active]; // RR только среди активных
        worker.postMessage({id, data});
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      // получаем загрузку по всем потокам
      workers.forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      // вычисляем среднюю загрузку активных потоков
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0) / active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(2) // % загрузки
        ).join(' ')
      );

      if (active < n) {
        active++;                   // увеличиваем кол-во активных
        process.emit('test:start'); // перезапускаем тест
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => {
      return new Promise((resolve, reject) => {
        const worker = new Worker(__filename);
        worker
          .on('online', () => resolve(worker))
          .on('message', ({id, hash}) => {
            hashes[id] = hash;
            if (!--remain) {
              process.emit('test:end'); // фиксируем результаты
            }
          });
      });    
    })
  )
    .then((result) => {
      workers.push(...result);    // сохраняем все потоки в общедоступный массив
      process.emit('test:start'); // запускаем тест
    });
}
else {
  parentPort.on('message', ({id, data}) => {
    parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
  });
}
generated: 282 ms
hashed  1: 1120 ms | 99 |  99  0  0  0  0  0  1  0  0  0  0  0  0  0  1  0
hashed  2:  609 ms | 97 |  95 99  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  3:  427 ms | 97 |  97 96 99  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  4:  381 ms | 94 |  93 90 95 97  0  0  0  0  0  0  0  0  0  0  0  0
hashed  5:  362 ms | 95 |  91 98 93 99 93  0  0  0  0  0  0  0  0  0  0  0
hashed  6:  328 ms | 95 |  96 92 93 96 97 92  0  0  0  0  0  0  0  0  0  0
hashed  7:  331 ms | 82 |  74 78 96 80 93 71 83  0  0  0  0  0  0  0  0  0
hashed  8:  319 ms | 90 |  91 89 89 83 98 85 94 92  0  0  0  0  0  0  0  0
hashed  9:  323 ms | 85 |  86 79 86 84 83 80 85 87 97  0  0  0  0  0  0  0
hashed 10:  312 ms | 84 |  94 84 77 83 89 84 86 81 81 78  0  0  0  0  0  0
hashed 11:  313 ms | 83 |  94 75 85 94 88 80 82 77 81 82 72  0  0  0  0  0
hashed 12:  323 ms | 70 |  78 73 65 91 62 59 77 77 77 77 53 51  0  0  0  0
hashed 13:  307 ms | 68 |  77 50 46 99 64 58 60 77 74 73 56 75 80  0  0  0
hashed 14:  309 ms | 65 |  94 77 73 67 58 70 30 85 25 80 68 89 71 25  0  0
hashed 15:  304 ms | 56 |  58 89 82 23 79 79 75 22 23 25 79 46 71 53 34  0
hashed 16:  358 ms | 51 |  25 40 26 54 61 39 59 60 68 69 20 44 69 46 59 74

Длительность теста и средняя загрузка активных потоков
Длительность теста и средняя загрузка активных потоков

И вот тут мы можем заметить странность: минимальное время достигнуто при 15 потоках, а вовсе не при 4, что соответствовало бы полной загрузке ядер CPU. Правда, средняя загрузка этих 15 активных потоков при этом была ниже 60%, причем на некоторых - 22%, когда на других - 89%, а ведь это означает, что...

  • некоторые вычислительные потоки простаивают, если основной поток не успевает давать им задания.

  • или он отдал это задание не тому, кто был свободен и смог бы выполнить его сразу, а другому потоку, который еще был занят предыдущим заданием.

Как можно разобраться с этими неприятностями - в следующей части.

Комментарии (2)


  1. kpmy
    22.09.2022 23:58

    У воркера ведь свой скоуп и данные для обработки туда надо копировать целиком?


    1. Kilor Автор
      23.09.2022 00:03

      Да, но не всегда. Есть такая штука как transferList, про это будет в следующих частях.