Доброго времени суток
У меня на работе возник спор между мной и дотнетчиками насчет потоков в новой версии Node.JS и необходимости их синхронизоровать. Для начала решили выбрать задачу о параллельной записи строк в файл. Тема с worker_threads горячая, прошу под кат.
Немного о самих потоках. Они являются экспериментальной технологией в Node.JS 10.5.0, и для того, чтобы иметь доступ к модулю «worker_threads», необходимо запускать наше Node.JS приложение с флагом "--experimental-worker". Я прописал этот флаг в start скрипте в файле package.json:
{
"name": "worker-test",
"version": "1.0.0",
"description": "",
"main": "app.js",
"scripts": {
"start": "node --max-old-space-size=4096 --experimental-worker app.js "
},
"author": "",
"license": "ISC"
}
Теперь о самой логике. Главный поток порождает N рабочих потоков, все они пишут с каким-то интервалом в файл. В отличие от всех примеров, где главные и дочерние потоки стартуют с одного файла, я отделил потоки в отдельный, мне это кажется более чистым и элегантным.
Собственно, код.
Главный файл app.js — точка входа.
const { Worker } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Hello from main!');
for (var i = 1; i <= WORKERS_NUMBER ; i++) {
const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}
Здесь мы просто создаем дочерние потоки используя класс Worker и указывая путь к стартовому файлу для потока './writer-worker-app/app.js'. При создании потока передаем самописный айдишник как данные workerData.
Стартовый файл для потока ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initializad.`);
while (true) {
sendMessage();
}
function sendMessage() {
logger.log(`Hello from worker number ${workerData.id}\r\n`);
}
Ну и простейший класс-логер: ./writer-worker-app/logger.js
const fs = require('fs');
function log(message) {
return fs.appendFileSync('./my-file.txt', message);
}
module.exports = {
log
};
При запуске этого приложения мы все надеялись на то, что в итоге получим кашу в файле и дотнетчики закричат, как нужны блокировки с семафорами и прочими радостями параллельного исполнения. Но нет! В файле все строки идут не прерываясь, разве что в случайном порядке:
Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
Замечательный эксперимент, очередная маленькая победа Ноды :-) Моё предположение в том, что вся синхронизация происходит на уровне I\O потоков Ноды, но буду рад узнать в комментариях правильный вариант. На всякий случай мы проверили работу, используя не fs.appendFileSync, а fs.createWriteStream и метода stream.write.
Результат вышел такой же.
Но мы на этом не остановились.
Коллега предложил задачу о синхронизации потоков. Для нашего конкретного примера, пусть это будет задача последовательной записи в файл в порядке возврастания айдишников. Сначала пишет первый поток, потом второй, потом третий и так далее.
Для этого я ввёл еще один поток-Менеджер. Можно было обойтись главным, но мне так приятно создавать этих изолированных рабочих и выстраивать общение посредством сообщений. Прежде чем начать писать имплементацию потока-Менеджера, необходимо создать канал связи между ним и писателями-рабочими. Для этого был использован класс MessageChannel. Инстансы этого класса имеют два поля: port1 и port2, каждый из которых умеет слушать и отправлять сообщения другому посредством методов .on('message') и .postMessage(). Этот класс и был создан в рамках модуля «worker_threads» для коммуникации между потоками, потому что обычно при передачи объекта происходит просто его клонирование, и в изолированной среде выполнения потока он будет бесполезен.
Для коммуникации между 2 потоками мы каждому должны дать по порту.
Интересный факт: на 10.5.0 невозможно передать порт через конструктор воркера, необходимо это делать только через worker.postMessage(), причем обязательно указывая порт в transferList параметре!
Сам поток-менеджер будет отсылать команды потокам-писателям в порядке возрастания их идентификаторов, причем следующую команду он отправит только после получения ответа писателя об успешной операции.
Недо-UML-диаграмма приложения:
Наш видоизмененный главный файл ./app.js:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Main app initialized and started.');
const workersMeta = [];
for (var i = 1; i <= WORKERS_NUMBER; i++) {
const channel = new MessageChannel();
const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
workersMeta.push({ id: i, worker, channel });
}
workersMeta.forEach(({ worker, channel }) => {
worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})
setTimeout(() => {
const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));
console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);
Здесь мы сначала создаем воркеров, потом каждому отправляем порт для связи с менеджером (и только так, через конструктор это сделать невозможно).
Потом создаем поток-менеджер, отправляем ему список портов для связи с потоками-писателями.
Updated: эмпирическим путем я выяснил, что при работе с потоками лучше сначала дать им настояться (проинициализироваться как надо). По хорошему надо было слушать какие то ответы от потоков в стиле «Я готов!», но я решил пойти более легким путем.
Изменим и поведение потока-писателя, чтобы он отправлял сообщение только когда ему скажут, а также возвращал результат, когда операция записи закончена:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initializad.`);
parentPort.on('message', value => {
const orchestratorPort = value.orchestratorPort;
orchestratorPort.on('message', data => {
if (data.command == 'write') {
console.log(`Worker ${id} received write command`);
sendMessage();
sendResult(orchestratorPort);
}
});
console.log(`Worker ${id} started.`);
});
function sendMessage() {
logger.log(`Hello from worker number ${workerData.id}\r\n`);
}
function sendResult(port) {
port.postMessage({ id, status: 'completed' });
}
Мы правильно проинициализировались от сообщение родительского потока, начали случать канал потока-менеджера, при получении команды сначала пишем в файл, потом отправляем результат. Нужно заметить, что в файл пишется синхронно, поэтому sendResult() вызывается сразу за sendMessage().
Всё, что осталось — написать имплементацию нашего умного менеджера
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');
console.log('Orchestrator initialized.')
let workerPorts;
parentPort.on('message', (value) => {
workerPorts = value.workerPorts;
workerPorts.forEach(wp => wp.port.on('message', handleResponse));
console.log('Orchestrator started.');
sendCommand(workerPorts[0]);
});
function handleResponse(status) {
const responseWorkerId = status.id;
let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
if (!nextWorker) {
nextWorker = workerPorts[0];
}
sendCommand(nextWorker);
}
function sendCommand(worker) {
worker.port.postMessage({ command: 'write' });
}
Получили список портов, упорядочили, для каждого порта установили колбек на респонз, ну и отправили команду первому. В самом колбеке ищем следующего писателя и отправляем команду ему. Чтобы не сильно напрягать систему, был установлен интервал между командами.
Вот и всё, наше многопоточное приложение с управлением потоков готово. Мы научились не просто порождать воркеры-потоки в Node.JS, но и создавать эффективные способы коммуникации между ними. На мой личный взгляд, архитектура изолированных потоков в Node.JS с ожиданием и отправкой сообщений более чем удобная и перспективная. Всем спасибо за внимание.
Весь исходный код может быть найден здесь.
UPDATED
Чтобы не вводить в заблуждение читателей, а также не давать лишних поводов написать, что я жульничаю с таймаутами, я проапдейтил статью и репозиторий.
Изменения:
1) удалены интервалы в первоначальных писателях, теперь по хардкору идет while(true)
2) добавлен --max-old-space-size=4096 флаг, просто на всякий случай, т.к. текущая имплементация потоков не очень стабильная и я надеюсь, что это как то поможет.
3) удалены интервалы отправки сообщений у потока-менеджера. теперь запись идёт нон-стоп.
4) добавлен таймаут при инициализации менеджера, почему — расписано выше.
TO DO:
1) добавить сообщения изменяемой длины или подсчет вызова логера — спасибо FANAT1242
2) добавить бенчмарк, сравнить работу первой и второй версии (сколько строк запишет за 10 секунд, к примеру)
Комментарии (13)
Aquahawk
03.07.2018 12:34+1блджад. Для начала сообщение раз в 100 миллисекунд это супер редко, вы просто не нарвались. Запустите каждый поток синхронно просто писать, без таймаута. Это раз. Второе, если оно синхронизировано — это ужасно, это очередной раз убийство производительности в угоду возможности привлечь к работе малограмотных работников. Вместо адекватного инструмента синхронизации, тупо сделать всегда просто потому что так удобно.
LoserKiss Автор
03.07.2018 13:02+21) мы пробовали запускать 100 потоков с 10 миллисекунд задержкой, результат был тот же. Оставил так как есть просто для наглядности.
2) Прошу вас, напишите здесь другой инструмент синхронизации Node.JS потоков)AxisPod
04.07.2018 06:07Вы думаете 10мс достаточно? А что если на простые запросы бд могут отвечать за 10-100 наносекунд, это для сравнения. Вывод уж явно не медленее будет. Чтобы понять какую задержку сделать, надо хотя бы первоночально провести бенчмарк и определить как же быстро выводится строка с текстом. А к примеру когда я тестирую многопоточность, вообще никакие задержки не использую ибо вносят они очень большие погрешности.
А с паузами в 10-100мс, многопоточность и не нужна вовсе.LoserKiss Автор
04.07.2018 12:14Чтобы никто не думал, что я жульничаю, я обновил статью и код, убрав все таймауты.
Lynn
04.07.2018 10:52Хочется увидеть верстю с createWriteStream
LoserKiss Автор
04.07.2018 12:16Поверьте, она не стоит того, чтобы показывать)
меняем метод log у логера на function getStream(){
return fs.createWriteStream('./my-file.txt');
}
при инициализации потоков они сразу берут стрим в виде const fs = logger.getStream();
при записи вызывается fs.write('text');
FANAT1242
04.07.2018 11:36При указанном способе параллельной записи в файл «каши» не будет и в .NET, т.к. все сообщения равной длины. Когда два потока одновременно дозаписывают данные в файл, данные будут просто записаны в один и тот же участок файла. Т.е. один поток перезапишет данные другого потока.
Можете попробовать посчитать общее количество записанных строк в файле. Если Node.JS выполняет какую-то I/O-синхронизацию, то 10 потоков, записывающих по 1000 строк, дадут ровно 10000 строк в файле. В противном случае строк будет меньше.
Или попробуйте в разных потоках писать сообщения разной длины.
Fengol
А почему у Вас в цикле i < 10, а в выводе id отсутствуют 5,6,7,8, но зато повторяются 0,1,2?
LoserKiss Автор
Это я просто маленькую часть файла скопировал, и в первой версии выводит строки в случайном порядке, вот 0. 1 и 2 потоки как самые первые и начали вывод