На дворе стоит двадцать второй год и все основные среды исполнения JavaScript уже вовсю поддерживают доступ к потокам. Причем, в отличие от языков с глобальной блокировкой интерпретатора, вроде Python и Ruby, где для параллельного выполнения задач рекомендуется запускать отдельные процессы, в JS это именно потоки с возможностью использования разделяемой памяти, а также всеми достоинствами и недостатками такой свободы. Конечно, есть и серьезные ограничения - например, не получится совместно использовать один JavaScript-объект в нескольких потоках. Но тем не менее, задачи, связанные с трудоемкими математическими вычислениями или обработкой графики можно смело выносить в отдельные потоки уже сейчас.
Что же с поддержкой?
Посмотрим, что с поддержкой у основных инструментов, задействованных в реализации многопоточного кода: собственно возможности запуска кода в отдельном потоке, низкоуровневых методах синхронизации Atomics, а также буфера для создания области разделяемой памяти SharedArrayBuffer.
C Node.js, где модуль worker_threads стабилен начиная с версии 11.7.0, а Atomics и SharedArrayBuffer и того раньше, все просто. А вот в браузерах, и в особенности в мобильных версиях, поддержка Atomics и SharedArrayBuffer появилась в полной мере лишь в прошлом году. Однако теперь они с нами и готовы разгружать основной поток от тяжелых вычислений.
О структуре статьи
Дабы не зевать, перекладывая ничего не значащие байты из ячейки в ячейку, наблюдая за возникновением коллизий, попробуем разобраться с простейшими примитивами синхронизации на основе физического процесса. Таковым выступит хоккейный матч. На этапе раскатки перед матчем с помощью бинарного семафора разделим игроков на две равных группы, а уже в процессе игры с помощью семафора со счетчиком избавимся от штрафов за нарушение численного состава.
Примеры кода напишем для среды Node.js. Для полноценного восприятия примеров стоит познакомиться с возможностями модуля worker_threads, особенностями использования SharedArrayBuffer и атомарными операциями над ним.
Часть первая: разминка и бинарный семафор
Итак, матч еще впереди, игрокам нужно размяться и тренер решает следующее: первая половина игроков будет тренировать броски, вторая — катание. Очередной, вышедший из раздевалки игрок, получает следующее задание - присоединиться к группе тренирующих катание, или к группе, тренирующих броски так, чтобы количество игроков в группах было равным. Код будем писать в одном файле, воспользовавшись флагом isMainThread для разделения логики основного и вспомогательных потоков. Текущее количество игроков в каждой из групп будем хранить в двух ячейках буфера разделяемой памяти.
const threads = require('worker_threads');
const { Worker, isMainThread } = threads;
if (isMainThread) {
//Главный поток
} else {
//Вспомогательный поток - игрок
}
Код для логики главного потока будет включать в себя три составляющих: инициализацию буфера, запуск потоков, проверка итогового распределения игроков по группам.
/* Код главного потока */
// Инициализируем буфер c двумя счетчиками
// [*делают броски*, *тренируют катание*]
const buffer = new SharedArrayBuffer(2);
// Запускам/выпускаем не площадку 22 потока-игрока
for (let i = 0; i < 22; i++) {
new Worker(__filename, { workerData: buffer });
}
// Проверяем итоговое распределение по группам после разминки
setTimeout(() => {
const array = new Int8Array(buffer, 0);
console.log(`Делали броски: ${array[0]}`);
console.log(`Тренировали катание: ${array[1]}`);
}, 1000);
Задача вспомогательного потока (потока-игрока) заключается в следующем: выбрать группу, где игроков не больше чем в соседней, тем самым сохранив баланс. Сделаем это через проверку на равенство для более яркой иллюстрации состояния гонки.
/* Код вспомогательного потока */
const { threadId, workerData } = threads;
const array = new Int8Array(workerData, 0);
// Получаем количество игроков в каждой из групп
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
// Отправляем игрока тренировать катание или делать броски
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
if (isMainThread) {
const buffer = new SharedArrayBuffer(2);
for (let i = 0; i < 22; i++) {
new Worker(__filename, { workerData: buffer });
}
setTimeout(() => {
const array = new Int8Array(buffer, 0);
console.log(`Делали броски: ${array[0]}`);
console.log(`Тренировали катание: ${array[1]}`);
}, 1000);
} else {
const { threadId, workerData } = threads;
const array = new Int8Array(workerData, 0);
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
}
Таким образом, выходя на поле, игрок присоединится к тренирующим катание, если количество игроков в группах равно, и выберет броски, если количество отличается. Выполняя такую логику в одном потоке, мы будем ожидать, что количество игроков в группах всего будет равным — по 11. Запустив же программу выше, мы получаем совершенно ненадежные результаты (возможно придется запустить ее несколько раз).
Делали броски: 14
Тренировали катание: 8
Это происходит по той причине, что в критическую секцию (код проверки количества игроков в группах и выбора одной из них) одновременно могут войти сразу несколько потоков. Это похоже на то, как игроки выходя на поле и увидев, что группы наполнены одинаково, оба отправляются в одну и ту же группу. Этого можно было бы избежать, если бы один из игроков дождался решения другого.
Бинарный семафор
Бинарный семафор является инструментом для управления доступом к разделяемым данным (в нашем случае счетчикам количества игроков). Он гарантирует, что в каждый момент времени работать с ресурсом может не более одного потока. Используя бинарный семафор поток может захватить блокировку, выполнить код и освободить блокировку, не беспокоясь, что в момент выполнения другой поток зайдет в критическую секцию. Похожим примитивом синхронизации является мьютекс за тем лишь исключением, что в случае мьютекса освободить блокировку может только поток-владелец, первым захвативший мьютекс.
Чтобы вникнуть в суть идеи пройдем путь создания бинарного семафора итеративно. Для начала посмотрим, как может выглядеть вариант его использования.
/* Код вспомогательного потока */
const { threadId, workerData } = threads;
// Инициализируем семафор в каждом потоке
const semaphore = new BinarySemaphore(workerData);
const array = new Int8Array(workerData, 1);
// Блокируем критическую секцию
semaphore.enter();
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
// Освобождаем критическую секцию
semaphore.leave();
Итак войдя в критическую секцию поток блокирует доступ к ней, а выполнив код — освобождает ее. Флаг для хранения состояния семафора поместим в том же буфере разделяемой памяти на нулевой позиции. Посмотрим как покажет себя следующая наивная реализация.
const LOCKED = 1;
const UNLOCKED = 0;
class BinarySemaphore {
constructor(shared, offset = 0) {
this.lock = new Int8Array(shared, offset, 1);
}
enter() {
while (this.lock[0] !== UNLOCKED);
this.lock[0] = LOCKED;
}
leave() {
this.lock[0] = UNLOCKED;
}
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
const LOCKED = 1;
const UNLOCKED = 0;
class BinarySemaphore {
constructor(shared, offset = 0) {
this.lock = new Int8Array(shared, offset, 1);
}
enter() {
while (this.lock[0] !== UNLOCKED);
this.lock[0] = LOCKED;
}
leave() {
this.lock[0] = UNLOCKED;
}
}
if (isMainThread) {
const buffer = new SharedArrayBuffer(3);
for (let i = 0; i < 22; i++) {
new Worker(__filename, { workerData: buffer });
}
setTimeout(() => {
const array = new Int8Array(buffer, 1);
console.log(`Делали броски: ${array[0]}`);
console.log(`Тренировали катание: ${array[1]}`);
}, 2000);
} else {
const { threadId, workerData } = threads;
const semaphore = new BinarySemaphore(workerData);
const array = new Int8Array(workerData, 1);
semaphore.enter();
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
semaphore.leave();
}
Запускаем, и... видим, что стало значительно лучше — игроки почти всегда делятся на равные группы. Однако запуская программу снова и снова (или увеличив количество игроков-потоков), мы снова видим, что ошибки случаются. Почему же это происходит? А причина в том, что теперь у нас снова есть критическая секция - это код метода enter семафора. Получив значение флага мы не гарантируем, что оно будет тем же в момент, когда мы устанавливаем его в состояние блокировки. Самое время воспользоваться возможностями Atomics.
В этот момент можно задаться вопросом, зачем в принципе использовать бинарный семафор или мьютекс, если мы уже имеем Atomics с возможностью атомарно обращаться с разделяемой памятью. Причина в требуемой сложности операций. Если нам достаточно логики для проверки и изменения значения одной ячейки памяти — можно и нужно использовать Atomics, его будет достаточно. В противном случае, когда требуется атомарно считать/отредактировать две и больше ячейки памяти — потребуется более сложная логика для синхронизации.
Перепишем семафор на Atomics, воспроизведя по сути ту же логику из примера выше, используя атомарные операции.
const {
compareExchange, wait, notify
} = Atomics;
class BinarySemaphore {
constructor(shared, offset = 0) {
this.lock = new Int32Array(shared, offset, 1);
}
enter() {
while (true) {
if (compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {
return;
}
wait(this.lock, 0, LOCKED);
}
}
leave() {
if (compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
// Лучше выкинуть исключение, чтобы не прозевать такой момент
return;
}
notify(this.lock, 0, 1);
}
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
const LOCKED = 1;
const UNLOCKED = 0;
class BinarySemaphore {
constructor(shared, offset = 0) {
this.lock = new Int32Array(shared, offset, 1);
}
enter() {
while (true) {
if (Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {
return;
}
Atomics.wait(this.lock, 0, LOCKED);
}
}
leave() {
if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
return;
}
Atomics.notify(this.lock, 0, 1);
}
}
if (isMainThread) {
const buffer = new SharedArrayBuffer(6);
for (let i = 0; i < 22; i++) {
new Worker(__filename, { workerData: buffer });
}
setTimeout(() => {
const array = new Int8Array(buffer, 4);
console.log(`Делали броски: ${array[0]}`);
console.log(`Тренировали катание: ${array[1]}`);
}, 1000);
} else {
const { threadId, workerData } = threads;
const semaphore = new BinarySemaphore(workerData);
const array = new Int8Array(workerData, 4);
semaphore.enter();
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
semaphore.leave();
}
Это уже абсолютно надежный вариант. Можно проверить, увеличивая количество потоков и выполняя программу большое число раз — игроки всегда будут делиться на две одинаковые группы. В процесс проверки и изменения флага посредством compareExchange не сможет вклиниться ни один другой поток кроме текущего, обращающегося к семафору.
Делали броски: 11
Тренировали катание: 11
Следует обратить внимание, что в качестве типизированного массива для представления буффера теперь используется экземпляр Int32Array. Основные методы Atomics работают только с экземплярами Int32Array и BigInt64Array. Попробовав применить их к неподдерживаемому типу TypedArray, мы получим ошибку вроде приведенной ниже.
Uncaught TypeError: [object Int8Array] is not an int32 or BigInt64 typed array
Осталось лишь добавить, что на практике гораздо удобнее использовать семафор в callback-формате, реализовав дополнительный метод в коде семафора.
exec(callback) {
this.enter();
try {
return callback();
} finally {
this.leave();
}
}
Применить его мы сможем следующим образом.
semaphore.exec(() => {
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
});
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
const LOCKED = 1;
const UNLOCKED = 0;
class BinarySemaphore {
constructor(shared, offset = 0) {
this.lock = new Int32Array(shared, offset, 1);
}
enter() {
while (true) {
if (
Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED
) {
return;
}
Atomics.wait(this.lock, 0, LOCKED);
}
}
leave() {
if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
throw new Error("Cannot leave unlocked BinarySemaphore");
}
Atomics.notify(this.lock, 0, 1);
}
exec(callback) {
this.enter();
try {
return callback();
} finally {
this.leave();
}
}
}
if (isMainThread) {
const buffer = new SharedArrayBuffer(6);
for (let i = 0; i < 22; i++) {
new Worker(__filename, { workerData: buffer });
}
setTimeout(() => {
const array = new Int8Array(buffer, 4);
console.log(`Делали броски: ${array[0]}`);
console.log(`Тренировали катание: ${array[1]}`);
}, 1000);
} else {
const { threadId, workerData } = threads;
const semaphore = new BinarySemaphore(workerData);
const array = new Int8Array(workerData, 4);
semaphore.exec(() => {
const [doKicks, doSkating] = array;
console.log(`На лед выходит ${threadId}.`);
if (doKicks === doSkating) {
array[1]++;
} else {
array[0]++;
}
});
}
Естественно, существует не один вариант реализации методов enter и leave на основе Atomics, например, возможен рекурсивный вариант enter. В любом случае, итоговое решение может выбираться из требований по памяти и быстродействию. Есть также вероятность, что дополнительная логика кроме атомарных операций Atomics и не потребуется в принципе (что было бы замечательно).
Часть вторая: матч и семафор со счетчиком
Матч в разгаре, игроки меняются. Отдохнувшие постепенно заменяют уставших, следуя правилу: на площадке не должно быть больше пяти игроков (вратаря не меняем). В критическую секцию (площадку для игры) допускаем теперь до пяти потоков одновременно (но не более). Булевого флага, использовавшегося в бинарном семафоре для этого недостаточно — нужен счетчик.
Семафор со счетчиком
Пойдем проторенной тропинкой, посмотрим на код потоков, а затем доработаем семафор.
/* Код главного потока */
const buffer = new SharedArrayBuffer(4);
// Инициализируем семафор, устанавливая счетчик в 5
const semaphore = new CountingSemaphore(buffer, 0, 5);
console.log(`Счетчик семафора: ${semaphore.counter[0]}`);
// Постепенно выпускаем на площадку игроков
for (let i = 0; i < 50; i++) {
new Worker(__filename, { workerData: buffer });
}
В игре одновременно задействовано много игроков (их больше 20, так как пятерки сменяются не один раз за игру), а мест на площадке всего 5.
/* Код вспомогательного потока */
const { threadId, workerData } = threads;
const semaphore = new CountingSemaphore(workerData);
// Входим в критическую секцию
semaphore.enter();
console.log(`На лед выходит ${threadId}.`);
// Вычисляем текущее количество игроков на площадке
const players = 5 - semaphore.counter[0];
if (players > 5) {
console.log('Нарушение! На поле ' + players + ' игроков');
}
// Спустя 10мс выходим из нее
setTimeout(() => semaphore.leave(), 10);
Доработаем первую версию бинарного семафора, заменив флаг на счетчик и попробуем, что из этого получится.
class CountingSemaphore {
constructor(shared, offset = 0, initial) {
this.counter = new Int32Array(shared, offset, 1);
if (typeof initial === "number") {
this.counter[0] = initial;
}
}
enter() {
while (this.counter[0] === 0);
this.counter[0]--;
}
leave() {
this.counter[0]++;
}
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
class CountingSemaphore {
constructor(shared, offset = 0, initial) {
this.counter = new Int32Array(shared, offset, 1);
if (typeof initial === "number") {
this.counter[0] = initial;
}
}
enter() {
while (this.counter[0] === 0);
this.counter[0]--;
}
leave() {
this.counter[0]++;
}
}
if (isMainThread) {
const buffer = new SharedArrayBuffer(4);
const semaphore = new CountingSemaphore(buffer, 0, 5);
console.log(`Счетчик семафора: ${semaphore.counter[0]}`);
for (let i = 0; i < 50; i++) {
new Worker(__filename, { workerData: buffer });
}
} else {
const { threadId, workerData } = threads;
const semaphore = new CountingSemaphore(workerData);
semaphore.enter();
console.log(`На лед выходит ${threadId}.`);
const players = 5 - semaphore.counter[0];
if (players > 5) {
console.log(`Нарушение! На поле ${players} игроков`);
}
setTimeout(() => {
semaphore.leave();
}, 10);
}
Результат достаточно ожидаем для тех, кто прочитал первую часть статьи. Запускаем программу видим, что команда получает штраф за нарушение численного состава с завидной регулярностью.
На лед выходит 40
На лед выходит 42
Нарушение! На поле 6 игроков
Причина кроется все в том же — в метод enter семафора вхожи все потоки одновременно, что приводит к состоянию гонки. Решение на поверхности — привет, Atomics!
class CountingSemaphore {
constructor(shared, offset = 0, initial) {
this.counter = new Int32Array(shared, offset, 1);
if (typeof initial === "number") {
Atomics.store(this.counter, 0, initial);
}
}
enter() {
while (true) {
Atomics.wait(this.counter, 0, 0);
const n = Atomics.load(this.counter, 0);
if (n > 0) {
const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);
if (prev === n) return;
}
}
}
leave() {
Atomics.add(this.counter, 0, 1);
Atomics.notify(this.counter, 0, 1);
}
}
Здесь пришлось попотеть над методом enter по той причине, что при установке счетчика должны быть выполнены два условия: он не должен опуститься ниже ноля, в также не должен поменяться в промежутке между считыванием и установкой нового значения. Учтя эти два условия получаем надежный семафор, не допускающий внутрь критической секции лишние потоки.
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;
class CountingSemaphore {
constructor(shared, offset = 0, initial) {
this.counter = new Int32Array(shared, offset, 1);
if (typeof initial === "number") {
Atomics.store(this.counter, 0, initial);
}
}
enter() {
while (true) {
Atomics.wait(this.counter, 0, 0);
const n = Atomics.load(this.counter, 0);
if (n > 0) {
const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);
if (prev === n) return;
}
}
}
leave() {
Atomics.add(this.counter, 0, 1);
Atomics.notify(this.counter, 0, 1);
}
}
if (isMainThread) {
const buffer = new SharedArrayBuffer(4);
const semaphore = new CountingSemaphore(buffer, 0, 5);
console.log(`Счетчик семафора: ${semaphore.counter[0]}`);
for (let i = 0; i < 50; i++) {
new Worker(__filename, { workerData: buffer });
}
} else {
const { threadId, workerData } = threads;
const semaphore = new CountingSemaphore(workerData);
semaphore.enter();
console.log(`На лед выходит ${threadId}.`);
const players = 5 - semaphore.counter[0];
if (players > 5) {
console.log(`Нарушение! На поле ${players} игроков`);
}
setTimeout(() => {
semaphore.leave();
}, 10);
}
Запускаем программу и видим, что полностью избавились от штрафов. На площадке (критической секции) в любой момент времени не более пяти потоков-игроков.
Итого
Что же имеем в сухом остатке. Семафоры — простое средство для блокировки доступа к разделяемому ресурсу (в нашем случае буферу памяти). Они позволяют выполнять код в критической секции только ограниченному числу потоков (одному или нескольким), не опасаясь вмешательства со стороны других потоков. Комбинируя атомарные операции, можно создавать элементы для многопоточного программирования любой сложности. В текущий момент все необходимое для реализации многопоточных выислений достаточно широко поддержано во всех средах JavaScript вплоть до мобильных браузеров.
Комментарии (4)
anonymous
00.00.0000 00:00alexprozoroff Автор
26.07.2022 23:17Должно быть верно, здесь логика следущая:
// Ждем пока освободится место для нашего потока, то есть счетчик станет больше 0 Atomics.wait(this.counter, 0, 0); // Получаем значение счетчика const n = Atomics.load(this.counter, 0); // Еще раз проверям на то, что n > 0, так как другой поток мог вклиниться между wait и load if (n > 0) { // Пытаемся атомарно уменьшить счетчик, получив его предыдущее значение const prev = Atomics.compareExchange(this.counter, 0, n, n - 1); // Если prev === n, значит мы успешно уменьшили счетчик, все ок, можно выходить из цикла if (prev === n) return; // Eсли prev !== n, значит мы не изменили счетчик, надо пробовать снова - идем в начало цикла }
gdt
Ходят слухи, что бинарный семафор больше подходит для синхронизации между потоками, в то время как мьютекс как раз отлично подходит для организации критической секции, т. е. take ownership. Так ли это?
alexprozoroff Автор
Все так, реализуя и используя семафор, мы ожидаем, что другой поток может перехватить управление и войти в критическую секцию раньше первого потока. Единственное, в чем мы точно уверены в этом случае - одновременно в критической секции будет только один поток. Мьютекс же полностью заблокирует доступ к ресурсу, и снять его можно будет только из текущего потока (владельца ресурса).