Привет, Хабр!
JavaScript по традиции известен как однопоточный язык. Т.е код выполняется последовательно, и одновременное выполнение нескольких задач может быть проблематичным. Если код сталкивается с тяжелыми вычислительными задачами, это может привести к задержкам и замедлению интерфейса юзера. Поэтому один поток не для каких-либо интенсивных вычислений или обработки больших объемов данных.
Чтобы обойти эти ограничения, были введены Web Workers — они позволяют выполнять JS-код в фоновом потоке, параллельно с основным. Однако, все сложилось так, что простой обмен данными между основным потоком и воркерами через postMessage
имеет свои ограничения и может быть недостаточно хорошим для некоторых задач.
Здесь помогают SharedArrayBuffer и Atomics.
SharedArrayBuffer
SharedArrayBuffer — это особый тип буфера, который позволяет нескольким потокам разделять один и тот же блок памяти. В отличие от того же дефолтного ArrayBuffer, который доступен только одному потоку, SharedArrayBuffer дает общую область памяти, к которой могут обращаться несколько рабочих потоков.
Внутренне SharedArrayBuffer представляет собой блок памяти фикс. размера, который можно использовать для хранения бинарных данных. Он используется в связке с типизированными массивами, такими как Uint8Array или Int32Array.
Но перед тем как работать с SharedArrayBuffer очень важно использовать определенные требования по настройке среды - настройку заголовков HTTP.
COOP заголовок позволяет защитить сайт от атак типа cross-origin. Он должен быть установлен на значение same-origin
:
Cross-Origin-Opener-Policy: same-origin
COEP заголовок предотвращает встраивание сайта в другие документы. Он должен быть установлен на значение require-corp
или credentialless
:
Cross-Origin-Embedder-Policy: require-corp
Теперь рассмотрим как создать и передать SharedArrayBuffer между потоками.
Создание SharedArrayBuffer
Конструктор SharedArrayBuffer
используется для создания буфера определенного размера в байтах. Он принимает два параметра:
length
: размер буфера в байтах.options
(необязательный): объект с параметрами, например какmaxByteLength
, который определяет максимальный размер буфера.
const sab = new SharedArrayBuffer(1024);
const growableSab = new SharedArrayBuffer(8, { maxByteLength: 16 });
Методы SharedArrayBuffer
grow
позволяет увеличивать размер SharedArrayBuffer
, если он был создан с параметром maxByteLength
. Новый размер должен быть меньше или равен maxByteLength
:
const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
if (buffer.growable) {
buffer.grow(12);
}
slice
возвращает новый SharedArrayBuffer
, содержащий копию байтов из оригинального буфера от индекса start
(включительно) до end
(исключительно).
const sab = new SharedArrayBuffer(1024);
const slicedSab = sab.slice(2, 100);
Так можно создавать буфеы, основанные на подмножествах данных из исходного буфера, без изменения оригинального.
Свойства SharedArrayBuffer
byteLength
возвращает длину SharedArrayBuffer
в байтах. Это значение устанавливается при создании буфера и не может быть изменено.
const sab = new SharedArrayBuffer(1024);
console.log(sab.byteLength); // 1024
Свойство growable
указывает, может ли SharedArrayBuffer
изменять свой размер. Если буфер был создан с параметром maxByteLength
, это свойство будет равно true
.
const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.growable); // true
maxByteLength
возвращает максимальный размер буфера в байтах, который был установлен при создании буфера.
const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.maxByteLength); // 16
Atomics
Atomics — это встроенный объект в JS, который предоставляет набор статических методов для выполнения атомарных операций. Атомарные операции выполняются как единое, неделимое действие, именно так atomics позволяет избегать гонок данных.
Основные методы Atomics:
Atomics.add(typedArray, index, value)
Атомарно добавляет значение к элементу массива по заданному индексу и возвращает предыдущее значение:
const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
Atomics.add(int32, 0, 5); // добавляет 5 к int32[0]
Atomics.sub(typedArray, index, value)
Атомарно вычитает значение из элемента массива по заданному индексу и возвращает предыдущее значение:
Atomics.sub(int32, 0, 2); // вычитает 2 из int32[0]
Atomics.load(typedArray, index)
Атомарно считывает значение элемента массива по заданному индексу:
let value = Atomics.load(int32, 0); // считывает значение int32[0]
Atomics.store(typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу:
Atomics.store(int32, 0, 10); // Записывает 10 в int32[0]
Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
Атомарно сравнивает текущее значение элемента массива по заданному индексу с ожидаемым значением. Если они равны, то записывает новое значение и возвращает старое:
Atomics.compareExchange(int32, 0, 10, 20); // если int32[0] равно 10, то записывает 20 и возвращает старое значение
Atomics.exchange(typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу и возвращает старое значение:
Atomics.exchange(int32, 0, 15); // записывает 15 в int32[0] и возвращает старое значение
Atomics.wait(typedArray, index, value, timeout)
Проверяет значение элемента массива и ожидает его изменения до заданного значения или истечения времени тайм-аута. Возвращает "ok", "not-equal" или "timed-out":
let result = Atomics.wait(int32, 0, 10, 1000); // ожидает изменения int32[0] до 10 или 1000 мс
Atomics.notify(typedArray, index, count)
Уведомляет потоки, ожидающие изменения элемента массива по заданному индексу. Возвращает количество уведомленных потоков:
let notified = Atomics.notify(int32, 0, 1); // уведомляет один поток, ожидающий изменения int32[0]
Atomics можно юзать для синхронных операций. Например, для синхронизации работы потоков:
const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// в главном потоке
const worker = new Worker('worker.js');
worker.postMessage(sab);
Atomics.store(int32, 0, 0);
Atomics.wait(int32, 0, 0); // ожидание изменения int32[0]
// В рабочем потоке (worker.js)
self.onmessage = function(event) {
const int32 = new Int32Array(event.data);
Atomics.store(int32, 0, 1);
Atomics.notify(int32, 0, 1); // уведомление главного потока
};
Объединяем Atomics и SharedArrayBuffer
Счетчик запросов в реал тайме
Допустим, есть веб-сервис, который обрабатывает большое количество запросов, и нам нужно следить за кол-вом активных запросов в реальном времени:
// главный поток
const buffer = new SharedArrayBuffer(4); // Создаем буфер на 4 байта
const counter = new Int32Array(buffer);
const worker = new Worker('worker.js');
worker.postMessage(buffer);
// обрабатываем новый запрос
function handleRequest() {
Atomics.add(counter, 0, 1); // увеличиваем счетчик
console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
// симулируем завершение запроса через 2 секунды
setTimeout(() => {
Atomics.sub(counter, 0, 1); // уменьшаем счетчик
console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
}, 2000);
}
// рабочий поток (worker.js)
self.onmessage = function(event) {
const counter = new Int32Array(event.data);
setInterval(() => {
console.log(`[Worker] Активные запросы: ${Atomics.load(counter, 0)}`);
}, 1000);
};
Используем SharedArrayBuffer для хранения счетчика активных запросов, а Atomics дает безопасное увеличение и уменьшение счетчика из разных потоков.
Параллельная обработка данных
Рассмотрим задачу параллельной обработки большого массива данных, например, применение функции к каждому элементу массива:
/// главный поток
const buffer = new SharedArrayBuffer(1024 * 4); // буфер для 1024 чисел
const data = new Int32Array(buffer);
// инициализируем массив случайными числами
for (let i = 0; i < data.length; i++) {
data[i] = Math.floor(Math.random() * 100);
}
const worker = new Worker('worker.js');
worker.postMessage(buffer);
function processData() {
for (let i = 0; i < data.length; i++) {
data[i] *= 2; // применяем функцию к каждому элементу
}
console.log('Обработка данных завершена');
}
processData();
// рабочий поток (worker.js)
self.onmessage = function(event) {
const data = new Int32Array(event.data);
for (let i = 0; i < data.length; i++) {
Atomics.store(data, i, Atomics.load(data, i) * 2); // атомарное умножение на 2
}
console.log('Рабочий поток: обработка данных завершена');
};
Синхронизация состояния между потоками
Предположим, есть некая игрушка, где несколько потоков должны синхронизировать свое состояние:
// главный поток
const buffer = new SharedArrayBuffer(4); // буфер на 4 байта для хранения состояния
const state = new Int32Array(buffer);
const worker = new Worker('worker.js');
worker.postMessage(buffer);
function updateState(newState) {
Atomics.store(state, 0, newState); // обовляем состояние
Atomics.notify(state, 0); // уведомляем рабочий поток об изменении состояния
console.log(`Состояние обновлено: ${newState}`);
}
updateState(1);
// рабочий поток (worker.js)
self.onmessage = function(event) {
const state = new Int32Array(event.data);
function waitForStateChange() {
Atomics.wait(state, 0, Atomics.load(state, 0)); // ожидаем изменения состояния
console.log(`Рабочий поток: состояние изменено на ${Atomics.load(state, 0)}`);
waitForStateChange(); // рекурсивно продолжаем ожидание изменений
}
waitForStateChange();
};
Про востребованные языки программирования и практические инструменты мои коллеги из OTUS рассказывают в рамках онлайн-курсов. По этой ссылке вы можете ознакомиться с полным каталогом курсов, а также записаться на открытые уроки.
Комментарии (13)
alexdora
26.06.2024 21:38Интересно смотреть как рожденного ползать пытаются заставить летать.
Личный опыт показал, что если задача требует многопоточности сразу переходить с javascript и не мучать себя. Вариантов много.
У меня остался вопрос, а с планировщиком что-то порешали? На сколько я помню, главный поток из-за планировщика являлся узким местом.
Dolios
26.06.2024 21:38+1Однако, все сложилось так, что простой обмен данными между основным потоком и воркерами через postMessage имеет свои ограничения и может быть недостаточно хорошим для некоторых задач.
Какие ограничения, почему недостаточно хороший, для каких задач? Какую проблему вы решаете в статье?
AlexSpaizNet
26.06.2024 21:38+2Только у меня упоминание "
Cross-Origin-Opener-Policy" в этой статье вызывает вопросы? Кто-то не почистил ответ чатжпт?
Tsimur_S
26.06.2024 21:38Почему вызывает?
Certain features depend on cross-origin isolation
Certain features like
SharedArrayBuffer
objects orPerformance.now()
with unthrottled timers are only available if your document has a COOP header with the valuesame-origin
set.AlexSpaizNet
26.06.2024 21:38Ну вот в доках конкретно написано на что заголовок влияет. А тут в статье не понятно. Может я поиграться хочу, нужно мне думать про embedding и защищаться?
Tsimur_S
26.06.2024 21:38Но в статье же и написано:
Но перед тем как работать с SharedArrayBuffer очень важно использовать определенные требования по настройке среды - настройку заголовков HTTP.
Или этого текста раньше не было?
Avangardio
Отличная статья, хочется увидеть более реальные примеры, а то так на ум ничего не приходит…
pvvv
Заблокировать исполнение основного потока через Atomics.wait, чтобы подождать выполнения асинхронной функции и тем самым превратить её в синхронную, потому что wasm про асинхронный яваскрипт ничего не знает и для вызова асинхронных функций из wasm нужны вот такие замечательные костыли :)
mayorovp
Так в главном же потоке как раз работа Atomics.wait и не гарантирована...
pvvv
На этот случай есть ещё более замечательные костыли из перехвата serviceworkerом синхронного xmlhttprequest