На дворе стоит двадцать второй год и все основные среды исполнения JavaScript уже вовсю поддерживают доступ к потокам. Причем, в отличие от языков с глобальной блокировкой интерпретатора, вроде Python и Ruby, где для параллельного выполнения задач рекомендуется запускать отдельные процессы, в JS это именно потоки с возможностью использования разделяемой памяти, а также всеми достоинствами и недостатками такой свободы. Конечно, есть и серьезные ограничения - например, не получится совместно использовать один JavaScript-объект в нескольких потоках. Но тем не менее, задачи, связанные с трудоемкими математическими вычислениями или обработкой графики можно смело выносить в отдельные потоки уже сейчас.

Что же с поддержкой?

Посмотрим, что с поддержкой у основных инструментов, задействованных в реализации многопоточного кода: собственно возможности запуска кода в отдельном потоке, низкоуровневых методах синхронизации Atomics, а также буфера для создания области разделяемой памяти SharedArrayBuffer.

C Node.js, где модуль worker_threads стабилен начиная с версии 11.7.0, а Atomics и SharedArrayBuffer и того раньше, все просто. А вот в браузерах, и в особенности в мобильных версиях, поддержка Atomics и SharedArrayBuffer появилась в полной мере лишь в прошлом году. Однако теперь они с нами и готовы разгружать основной поток от тяжелых вычислений.

О структуре статьи

Дабы не зевать, перекладывая ничего не значащие байты из ячейки в ячейку, наблюдая за возникновением коллизий, попробуем разобраться с простейшими примитивами синхронизации на основе физического процесса. Таковым выступит хоккейный матч. На этапе раскатки перед матчем с помощью бинарного семафора разделим игроков на две равных группы, а уже в процессе игры с помощью семафора со счетчиком избавимся от штрафов за нарушение численного состава.

Примеры кода напишем для среды Node.js. Для полноценного восприятия примеров стоит познакомиться с возможностями модуля worker_threads, особенностями использования SharedArrayBuffer и атомарными операциями над ним.

Кипятков Кирилл - Хоккей (2016)
Кипятков Кирилл - Хоккей (2016)

Часть первая: разминка и бинарный семафор

Итак, матч еще впереди, игрокам нужно размяться и тренер решает следующее: первая половина игроков будет тренировать броски, вторая — катание. Очередной, вышедший из раздевалки игрок, получает следующее задание - присоединиться к группе тренирующих катание, или к группе, тренирующих броски так, чтобы количество игроков в группах было равным. Код будем писать в одном файле, воспользовавшись флагом isMainThread для разделения логики основного и вспомогательных потоков. Текущее количество игроков в каждой из групп будем хранить в двух ячейках буфера разделяемой памяти.

const threads = require('worker_threads');
const { Worker, isMainThread } = threads;

if (isMainThread) {
  //Главный поток
} else {
  //Вспомогательный поток - игрок
}

Код для логики главного потока будет включать в себя три составляющих: инициализацию буфера, запуск потоков, проверка итогового распределения игроков по группам.

/* Код главного потока */

// Инициализируем буфер c двумя счетчиками 
// [*делают броски*, *тренируют катание*]
const buffer = new SharedArrayBuffer(2);

// Запускам/выпускаем не площадку 22 потока-игрока
for (let i = 0; i < 22; i++) {
  new Worker(__filename, { workerData: buffer });
}

// Проверяем итоговое распределение по группам после разминки
setTimeout(() => {
	const array = new Int8Array(buffer, 0);
	console.log(`Делали броски: ${array[0]}`);
  console.log(`Тренировали катание: ${array[1]}`);
}, 1000);

Задача вспомогательного потока (потока-игрока) заключается в следующем: выбрать группу, где игроков не больше чем в соседней, тем самым сохранив баланс. Сделаем это через проверку на равенство для более яркой иллюстрации состояния гонки.

/* Код вспомогательного потока */

const { threadId, workerData } = threads;
const array = new Int8Array(workerData, 0);

// Получаем количество игроков в каждой из групп
const [doKicks, doSkating] = array;

console.log(`На лед выходит ${threadId}.`);

