В прошлых частях цикла мы:
рассмотрели базовые концепты работы с многопоточностью в JavaScript на примере среды Node.js;
научились формировать общую очередь и каналы обмена данными и сигналами, чтобы более эффективно управлять загрузкой потоков;
использовали разделяемую память и Atomics-операции как самое быстрое средство обмена большими блоками данных;
и создали отдельный поток-координатор, чтобы устранить негативное влияние синхронного кода в основном потоке исполнения на загрузку потоков вспомогательных.
В сегодняшней, заключительной, части я продемонстрирую, как все эти механики вместе позволяют сделать эффективный микросервис, автоматически подстраивающийся под изменения входящей нагрузки.
В данном случае эффективность - это не про максимально возможную скорость обработки каждой отдельной задачи, а про сбалансированное использование аппаратных ресурсов с учетом тех ограничений, на которые мы готовы пойти. Особенно актуально это для различных "облачных" размещений, где оплата идет за фактически потребленные CPU и RAM.
Передача контента сообщений
В предыдущей части мы научились быстро передавать блоки двоичных данных через разделяемую память в обрабатывающие их потоки с помощью потока-координатора, избавившего нас от ожидания исполнения синхронного кода в основном потоке:
Но, как правило, обрабатываемые сообщения состоят не только из двоичного контента, но и из некоторого объекта, который хочется иметь в доступе по итогу процессинга, а вот передавать его туда целиком вовсе незачем.
Доработаем схему из прошлой статьи, вынеся функционал промежуточного хранения сообщений и формирования единого блока двоичных данных в класс Coordinator
, а управления обрабатывающими потоками и очередью - в WorkersPool
:
В коде эта схема может выглядеть примерно так:
if (isMainThread) {
const {Coordinator} = require(...);
const coordinator = new Coordinator(__filename, ...);
coordinator
.on('online', () => {
// отправка данных
coordinator.postMessage(message);
})
.on('message', (result, message) => { // увязанные [по id] сообщение и результат
// обработка полученного результата в паре с исходным сообщением
});
}
else {
const {workerType, port} = workerData;
switch (workerType) {
case 'coordinator':
const {WorkersPool} = require(__filename, ...);
const pool = new WorkersPool(...);
break;
case 'worker':
const {shared} = workerData;
// магия обработки данных в разделяемой памяти
port.postMessage({id, ...});
break;
}
}
Представление данных
Для "увязки" обрабатываемых двоичных данные и исходного сообщения мы можем использовать автоинкрементный id
. Но только лишь его - мало, чтобы корректно передать данные в поток.
Создавая блок разделяемой памяти SharedArrayBuffer
, мы должны заранее предусмотреть максимальный размер возможного контента. Но работать-то мы можем с разными данными - то есть получается, что передавать размер нам надо тоже, иначе мы не сможем их правильно "выцепить" из разделяемой памяти.
Получается, что каждый отправляемый в поток блок данных должен иметь префикс, состоящий из служебного id
и длины самих данных - для этого нам достаточно иметь Uint32Array
из двух ячеек.
Но для "склейки" этих данных с двоичными данными из сообщения нам понадобится его Uint8Array
-проекция и функция преобразования dataFunc
, которая может быть примерно такой:
message => { // функция получения двоичных данных
// объединяем префикс и данные
const buf = Buffer.concat([ui8prefix, ...message.part]);
// записываем в префикс результирующую длину
buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
// возвращаем итоговый двоичный контент
return buf.buffer;
}
В свою очередь, на стороне обрабатывающего потока позиция id
воспринимается не только как Int32
-значение (уже знаковое!), но и как ячейка для ожидания блокировки:
const {shared} = workerData;
// [shared] = {lock:int32} + {size:uint32} + {data:uint8[]}
const lock = new Int32Array(shared, 0, 1);
const size = new Uint32Array(shared, Int32Array.BYTES_PER_ELEMENT, 1);
const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT);
// ...
const id = lock[0];
const messageData = data.subarray(0, size[0]);
Передача данных
Давайте детализируем схему выше с точки зрения путей обмена этими данными:
в основном потоке приложения у нас появляется сообщение
message
, содержащее некоторый набор двоичных данныхмы передаем его в
Coordinator
, где оно по автоинкрементномуid
заносится в промежуточноеMap
-хранилищеmessages
с помощью вспомогательной функции
dataFunc
мы из сообщения извлекаем все необходимые двоичные данные и "склеиваем" их с префиксом, содержащимid
и размер данныхрезультирующий
data
-буфер мы по ссылке передаем в поток-координаторв координаторе данные должны или пройти через очередь (если потоки заняты), или сразу копируются в разделяемую память свободного потока
поток, получая уведомление о новых данных, обрабатывает их и отправляет прямо в основной поток, ...
... где они по
id
связываются с исходным сообщением и передаются в.emit
Класс Coordinator
class Coordinator extends Worker {
#messages = new Map(); // хранилище всех обрабатываемых сообщений
#p32 = new Uint32Array(2); // префикс данных = [messageID, dataSize]
#p8 = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
#data = [this.#p8]; // массив для склейки частей в двоичный блок
#message2data; // функция получения двоичных данных из сообщения и помещения в массив
#data2buffer() {
// объединяем префикс и данные
const buf = Buffer.concat(this.#data);
// записываем в префикс результирующую длину
buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
// восстанавливаем состояние массива
this.#data.length = 1;
// возвращаем итоговый двоичный контент
return buf.buffer;
};
constructor(filename, options) {
// сигнальный канал
const {port1, port2} = new MessageChannel();
/* доинициализируем необходимые опции
{
workerData : {
workerType : 'coordinator'
, port : port2
}
, transferList : [port2]
}
*/
((options ??= {}).workerData ??= {}).workerType = 'coordinator';
((options ??= {}).workerData ??= {}).port = port2;
((options ??= {}).transferList ??= []).push(port2);
super(filename, options);
const {dataField, dataArray} = options;
this.#p32[0] = 0; // ID
this.#message2data = dataField
? message => this.#data.push(dataField(message))
: message => this.#data.push(...dataArray(message));
const messages = this.#messages;
port1.on('message', ({threadId, port}) => {
// ассоциируем открывшийся порт с конкретным потоком
port.threadId = threadId;
this.emit('port.open', port);
port
.on('message', result => {
// из результата обработки по ID получаем исходное сообщение ...
const message = messages.get(result.id);
if (message) {
messages.delete(result.id);
// ... и передаем вместе с результатом
this.emit('message', result, message);
}
})
.on('close', () => {
// при закрытии порта - отписываемся от него
this.emit('port.close', port);
port.removeAllListeners();
});
});
}
// передача двоичных данных в поток
postMessage(message) {
// сохраняем объект сообщения в хранилище
this.#messages.set(this.#p32[0], message);
// добавляем в общий массив для склейки одно или несколько полей
this.#message2data(message);
// формируем целевой контейнер данных
const buffer = this.#data2buffer();
// передаем двоичный контент с префиксом в поток по ссылке
super.postMessage(buffer, [buffer]);
// id = (id + 1) % 0x10000000
this.#p32[0]++;
this.#p32[0] &= 0x0FFFFFFF;
}
}
На что тут стоит обратить внимание:
для связи между объектом
Coordinator
в основном потоке и потоком-координатором мы подняли служебныйMessageChannel
, состоящий из пары портов, как это было описано во второй части серии;именно по этому каналу поток-координатор передает нам порты для порождаемых им обрабатывающих потоков;
по этим-то портам рабочие потоки и сбрасывают нам результаты своей деятельности прямо в основной поток.
Разумное количество потоков
Так, с передачей данных в обрабатывающие потоки и обратно в основной поток - разобрались. Но сколько таких потоков нам необходимо иметь вообще?
Сначала обратим внимание на тот факт, что некоторых ресурсов стоит уже само создание потока. Если же оно включает в себя какую-то "тяжелую" инициализационную подготовку вроде прегенерации кэша внутри потока, то его старт может кратковременно занимать логическое ядро CPU даже на все 100%.
Отсюда следует четыре достаточно простых вывода:
потоки стоит порождать/убивать как можно реже;
не стоит это делать одновременно - то есть допустима некоторая задержка на создание/уничтожение потока;
как минимум, один поток обработки должен существовать всегда;
потоков должно быть не больше, чем CPU-ядер.
Раз уж мы упомянули тот факт, что внутри потока могут формироваться собственные данные или кэш, то становится выгодно иметь лишь минимально необходимое число потоков, насколько это вообще возможно, чтобы занимать как можно меньше памяти и улучшать долюcache hit
.
Как упоминалось выше, для решения наших задач будет логично выделить класс WorkersPool
, работающий внутри потока-координатора, который и будет осуществлять все управление потоками и очередью.
Критерий "необходимости"
Но как понять, что существующего количества потоков уже стало недостаточно, и все-таки необходимо создать еще один, несмотря на все сопутствующие издержки?
Фактически, нас интересует лишь один параметр работы нашего сервиса - чтобы все задачи обрабатывались достаточно быстро. А любая поступившая задача может быть либо сразу отдана на исполнение какому-то из потоков, либо поставлена в очередь.
Очередь, кстати, возьмем все ту же, из прошлой статьи, на основе кольцевого буфера:
this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);
Сама длина очереди "в штуках" нам не сильно о чем-то говорит, ведь мы не оцениваем ни скорость обработки, ни "вес" каждой отдельной задачи. Но если мы задумаемся, что нахождение задачи в очереди в течение 1 миллисекунды нас совсем не напрягает, а "зависание" в течение 1 секунды не устраивает совсем, то где-то на этом интервале [1ms .. 1s]
найдем комфортное значение, сколько мы готовы достаточно безболезненно позволить задаче находиться в очереди - скажем, это будет 100ms
.
То есть пока время пребывания самой старой задачи в очереди не перевалило за это значение, выгоднее еще немного подождать, чем стартовать еще один поток.
Чтобы не заниматься этой оценкой на каждой операции, повесим эту проверку на таймер прямо в конструкторе класса.
Класс WorkersPool (каркас)
class WorkersPool {
#options; // кэш аргументов конструктора
#mainWorker; // основной рабочий поток
#workersPool = []; // пул свободных дополнительных потоков
#workersSet = new Set(); // полный набор всех активных потоков
#workersRemain; // потоков еще доступно к созданию
#queue; // очередь на кольцевом буфере
#checking; // признак активности проверки состояния
#queueMore1; // "длина" очереди превышает возможности 1 потока
#queueMoreW; // ... всех существующих потоков
#checkQueue(workers) {
// если в очереди задач уже больше, чем потоков,
// ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
}
constructor(workerFile, options) {
options['workerFile'] ??= workerFile;
const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
this.#options = options;
this.#workersRemain = poolSize;
// очередь на кольцевом буфере
this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);
// главный вспомогательный поток существует сразу и всегда
this.#mainWorker = this.#createWorker();
// ...
// периодическая проверка очереди и пула дополнительных потоков
const pool = this.#workersPool;
setInterval(() => {
// пора ли порождать еще один поток?
this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
// пора ли отдавать задачи дополнительным потокам?
this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
// закрытие простаивающих дополнительных потоков
// ...
}, intervalCheck);
}
#createWorkerIfPossible() {
// проверяем возможность и необходимость (по состоянмю очереди) запуска потока
if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
this.#createWorker();
}
}
#createWorker() {
this.#workersRemain--;
this.#workersRemain = -this.#workersRemain; // wrap flag
// ...
worker.on('online', () => {
// продолжим пробовать стартовать следующий после паузы на timeoutSpawn
setTimeout(() => {
this.#workersRemain = -this.#workersRemain; // wrap flag
this.#createWorkerIfPossible();
}, this.#options['timeoutSpawn']);
});
return worker;
}
#destroyWorker(worker) {
// убираем поток из общего набора
this.#workersSet.delete(worker);
// завершаем сам поток
worker.terminate();
// исключаем воркер из пула, если он там был
const idx = this.#workersPool.indexOf(worker);
if (idx >= 0) {
this.#workersPool.splice(idx, 1);
}
// увеличиваем количество доступных к запуску
this.#workersRemain++;
}
}
Тут мы использовали #workersRemain
одновременно и как счетчик доступных к запуску, и как признак (отрицательное значение) наличия запускающегося в текущий момент потока.
Второе замечание касается использования для пула свободных именно массива, а не Set, поскольку нам важен приоритет потоков при раздаче им заданий на обработку.
Принцип распределения сообщений
Старшему досталась мельница, среднему – осел, ну а младшему пришлось взять себе кота.
[Шарль Перро, "Кот в сапогах"]
С учетом возможности существования собственного кэша в каждом из потоков, нам выгоднее использовать его как можно эффективнее - то есть отправлять задачи в поток, который существует дольше.
В идеале, таким потоком должен всегда оказываться главный рабочий поток, который, как мы договорились выше, существует всегда.
Но он может быть занят выполнением текущей задачи - тогда задача должна быть направлена в очередь, либо первому свободному потоку, если "длина" очереди уже заведомо превосходит возможности первого потока.
Если же ни одного такого потока не нашлось, и общая длина очереди превышает возможности всех потоков за нормативное время порождения потока, то пора задуматься о порождении нового потока:
this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
this.#createWorkerIfPossible();
this.#checking = null;
}, this.#options['intervalCheck']));
Такая конструкция не только заставляет ждать intervalCheck
для попытки запуска потока, но и блокирует все остальные попытки на этом интервале.
Получение данных из очередей
На самом деле, когда мы говорим про очередь, надо не забывать, что их, фактически, две.
Когда у нас случается parentPort.on('message', message => ...)
, это означает, что нам отправили не только конкретное сообщение, но и что-то еще за ним могли успеть положить в очередь порта, которую мы можем сразу извлечь через receiveMessageOnPort
, избегнув повторных вызов обработчика, как рассказывалось во второй части.
Получается вот такая нетривиальная схема доставки данных до процессинга в потоке:
.on('message') и .pulse()
constructor(workerFile, options) {
// ...
// основная точка приема сообщений из main-потока
parentPort.on('message', message => {
// флаг возможности и необходимости продолжать отправку
let processed;
const {lock : [lockState], _pulse} = this.#mainWorker;
if (lockState == THREAD_FREE) { // если основной рабочий поток свободен
processed = _pulse(message); // ... отдаем сообщение основному потоку
message = undefined; // ... и обнуляем
}
// продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
// ... и еще кто-то остался кто-то свободный
while (!processed) {
const worker = this.#queueMore1 && this.#workersPool.pop();
if (worker) {
processed = worker._pulse(message);
message &&= undefined;
}
else {
// перекладываем всю очередь порта в свою очередь с меткой времени
const now = Date.now();
message ??= receiveMessageOnPort(parentPort)?.message;
while (message) {
this.#queue.push(message);
message.ts = now;
message = receiveMessageOnPort(parentPort)?.message;
}
// отложенная проверка необходимости запуска нового потока
this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
this.#createWorkerIfPossible();
this.#checking = null;
}, this.#options['intervalCheck']));
return;
}
}
});
}
// ...
#pulseWorker(worker, message) {
// если нам передали сообщение - пытаемся отдать его потоку
let processed = !message || worker._send(message);
// пока он говорит "уже все сразу обработал"...
while (processed) {
// извлекаем следующее сообщение из очереди порта основного потока
const recv = receiveMessageOnPort(parentPort);
// ... или своей локальной очереди
message = recv?.message ?? this.#queue.shift();
if (!message) {
if (worker !== this.#mainWorker) {
// когда все очереди закончились - возвращаем дополнительный поток в пул свободных
const pool = this.#workersPool;
!pool.includes(worker) && pool.push(worker);
}
return true;
}
// передаем данные в поток
processed = worker._send(message);
}
};
Сама схема передачи в поток "по блокировке" та же самая, что мы использовали в прошлой части.
"Моя фамилия - Итого"
Осталось только свести все части воедино:
Полный код классов и тестового приложения, и пара слов про обработку строк
const {
Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
, receiveMessageOnPort
, threadId
} = require('node:worker_threads');
const THREAD_FREE = -1;
class WorkersPool {
#options; // кэш аргументов конструктора
#mainWorker; // основной рабочий поток
#workersPool = []; // пул свободных дополнительных потоков
#workersSet = new Set(); // полный набор всех активных потоков
#workersRemain; // потоков еще доступно к созданию
#queue; // очередь на кольцевом буфере
#checking; // признак активности проверки состояния
#queueMore1; // "длина" очереди превышает возможности 1 потока
#queueMoreW; // ... всех существующих потоков
#checkQueue(workers) {
// если в очереди задач уже больше, чем потоков,
// ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
}
constructor(workerFile, options) {
options['workerFile'] ??= workerFile;
const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
this.#options = options;
this.#workersRemain = poolSize;
// очередь на кольцевом буфере
this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);
// главный вспомогательный поток существует сразу и всегда
this.#mainWorker = this.#createWorker();
// основная точка приема сообщений из main-потока
parentPort.on('message', message => {
// флаг возможности и необходимости продолжать отправку
let processed;
const {lock : [lockState], _pulse} = this.#mainWorker;
if (lockState == THREAD_FREE) { // если основной рабочий поток свободен
processed = _pulse(message); // ... отдаем сообщение основному потоку
message = undefined; // ... и обнуляем
}
// продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
// ... и еще кто-то остался кто-то свободный
while (!processed) {
const worker = this.#queueMore1 && this.#workersPool.pop();
if (worker) {
processed = worker._pulse(message);
message &&= undefined;
}
else {
// перекладываем всю очередь порта в свою очередь с меткой времени
const now = Date.now();
message ??= receiveMessageOnPort(parentPort)?.message;
while (message) {
this.#queue.push(message);
message.ts = now;
message = receiveMessageOnPort(parentPort)?.message;
}
// отложенная проверка необходимости запуска нового потока
this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
this.#createWorkerIfPossible();
this.#checking = null;
}, this.#options['intervalCheck']));
return;
}
}
});
// периодическая проверка очереди и пула дополнительных потоков
const pool = this.#workersPool;
setInterval(() => {
// пора ли порождать еще один поток?
this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
// пора ли отдавать задачи дополнительным потокам?
this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
// закрытие простаивающих дополнительных потоков
if (pool.length) {
const deadline = Date.now() - timeoutIdle;
for (const {activity, lock : [lockState], _destroy} of pool) {
activity < deadline && lockState == THREAD_FREE && _destroy();
}
}
}, intervalCheck);
}
#createWorkerIfPossible() {
// проверяем возможность и необходимость (по состоянмю очереди) запуска потока
if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
this.#createWorker();
}
}
#createWorker() {
this.#workersRemain--;
this.#workersRemain = -this.#workersRemain; // wrap flag
const shared = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT + this.#options['dataSize']);
const data = new Uint8Array(shared);
const lock = new Int32Array(shared, 0, 1); // lock/id - первые 4 байта data
lock[0] = 0;
// ждем, пока поток не проинициализируется и не сменит состояние, чтобы его "подергать"
let worker;
Atomics.waitAsync(lock, 0, 0)
.value
.then(() => lock[0] == THREAD_FREE && worker._pulse());
const {port1, port2} = new MessageChannel();
worker = new Worker(
this.#options['workerFile']
, {
workerData : {
workerType : 'worker'
, shared
, port : port2 // передаем порт в рабочий поток
}
, transferList : [port2]
}
);
// отправляем порт в основной поток, в Coordinator
this.#options['port'].postMessage(
{
threadId : worker.threadId
, port : port1
}
, [port1]
);
this.#workersSet.add(worker);
worker.id = worker.threadId;
worker.port = port2;
worker.lock = lock;
worker.data = data;
worker._pulse = this.#pulseWorker.bind(this, worker);
worker._destroy = this.#destroyWorker.bind(this, worker);
worker._send = this.#sendMessage.bind(this, worker);
worker.on('online', () => {
// продолжим пробовать стартовать следующий после паузы на timeoutSpawn
setTimeout(() => {
this.#workersRemain = -this.#workersRemain; // wrap flag
this.#createWorkerIfPossible();
}, this.#options['timeoutSpawn']);
});
return worker;
}
#pulseWorker(worker, message) {
// если нам передали сообщение - пытаемся отдать его потоку
let processed = !message || worker._send(message);
// пока он говорит "уже все сразу обработал"...
while (processed) {
// извлекаем следующее сообщение из очереди порта основного потока
const recv = receiveMessageOnPort(parentPort);
// ... или своей локальной очереди
message = recv?.message ?? this.#queue.shift();
if (!message) {
if (worker !== this.#mainWorker) {
// когда все очереди закончились - возвращаем дополнительный поток в пул свободных
const pool = this.#workersPool;
!pool.includes(worker) && pool.push(worker);
}
return true;
}
// передаем данные в поток
processed = worker._send(message);
}
};
#sendMessage(worker, message) {
const {lock, data, _pulse} = worker;
// записываем из ui8-проекции в разделяемую память вместе с id
data.set(new Uint8Array(message));
// id - это первые 4 байта data
const id = lock[0];
// уведомляем поток
Atomics.notify(lock, 0, 1);
// фиксируем момент последней активности потока
worker.activity = Date.now();
// ждем, пока поток не обработает и не позовет нас
const {value} = Atomics.waitAsync(lock, 0, id);
if (value === 'not-equal') {
// если он сразу успел обработать, то в него можно и дальше писать
return true;
}
else {
// если не сразу - "подергаем", когда освободится
value.then(() => lock[0] == THREAD_FREE && _pulse());
}
}
#destroyWorker(worker) {
// убираем поток из общего набора
this.#workersSet.delete(worker);
// закрываем обе стороны ассоциированного MessageChannel
worker.port.close();
// завершаем сам поток
worker.terminate();
// исключаем воркер из пула, если он там был
const idx = this.#workersPool.indexOf(worker);
if (idx >= 0) {
this.#workersPool.splice(idx, 1);
}
// увеличиваем количество доступных к запуску
this.#workersRemain++;
}
}
class Coordinator extends Worker {
#messages = new Map(); // хранилище всех обрабатываемых сообщений
#p32 = new Uint32Array(2); // префикс данных = [messageID, dataSize]
#p8 = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
#data = [this.#p8]; // массив для склейки частей в двоичный блок
#message2data; // функция получения двоичных данных из сообщения и помещения в массив
#data2buffer() {
// объединяем префикс и данные
const buf = Buffer.concat(this.#data);
// записываем в префикс результирующую длину
buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
// восстанавливаем состояние массива
this.#data.length = 1;
// возвращаем итоговый двоичный контент
return buf.buffer;
};
constructor(filename, options) {
// сигнальный канал
const {port1, port2} = new MessageChannel();
/* доинициализируем необходимые опции
{
workerData : {
workerType : 'coordinator'
, port : port2
}
, transferList : [port2]
}
*/
((options ??= {}).workerData ??= {}).workerType = 'coordinator';
((options ??= {}).workerData ??= {}).port = port2;
((options ??= {}).transferList ??= []).push(port2);
super(filename, options);
const {dataField, dataArray} = options;
this.#p32[0] = 0; // ID
this.#message2data = dataField
? message => this.#data.push(dataField(message))
: message => this.#data.push(...dataArray(message));
const messages = this.#messages;
port1.on('message', ({threadId, port}) => {
// ассоциируем открывшийся порт с конкретным потоком
port.threadId = threadId;
this.emit('port.open', port);
port
.on('message', result => {
// из результата обработки по ID получаем исходное сообщение ...
const message = messages.get(result.id);
if (message) {
messages.delete(result.id);
// ... и передаем вместе с результатом
this.emit('message', result, message);
}
})
.on('close', () => {
// при закрытии порта - отписываемся от него
this.emit('port.close', port);
port.removeAllListeners();
});
});
}
// передача двоичных данных в поток
postMessage(message) {
// сохраняем объект сообщения в хранилище
this.#messages.set(this.#p32[0], message);
// добавляем в общий массив для склейки одно или несколько полей
this.#message2data(message);
// формируем целевой контейнер данных
const buffer = this.#data2buffer();
// передаем двоичный контент с префиксом в поток по ссылке
super.postMessage(buffer, [buffer]);
// id = (id + 1) % 0x10000000
this.#p32[0]++;
this.#p32[0] &= 0x0FFFFFFF;
}
}
const taskSize = 1 << 16;
if (isMainThread) {
const {randomBytes} = require('node:crypto');
const fs = require('node:fs');
const {tmpdir} = require('node:os');
const {sep} = require('node:path');
console.log(Date.now(), 'Main : online');
const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));
// создаем временную папку и в ней файлы со всеми "сообщениями"
const dir = fs.mkdtempSync(tmpdir() + sep);
messages.forEach((data, i) => {
const fn = i.toString(16).padStart(3, '0');
fs.writeFileSync(dir + sep + fn, data);
});
console.log(Date.now(), 'Main : generated');
const hashes = messages.map(() => undefined);
let remain;
const coordinator = new Coordinator(
__filename
, {
dataField : message => message.data // двоичные данные лежат в одном поле
}
);
coordinator
.on('online', () => {
console.log(Date.now(), 'Coordinator : online');
// получаем список всех сообщений
const fns = fs.readdirSync(dir)
.sort()
.map(fn => dir + sep + fn);
remain = fns.length;
fns.forEach((fn, id) => {
const data = fs.readFileSync(fn); // тяжелый синхронный код
coordinator.postMessage({id, fn, data});
});
console.log(Date.now(), 'Main : all send');
})
.on('port.open', port => {
console.log(Date.now(), `Coordinator : dataPort open = +1 worker [${port.threadId}]`);
})
.on('port.close', port => {
console.log(Date.now(), `Coordinator : dataPort close = -1 worker [${port.threadId}]`);
})
.on('message', (result, message) => {
hashes[message.id] = result.hash;
if (!--remain) {
console.log(Date.now(), 'Main : all recv');
// подождем, пока все завершатся все желающие потоки
setTimeout(() => {
process.exit();
}, 1000);
}
});
}
else {
const {workerType, port} = workerData;
switch (workerType) {
case 'coordinator':
// в потоке-координаторе нет активности, кроме управления пулом рабочих потоков
const pool = new WorkersPool(
__filename
, {
poolSize : require('node:os').cpus().length // по количеству CPU-ядер
, dataSize : taskSize // предельный размер данных
, port
, queuePowMin : 8 // 256
, queuePowMax : 16 // 65536
, intervalCheck : 10
, timeoutSpawn : 100
, timeoutIdle : 10
}
);
break;
case 'worker':
const {createHash} = require('node:crypto');
const {shared} = workerData;
// [shared] = {lock:int32} + {size:uint32} + {data:uint8[]}
const lock = new Int32Array(shared, 0, 1);
const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT);
const processMessage = () => {
const [id, size] = lock;
// забираем данные и производим обработку
const hash = createHash('sha256').update(
data.subarray(0, size) // входящий контент
).digest('hex')
// уведомляем координатор о своей доступности
lock[0] = THREAD_FREE;
Atomics.notify(lock, 0, 1);
// отправляем результат
port.postMessage({id, hash});
// ... и возвращаемся к ожиданию блокировки
wait();
};
const wait = () => {
const {value} = Atomics.waitAsync(lock, 0, THREAD_FREE);
if (value === 'not-equal') {
// если значение изменилось, то поток уже обработал задачу, и реагируем сразу
processMessage();
}
else {
// иначе ждем разрешения Promise блокировки
value.then(processMessage);
}
};
// "освобождаем" блокировку и уведомляем координатора
lock[0] = THREAD_FREE;
Atomics.notify(lock, 0, 1);
wait();
// подвешиваем поток в бесконечное ожидание
port.on('message', () => {});
break;
}
}
Если ваша обработка в том или ином виде требует копирования данных из разделяемого буфера (например, преобразование в строку), то вы можете освободить блокировку сразу после него.
В этом случае поток продолжит обработку строки, а координатор уже сможет положить в буфер свежие данные.
Если вы все сделали правильно, то должны увидеть примерно такой вывод:
1666250935654 Main : online
1666250940036 Main : generated
1666250940132 Coordinator : online
1666250941187 Main : all send
1666250941195 Coordinator : dataPort open = +1 worker [2]
1666250941196 Coordinator : dataPort open = +1 worker [3]
1666250941196 Coordinator : dataPort open = +1 worker [4]
1666250941225 Coordinator : dataPort close = -1 worker [3]
1666250941244 Main : all recv
1666250941262 Coordinator : dataPort close = -1 worker [4]
В данном случае было запущено одновременно до 3 рабочих потоков: главный #2
и два вспомогательных #3
и #4
. Один из них успел завершиться даже раньше, чем мы получили все результаты, а второй - чуть погодя.
На этом цикл статей про многопоточность в JavaScript/Node.js я завершаю, а вы прочитайте предыдущие части - не пожалеете!
часть 1: базовые концепты
часть 2: очередь, каналы и координатор
часть 3: разделяемая память, атомарные операции и блокировки
часть 4: координатор против синхронного кода
часть 5: автомасштабирование под нагрузку
Комментарии (6)
rutexd
22.10.2022 17:53Строго говоря это и не многопоточность, это банальное масштабирование. Многопоточность как термин воркеров, здесь мало уместен.
Да иногда можно выиграть. Но с другой стороны, если нужен настоящий выигрыш - легче написать модуль нативный который по настоящему многопоточно что то гоняет а не извращаться подобным образом. Впрочем, зависит от задач.
Kilor Автор
22.10.2022 17:55Если worker_threads - это не про потоки, то про что же?
Ну, и написать нативный модуль - это почти никогда не "легче".
rutexd
22.10.2022 18:36Про что? Про IPC обозванным потоками! :)
На уровне абстракции вы правы - воркеры можно рассматривать как потоки. Однако на деле, под капотом работает почти отдельный самостоятельный инстанс движка, который разделяет родителя\потомка, перенаправляет банально stdin\out итд итп. По итогу мы имеем оверхед просто от существования "потока" в виде инстанса, потребляемой памяти и прочей радости в виде больших потерь на том самом IPC, чего на нативных слоях даже в помине нету кроме условных одного - другого десятка байт на хэндл и прочей метаинфы.Не поймите меня не правильно. Я сам люблю ноду. Однако как уже выше верно сказано - нода не про многопоточность. Нода в принципе не про многопоточность. И никогда не будет. Как и не будет про тяжелую нагрузку.
А в чем сложность написания нативного модуля? Сейчас (да и думаю уже давно) есть решения которые позволяют это делать одной двумя кнопками. Либо если не нативный модуль то можно делегировать тяжелую задачу на какой нибудь бинарь.
Kilor Автор
22.10.2022 18:52Вот что наличие отдельного инстанса V8 внутри дает издержки - согласен, но они точно такие же, как и в основном потоке. И мы ведь осознанно идем на них, выбирая транслируемый ЯП вместо компилируемого.
А вот "не про тяжелую нагрузку" - так это надо внимательно на прикладную задачу смотреть. Грубо, если я использую те же регулярки, http или любой другой нативный модуль, или если сам V8 сгенерил по моему JS-коду байткод с использованием SSE - так ли уж велики издержки относительно вызова тех же функций в C++/Rust/...?
beatleboy
К сожалению стоит признать что многопоточность и нода это практически несовместимые вещи. Параллелить на уровне независимых процессов завернув приложение в докер - еще куда не шло. Но появляется оверхед по ресурсам, коннектам к БД
У меня в одном проекте на проде еще недавно крутилось 120 одинаковых докер контейнеров выполняющих одну и туже логику (но с разными параметрами запуска), конечно начиналось все с нескольких и по мере роста пришлось заскейлить инстансы до такого кол-ва.
Вобщем решил я это дело замногопоточить в рамках одного процесса, помучался NodeJS worker_threads и понял что надо бы это дело на Go попробовать. И вуаля, аналогичный код на Go в проде теперь жрет 200-400 мб и 2-4 ядра и все это один процесс, при этом выполняет очень много операций. По итогу получилось что нагрузка упала примерно в 50 раз, но объем работы выполняется тот же.
Ноду я конечно из-за этого случая не разлюбил, но выводы сделал что для многопоточки хорош Go, возможно и Rust тоже не плох для этого, но до него я так и не добрался.
Kilor Автор
Все-таки чтобы полностью ощутить преимущество другого языка, надо переписать существующее приложение целиком, а это не всегда возможно и оправданно. Но вот так повысить производительность JS за счет многопоточности иногда можно.