// Отправляем игрока тренировать катание или делать броски
if (doKicks === doSkating) {
	array[1]++;
} else {
	array[0]++;
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

if (isMainThread) {
  const buffer = new SharedArrayBuffer(2);

  for (let i = 0; i < 22; i++) {
    new Worker(__filename, { workerData: buffer });
  }

  setTimeout(() => {
    const array = new Int8Array(buffer, 0);
    console.log(`Делали броски: ${array[0]}`);
    console.log(`Тренировали катание: ${array[1]}`);
  }, 1000);
} else {
  const { threadId, workerData } = threads;
  const array = new Int8Array(workerData, 0);
  const [doKicks, doSkating] = array;

  console.log(`На лед выходит ${threadId}.`);

  if (doKicks === doSkating) {
    array[1]++;
  } else {
    array[0]++;
  }
}

Таким образом, выходя на поле, игрок присоединится к тренирующим катание, если количество игроков в группах равно, и выберет броски, если количество отличается. Выполняя такую логику в одном потоке, мы будем ожидать, что количество игроков в группах всего будет равным — по 11. Запустив же программу выше, мы получаем совершенно ненадежные результаты (возможно придется запустить ее несколько раз).

Делали броски: 14
Тренировали катание: 8

Это происходит по той причине, что в критическую секцию (код проверки количества игроков в группах и выбора одной из них) одновременно могут войти сразу несколько потоков. Это похоже на то, как игроки выходя на поле и увидев, что группы наполнены одинаково, оба отправляются в одну и ту же группу. Этого можно было бы избежать, если бы один из игроков дождался решения другого.

Бинарный семафор

Бинарный семафор является инструментом для управления доступом к разделяемым данным (в нашем случае счетчикам количества игроков). Он гарантирует, что в каждый момент времени работать с ресурсом может не более одного потока. Используя бинарный семафор поток может захватить блокировку, выполнить код и освободить блокировку, не беспокоясь, что в момент выполнения другой поток зайдет в критическую секцию. Похожим примитивом синхронизации является мьютекс за тем лишь исключением, что в случае мьютекса освободить блокировку может только поток-владелец, первым захвативший мьютекс.

Чтобы вникнуть в суть идеи пройдем путь создания бинарного семафора итеративно. Для начала посмотрим, как может выглядеть вариант его использования.

/* Код вспомогательного потока */

const { threadId, workerData } = threads;

// Инициализируем семафор в каждом потоке
const semaphore = new BinarySemaphore(workerData);
const array = new Int8Array(workerData, 1);

// Блокируем критическую секцию
semaphore.enter();
const [doKicks, doSkating] = array;

console.log(`На лед выходит ${threadId}.`);

if (doKicks === doSkating) {
	array[1]++;
} else {
	array[0]++;
}

// Освобождаем критическую секцию
semaphore.leave();

Итак войдя в критическую секцию поток блокирует доступ к ней, а выполнив код — освобождает ее. Флаг для хранения состояния семафора поместим в том же буфере разделяемой памяти на нулевой позиции. Посмотрим как покажет себя следующая наивная реализация.

const LOCKED = 1;
const UNLOCKED = 0;

class BinarySemaphore {
	constructor(shared, offset = 0) {
	  this.lock = new Int8Array(shared, offset, 1);
	}

	enter() {
	  while (this.lock[0] !== UNLOCKED);
	  this.lock[0] = LOCKED;
	}
  
	leave() {
	  this.lock[0] = UNLOCKED;
	}
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

const LOCKED = 1;
const UNLOCKED = 0;

class BinarySemaphore {
  constructor(shared, offset = 0) {
    this.lock = new Int8Array(shared, offset, 1);
  }

  enter() {
    while (this.lock[0] !== UNLOCKED);
    this.lock[0] = LOCKED;
  }

  leave() {
    this.lock[0] = UNLOCKED;
  }
}

if (isMainThread) {
  const buffer = new SharedArrayBuffer(3);

  for (let i = 0; i < 22; i++) {
    new Worker(__filename, { workerData: buffer });
  }

  setTimeout(() => {
    const array = new Int8Array(buffer, 1);
    console.log(`Делали броски: ${array[0]}`);
    console.log(`Тренировали катание: ${array[1]}`);
  }, 2000);
} else {
  const { threadId, workerData } = threads;
  const semaphore = new BinarySemaphore(workerData);
  const array = new Int8Array(workerData, 1);

  semaphore.enter();
  const [doKicks, doSkating] = array;

  console.log(`На лед выходит ${threadId}.`);

  if (doKicks === doSkating) {
    array[1]++;
  } else {
    array[0]++;
  }

  semaphore.leave();
}

Запускаем, и... видим, что стало значительно лучше — игроки почти всегда делятся на равные группы. Однако запуская программу снова и снова (или увеличив количество игроков-потоков), мы снова видим, что ошибки случаются. Почему же это происходит? А причина в том, что теперь у нас снова есть критическая секция - это код метода enter семафора. Получив значение флага мы не гарантируем, что оно будет тем же в момент, когда мы устанавливаем его в состояние блокировки. Самое время воспользоваться возможностями Atomics.

В этот момент можно задаться вопросом, зачем в принципе использовать бинарный семафор или мьютекс, если мы уже имеем Atomics с возможностью атомарно обращаться с разделяемой памятью. Причина в требуемой сложности операций. Если нам достаточно логики для проверки и изменения значения одной ячейки памяти — можно и нужно использовать Atomics, его будет достаточно. В противном случае, когда требуется атомарно считать/отредактировать две и больше ячейки памяти — потребуется более сложная логика для синхронизации.

Перепишем семафор на Atomics, воспроизведя по сути ту же логику из примера выше, используя атомарные операции.

const {
  compareExchange, wait, notify
} = Atomics;

class BinarySemaphore {
  constructor(shared, offset = 0) {
    this.lock = new Int32Array(shared, offset, 1);
  }

  enter() {
    while (true) {
      if (compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {
        return;
      }
      wait(this.lock, 0, LOCKED);
    }
  }

  leave() {
    if (compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
      // Лучше выкинуть исключение, чтобы не прозевать такой момент
      return;
    }
    notify(this.lock, 0, 1);
  }
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

const LOCKED = 1;
const UNLOCKED = 0;

class BinarySemaphore {
  constructor(shared, offset = 0) {
    this.lock = new Int32Array(shared, offset, 1);
  }

  enter() {
    while (true) {
      if (Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {
        return;
      }
      Atomics.wait(this.lock, 0, LOCKED);
    }
  }

  leave() {
    if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
      return;
    }
    Atomics.notify(this.lock, 0, 1);
  }
}

if (isMainThread) {
  const buffer = new SharedArrayBuffer(6);

  for (let i = 0; i < 22; i++) {
    new Worker(__filename, { workerData: buffer });
  }

  setTimeout(() => {
    const array = new Int8Array(buffer, 4);
    console.log(`Делали броски: ${array[0]}`);
    console.log(`Тренировали катание: ${array[1]}`);
  }, 1000);
} else {
  const { threadId, workerData } = threads;
  const semaphore = new BinarySemaphore(workerData);
  const array = new Int8Array(workerData, 4);

  semaphore.enter();
  const [doKicks, doSkating] = array;

  console.log(`На лед выходит ${threadId}.`);

  if (doKicks === doSkating) {
    array[1]++;
  } else {
    array[0]++;
  }

  semaphore.leave();
}

Это уже абсолютно надежный вариант. Можно проверить, увеличивая количество потоков и выполняя программу большое число раз — игроки всегда будут делиться на две одинаковые группы. В процесс проверки и изменения флага посредством compareExchange не сможет вклиниться ни один другой поток кроме текущего, обращающегося к семафору.

Делали броски: 11
Тренировали катание: 11

Следует обратить внимание, что в качестве типизированного массива для представления буффера теперь используется экземпляр Int32Array. Основные методы Atomics работают только с экземплярами Int32Array и BigInt64Array. Попробовав применить их к неподдерживаемому типу TypedArray, мы получим ошибку вроде приведенной ниже.

Uncaught TypeError: [object Int8Array] is not an int32 or BigInt64 typed array

Осталось лишь добавить, что на практике гораздо удобнее использовать семафор в callback-формате, реализовав дополнительный метод в коде семафора.

exec(callback) {
  this.enter();
  try {
    return callback();
  } finally {
    this.leave();
  }
}

Применить его мы сможем следующим образом.

semaphore.exec(() => {
  const [doKicks, doSkating] = array;

  console.log(`На лед выходит ${threadId}.`);

  if (doKicks === doSkating) {
    array[1]++;
  } else {
    array[0]++;
  }
});
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

const LOCKED = 1;
const UNLOCKED = 0;

class BinarySemaphore {
  constructor(shared, offset = 0) {
    this.lock = new Int32Array(shared, offset, 1);
  }

  enter() {
    while (true) {
      if (
        Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED
      ) {
        return;
      }
      Atomics.wait(this.lock, 0, LOCKED);
    }
  }

  leave() {
    if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {
      throw new Error("Cannot leave unlocked BinarySemaphore");
    }
    Atomics.notify(this.lock, 0, 1);
  }

  exec(callback) {
    this.enter();
    try {
      return callback();
    } finally {
      this.leave();
    }
  }
}

if (isMainThread) {
  const buffer = new SharedArrayBuffer(6);

  for (let i = 0; i < 22; i++) {
    new Worker(__filename, { workerData: buffer });
  }

  setTimeout(() => {
    const array = new Int8Array(buffer, 4);
    console.log(`Делали броски: ${array[0]}`);
    console.log(`Тренировали катание: ${array[1]}`);
  }, 1000);
} else {
  const { threadId, workerData } = threads;
  const semaphore = new BinarySemaphore(workerData);
  const array = new Int8Array(workerData, 4);

  semaphore.exec(() => {
    const [doKicks, doSkating] = array;

    console.log(`На лед выходит ${threadId}.`);

    if (doKicks === doSkating) {
      array[1]++;
    } else {
      array[0]++;
    }
  });
}

Естественно, существует не один вариант реализации методов enter и leave на основе Atomics, например, возможен рекурсивный вариант enter. В любом случае, итоговое решение может выбираться из требований по памяти и быстродействию. Есть также вероятность, что дополнительная логика кроме атомарных операций Atomics и не потребуется в принципе (что было бы замечательно).

Часть вторая: матч и семафор со счетчиком

Матч в разгаре, игроки меняются. Отдохнувшие постепенно заменяют уставших, следуя правилу: на площадке не должно быть больше пяти игроков (вратаря не меняем). В критическую секцию (площадку для игры) допускаем теперь до пяти потоков одновременно (но не более). Булевого флага, использовавшегося в бинарном семафоре для этого недостаточно — нужен счетчик.

Семафор со счетчиком

Пойдем проторенной тропинкой, посмотрим на код потоков, а затем доработаем семафор.

/* Код главного потока */

const buffer = new SharedArrayBuffer(4);

// Инициализируем семафор, устанавливая счетчик в 5
const semaphore = new CountingSemaphore(buffer, 0, 5);

console.log(`Счетчик семафора: ${semaphore.counter[0]}`);

// Постепенно выпускаем на площадку игроков
for (let i = 0; i < 50; i++) {
  new Worker(__filename, { workerData: buffer });
}

В игре одновременно задействовано много игроков (их больше 20, так как пятерки сменяются не один раз за игру), а мест на площадке всего 5.

/* Код вспомогательного потока */

const { threadId, workerData } = threads;
const semaphore = new CountingSemaphore(workerData);
  
// Входим в критическую секцию
semaphore.enter();
console.log(`На лед выходит ${threadId}.`);

// Вычисляем текущее количество игроков на площадке
const players = 5 - semaphore.counter[0];
if (players > 5) {
	console.log('Нарушение! На поле ' + players + ' игроков');
}

// Спустя 10мс выходим из нее
setTimeout(() => semaphore.leave(), 10);

Доработаем первую версию бинарного семафора, заменив флаг на счетчик и попробуем, что из этого получится.

class CountingSemaphore {
  constructor(shared, offset = 0, initial) {
    this.counter = new Int32Array(shared, offset, 1);
    if (typeof initial === "number") {
      this.counter[0] = initial;
    }
  }

  enter() {
    while (this.counter[0] === 0);
    this.counter[0]--;
  }

  leave() {
    this.counter[0]++;
  }
}
Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

class CountingSemaphore {
  constructor(shared, offset = 0, initial) {
    this.counter = new Int32Array(shared, offset, 1);
    if (typeof initial === "number") {
      this.counter[0] = initial;
    }
  }

  enter() {
    while (this.counter[0] === 0);
    this.counter[0]--;
  }

  leave() {
    this.counter[0]++;
  }
}

if (isMainThread) {
  const buffer = new SharedArrayBuffer(4);
  const semaphore = new CountingSemaphore(buffer, 0, 5);
  console.log(`Счетчик семафора: ${semaphore.counter[0]}`);
  for (let i = 0; i < 50; i++) {
    new Worker(__filename, { workerData: buffer });
  }
} else {
  const { threadId, workerData } = threads;
  const semaphore = new CountingSemaphore(workerData);

  semaphore.enter();
  console.log(`На лед выходит ${threadId}.`);
  const players = 5 - semaphore.counter[0];
  if (players > 5) {
    console.log(`Нарушение! На поле ${players} игроков`);
  }
  setTimeout(() => {
    semaphore.leave();
  }, 10);
}

Результат достаточно ожидаем для тех, кто прочитал первую часть статьи. Запускаем программу видим, что команда получает штраф за нарушение численного состава с завидной регулярностью.

На лед выходит 40
На лед выходит 42
Нарушение! На поле 6 игроков

Причина кроется все в том же — в метод enter семафора вхожи все потоки одновременно, что приводит к состоянию гонки. Решение на поверхности — привет, Atomics!

class CountingSemaphore {
  constructor(shared, offset = 0, initial) {
    this.counter = new Int32Array(shared, offset, 1);
    if (typeof initial === "number") {
      Atomics.store(this.counter, 0, initial);
    }
  }

  enter() {
    while (true) {
      Atomics.wait(this.counter, 0, 0);
      const n = Atomics.load(this.counter, 0);
      if (n > 0) {
        const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);
        if (prev === n) return;
      }
    }
  }

  leave() {
    Atomics.add(this.counter, 0, 1);
    Atomics.notify(this.counter, 0, 1);
  }
}

Здесь пришлось попотеть над методом enter по той причине, что при установке счетчика должны быть выполнены два условия: он не должен опуститься ниже ноля, в также не должен поменяться в промежутке между считыванием и установкой нового значения. Учтя эти два условия получаем надежный семафор, не допускающий внутрь критической секции лишние потоки.

Полный код примера
const threads = require("worker_threads");
const { Worker, isMainThread } = threads;

class CountingSemaphore {
  constructor(shared, offset = 0, initial) {
    this.counter = new Int32Array(shared, offset, 1);
    if (typeof initial === "number") {
      Atomics.store(this.counter, 0, initial);
    }
  }

  enter() {
    while (true) {
      Atomics.wait(this.counter, 0, 0);
      const n = Atomics.load(this.counter, 0);
      if (n > 0) {
        const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);
        if (prev === n) return;
      }
    }
  }

  leave() {
    Atomics.add(this.counter, 0, 1);
    Atomics.notify(this.counter, 0, 1);
  }
}

if (isMainThread) {
  const buffer = new SharedArrayBuffer(4);
  const semaphore = new CountingSemaphore(buffer, 0, 5);
  console.log(`Счетчик семафора: ${semaphore.counter[0]}`);
  for (let i = 0; i < 50; i++) {
    new Worker(__filename, { workerData: buffer });
  }
} else {
  const { threadId, workerData } = threads;
  const semaphore = new CountingSemaphore(workerData);
  semaphore.enter();
  console.log(`На лед выходит ${threadId}.`);
  const players = 5 - semaphore.counter[0];
  if (players > 5) {
    console.log(`Нарушение! На поле ${players} игроков`);
  }

  setTimeout(() => {
    semaphore.leave();
  }, 10);
}

Запускаем программу и видим, что полностью избавились от штрафов. На площадке (критической секции) в любой момент времени не более пяти потоков-игроков.

Итого

Что же имеем в сухом остатке. Семафоры — простое средство для блокировки доступа к разделяемому ресурсу (в нашем случае буферу памяти). Они позволяют выполнять код в критической секции только ограниченному числу потоков (одному или нескольким), не опасаясь вмешательства со стороны других потоков. Комбинируя атомарные операции, можно создавать элементы для многопоточного программирования любой сложности. В текущий момент все необходимое для реализации многопоточных выислений достаточно широко поддержано во всех средах JavaScript вплоть до мобильных браузеров.

Комментарии (4)


  1. gdt
    26.07.2022 16:48
    +1

    Ходят слухи, что бинарный семафор больше подходит для синхронизации между потоками, в то время как мьютекс как раз отлично подходит для организации критической секции, т. е. take ownership. Так ли это?


    1. alexprozoroff Автор
      26.07.2022 21:32
      +1

      Все так, реализуя и используя семафор, мы ожидаем, что другой поток может перехватить управление и войти в критическую секцию раньше первого потока. Единственное, в чем мы точно уверены в этом случае - одновременно в критической секции будет только один поток. Мьютекс же полностью заблокирует доступ к ресурсу, и снять его можно будет только из текущего потока (владельца ресурса).


  1. anonymous
    00.00.0000 00:00


    1. alexprozoroff Автор
      26.07.2022 23:17

      Должно быть верно, здесь логика следущая:

      // Ждем пока освободится место для нашего потока, то есть счетчик станет больше 0
      Atomics.wait(this.counter, 0, 0);
      // Получаем значение счетчика
      const n = Atomics.load(this.counter, 0);
      // Еще раз проверям на то, что n > 0, так как другой поток мог вклиниться между wait и load
      if (n > 0) {
         // Пытаемся атомарно уменьшить счетчик, получив его предыдущее значение
        const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);
         // Если prev === n, значит мы успешно уменьшили счетчик, все ок, можно выходить из цикла
        if (prev === n) return;
        // Eсли prev !== n, значит мы не изменили счетчик, надо пробовать снова - идем в начало цикла
      }