В первой части статьи мы исследовали скорость различных вариантов обмена информацией между JavaScript и WASM-кодом. В этом продолжении - наконец-то займемся написанием прикладного кода нашего парсера.

Мы ведь теперь пишем "прямо на ассемблере" - значит, все будет супербыстро! Правда ведь?

Портируем алгоритм наивно

Начнем с самых простых действий - опишем, где-какие данные будут лежать в разделяемой памяти.

JavaScript

  • первые 12 * 8 байт [0x0000..0x005F] используем для возврата 64bit-значений каждого из 12 потенциально возможных ключей

  • данные в эти поля будет записывать импортируемая parseBuffersWASM

  • в последней 32bit-ячейке первой страницы памяти [0xFFFC..0xFFFF] будем сохранять текущее положение "курсора" парсера

  • [0x10000..] содержит разбираемый контент

// ...
  const data = readFileSync('buffers.txt');
  memory.grow((data.byteLength >> 16) + 1);
  // формируем проекции
  const prj32 = new Uint32Array(memory.buffer);
  const prj8 = new Uint8Array(memory.buffer);

  // единственный раз передадим исходное смещение через переменную
  const off = new WebAssembly.Global({value : 'i32', mutable : true}, 1 << 16);

  // передаем содержимое файла, начиная с нужного смещения
  data.copy(prj8, off.value);
  // дописываем отсутствующий '\n' у последней строки
  prj8[off.value + data.length] = 0x0A;

  const importObject = {
    js : {
      memory
    , off
    }
  };

  const instance = await WebAssembly.instantiate(module, importObject);
  const parseBuffersWASM = instance.exports.parseBuffers;

  // набор наших ключей
  const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
  const lnBK = buffersKeys.length;

  const parseBuffers = () => {
    // зануляем все значения перед каждым вызовом
    prj32.fill(0x0, 0, lnBK << 1); // 32bit-ячеек в 2 раза больше, чем 64bit-значений
    parseBuffersWASM();
    let rv = {};
    // заполняем ненулевые значения ключей
    for (let i = 0, off = 0; i < lnBK; i++) {
      let val = prj32[off++] + prj32[off++] * 0x100000000;
      if (val) {
        rv[buffersKeys[i]] = val;
      }
    }
    return rv;
  };
// ...
  // текущее смещение "курсора" вычитываем из памяти 0xFFFC
  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }
// ...

WASM

Переносим наш JS-код конечного автомата из начала первой части "как есть":

  • все переменные, с которыми работают несколько функций - глобальные

  • повторяющиеся конструкции типа off += off_v вынесены в несколько вспомогательных функций

WASM-HSM
(module
  (import "js" "memory" (memory 1))

  ;; текущее смещение "курсора"
  (global $off (import "js" "off") (mut i32))

  ;; текущее начитываемое число
  (global $val (mut i64)
    (i64.const 0)
  )

  ;; состояние HSM
  (global $state (mut i32)
    (i32.const 0)
  )
  ;; позиция атрибута в массиве
  (global $key (mut i32)
    (i32.const 0)
  )

  ;; обновляем значение в сегменте результатов
  (func $setData
    ;; buf[key << 3] = val
    (i64.store
      (i32.shl
        (global.get $key)
        (i32.const 3)
      )
      (global.get $val)
    )
  )

  ;; $off += $off_v
  (func $step (param $off_v i32)
    (global.set $off
      (i32.add
        (global.get $off)
        (local.get $off_v)
      )
    )
  )

  ;; разбор числа - посимвольно
  ;; ' ', ',' и '\n' - стоп-символы
  (func $parseNumber
    (local $ch i32)

    (global.set $val
      (i64.const 0)
    )

    (block
      (loop
        (local.set $ch
          (i32.load8_u
            (global.get $off)
          )
        )

        ;; ch == ' '
        (if
          (i32.eq
            (local.get $ch)
            (i32.const 0x20)
          )
          (then
            ;; off++
            (call $step (i32.const 1))
            (return)
          )
        )
        ;; ch == ','
        (if
          (i32.eq
            (local.get $ch)
            (i32.const 0x2C)
          )
          (then
            ;; off += 2
            (call $step (i32.const 2))
            (return)
          )
        )
        ;; ch == '\n'
        (if
          (i32.eq
            (local.get $ch)
            (i32.const 0x0A)
          )
          (then
            ;; off++
            (call $step (i32.const 1))
            (return)
          )
        )

        ;; val = val * 10 + (ch - 0x30)
        (global.set $val
          (i64.add
            (i64.mul
              (global.get $val)
              (i64.const 10)
            )
            (i64.extend_i32_u
              (i32.sub
                (local.get $ch)
                (i32.const 0x30) ;; '0'
              )
            )
          )
        )

        ;; off++
        (call $step (i32.const 1))

        br 0 ;; do loop
      )
    )
  )

  ;; [state, off] = [state_v, off + off_v]
  (func $iterate0 (param $state_v i32) (param $off_v i32)
    ;; state = state_v
    (global.set $state
      (local.get $state_v)
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  ;; [key, off] = [state + state_v, off + off_v]
  (func $iterate1 (param $state_v i32) (param $off_v i32)
    ;; key = state + state_v
    (global.set $key
      (i32.add
        (global.get $state)
        (local.get $state_v)
      )
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  ;; аналог String.charCodeAt
  (func $charAtEq (param $char i32) (result i32)
    (i32.eq
      (i32.load8_u
        (global.get $off)
      ) ;; AL = line[$off] as unsigned
      (local.get $char)
    )
  )

  (func (export "parseBuffers")
    ;; off += 'Buffers: '.length
    (global.set $off
      (i32.add
        (global.get $off)
        (i32.const 9)
      )
    )
    ;; for (...)
    (block
      (loop
        ;; case 's' // shared
        (if
          (call $charAtEq (i32.const 0x73))
          (then
            ;; $state = 0
            ;; $off += 7
            (call $iterate0
              (i32.const 0)
              (i32.const 7)
            )
            br 1 ;; уходим на начало текущего цикла
          )
        )
        ;; case 'l' // local
        (if
          (call $charAtEq (i32.const 0x6c))
          (then
            ;; $state = 4
            ;; $off += 6
            (call $iterate0
              (i32.const 4)
              (i32.const 6)
            )
            br 1
          )
        )
        ;; case 't' // temp
        (if
          (call $charAtEq (i32.const 0x74))
          (then
            ;; $state = 8
            ;; $off += 5
            (call $iterate0
              (i32.const 8)
              (i32.const 5)
            )
            br 1
          )
        )

        ;; case 'h' // hit
        (if
          (call $charAtEq (i32.const 0x68))
          (then
            ;; key = state + 0;
            ;; $off += 4
            (call $iterate1
              (i32.const 0)
              (i32.const 4)
            )
            ;;
            call $parseNumber
            call $setData
            br 1
          )
        )
        ;; case 'r' // read
        (if
          (call $charAtEq (i32.const 0x72))
          (then
            ;; key = state + 1;
            ;; $off += 5
            (call $iterate1
              (i32.const 1)
              (i32.const 5)
            )
            ;;
            call $parseNumber
            call $setData
            br 1
          )
        )
        ;; case 'd' // dirtied
        (if
          (call $charAtEq (i32.const 0x64))
          (then
            ;; key = state + 2;
            ;; $off += 8
            (call $iterate1
              (i32.const 2)
              (i32.const 8)
            )
            ;;
            call $parseNumber
            call $setData
            br 1
          )
        )
        ;; case 'w' // written
        (if
          (call $charAtEq (i32.const 0x77))
          (then
            ;; key = state + 3;
            ;; $off += 8
            (call $iterate1
              (i32.const 3)
              (i32.const 8)
            )
            ;;
            call $parseNumber
            call $setData
            br 1
          )
        )

        br 1
      )
    )

    ;; сохраняем текущее смещение
    (i32.store
      (i32.const 0xFFFC)
      (global.get $off)
    )
  )
)

Запускаем…

76ms - ой, и это вместо 48ms исходного JS… Давайте разбираться.

Прокачиваем производительность

Маска возвращаемых данных

А ведь мы каждый раз все 96 байт, куда записываются возвращаемые результаты, затираем нулями - причем неважно, одно, два или 10 значений мы реально записывали. Да еще и читаем их все!

А ведь если вернуть информацию о том, какие именно ключи были записаны, то и от зануления можно отказаться, и от чтения лишнего. Сделать это весьма просто - достаточно вернуть из WASM-кода значение с битовой маской записанных ключей:

  const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];

  const parseBuffers = () => {
    let mask = parseBuffersWASM();
    let rv = {};
    // читаем только ячейки, соответствующие установленным битам маски 
    for (let i = 0; mask; mask >>= 1, i++) {
      if (mask & 1) {
        let off = i << 1;
        rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * 0x100000000;
      }
    }
    return rv;
  };
;; ...
  (global $mask (mut i32)
    (i32.const 0)
  )
;; ...
  ;; обновляем маску атрибутов и записываемое значение
  (func $setData
    ;; ...
    ;; mask |= 1 << key
    (global.set $mask
      (i32.or
        (global.get $mask)
        (i32.shl
          (i32.const 1)
          (global.get $key)
        )
      )
    )
  )
;; ...
  ;; теперь наша функция возвращает значение маски
  (func (export "parseBuffers") (result i32)
    ;; mask = 0
    (global.set $mask
      (i32.const 0)
    )
    ;; ...

    ;; возвращаем маску
    global.get $mask
  )

Запускаем - уже 54ms, практически вплотную приблизились к исходному времени!

Используем статистические знания о данных

Если внимательно посмотреть на исходные данные, то станет очевидно, что большинство строк имеют либо вид 'shared hit=XXX', либо 'shared read=YYY', либо 'shared hit=XXX read=YYY', и существенно реже что-то другое - что вполне логично, ведь почти всегда узел плана или читает что-то из памяти, или с диска, или получает вверх по дереву накопленную сумму этих же показателей.

В нашем случае это означает значение маски 1, 2 или 3, обработку которых мы можем пустить по сокращенному пути:

  const m32to64 = 0x100000000;

  const parseBuffers = () => {
    let mask = parseBuffersWASM();
    switch (mask) {
      case 1:
        return {'shared-hit'  : prj32[0] + prj32[1] * m32to64};
      case 2:
        return {'shared-read' : prj32[2] + prj32[3] * m32to64};
      case 3:
        return {
          'shared-hit'  : prj32[0] + prj32[1] * m32to64
        , 'shared-read' : prj32[2] + prj32[3] * m32to64
        };
      default:
        let rv = {};
        for (let i = 0; mask; mask >>= 1, i++) {
          if (mask & 1) {
            let off = i << 1;
            rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
          }
        }
        return rv;
    }
  };

Результат - 44ms, ура! Мы уже опередили исходный вариант. А еще быстрее - можно?

Условная адресация функций

Заметим, что в нашем WASM-коде есть несколько весьма похожих кусков типа такого:

        ;; case 'w' // written
        (if
          (call $charAtEq (i32.const 0x77))
          (then
          ;; ...

Причем, если нам приходится проверять совпадение с 'w', то, значит, уже 6 неудачных сравнений до этого мы произвели - с вариантами 's', 'l', 't', 'h', 'r', 'd'.

Чтобы избежать необходимости подобного перебора, в WASM используется инструкция call_indirect, реализующая вызов функции в соответствии с заранее сформированной таблицей.

Собственно, для этого нам и придется преобразовать каждое такое условие в отдельную функцию, возвращающую 1. 0 будет возвращать функция-заглушка, означающая возникновение неопознанного символа. В нашем случае это как раз будет '\n', сигнализирующая окончание строки:

;; ...
  ;; таблица косвенной адресации функций
  (table 128 anyfunc)
  (elem (i32.const 0)
    $null ;; 00
    $null ;;  1
    ;; ...
    $step_d ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $step_h ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $step_l ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 70
    $null ;;  1
    $step_r ;;  2
    $step_s ;;  3
    $step_t ;;  4
    $null ;;  5
    $null ;;  6
    $step_w ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
  )

;; ...
  (func $step_w (result i32) ;; written
    ;; key = state + 3;
    ;; $off += 8
    (call $iterate1
      (i32.const 3)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )
;; ...
  (func (export "parseBuffers") (result i32)
    ;; ...
    ;; for (...)
    (block
      (loop
        ;; if (table[line[off]]()) continue;
        (br_if 0
          (call_indirect (result i32)
            (i32.load8_u
              (global.get $off)
            )
          )
        )
        ;; break, end loop
      )
    )
;; ...
Полный код таблицы и всех 7 функций
  ;; таблица косвенной адресации функций
  (table 128 anyfunc)
  (elem (i32.const 0)
    $null ;; 00
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 10
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 20
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 30
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 40
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 50
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 60
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $step_d ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $step_h ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $step_l ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 70
    $null ;;  1
    $step_r ;;  2
    $step_s ;;  3
    $step_t ;;  4
    $null ;;  5
    $null ;;  6
    $step_w ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
  )
;; ...
  (func $null (result i32) ;; заглушка
    i32.const 0
  )

  (func $step_s (result i32) ;; shared
    ;; $state = 0
    ;; $off += 7
    (call $iterate0
      (i32.const 0)
      (i32.const 7)
    )

    i32.const 1
  )

  (func $step_l (result i32) ;; local
    ;; $state = 4
    ;; $off += 6
    (call $iterate0
      (i32.const 4)
      (i32.const 6)
    )

    i32.const 1
  )

  (func $step_t (result i32) ;; temp
    ;; $state = 8
    ;; $off += 5
    (call $iterate0
      (i32.const 8)
      (i32.const 5)
    )

    i32.const 1
  )

  (func $step_h (result i32) ;; hit
    ;; key = state + 0;
    ;; $off += 4
    (call $iterate1
      (i32.const 0)
      (i32.const 4)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_r (result i32) ;; read
    ;; key = state + 1;
    ;; $off += 5
    (call $iterate1
      (i32.const 1)
      (i32.const 5)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_d (result i32) ;; dirtied
    ;; key = state + 2;
    ;; $off += 8
    (call $iterate1
      (i32.const 2)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_w (result i32) ;; written
    ;; key = state + 3;
    ;; $off += 8
    (call $iterate1
      (i32.const 3)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func (export "parseBuffers") (result i32)
    ;; mask = 0
    (global.set $mask
      (i32.const 0)
    )
    ;; off += 'Buffers: '.length
    (global.set $off
      (i32.add
        (global.get $off)
        (i32.const 9)
      )
    )
    ;; for (...)
    (block
      (loop
        ;; if (table[line[off]]()) continue;
        (br_if 0
          (call_indirect (result i32)
            (i32.load8_u
              (global.get $off)
            )
          )
        )
        ;; break, end loop
      )
    )

    ;; сохраняем текущее смещение
    (i32.store
      (i32.const 0xFFFC) ;; 16383 * 4
      (global.get $off)
    )
    ;; возвращаем маску
    global.get $mask
  )

Такой вариант вызова позволяет отыграть еще немного и добиться результата в 40ms - почти 20% выигрыша!

Объединяем условия

Выше у нас приведен код, в котором стоп-символ сравнивается в худшем случае трижды - давайте объединим эти условия в одно:

    (if
      (i64.eq ;; n == 8
        (local.get $n)
        (i64.const 8)
      )
      (then
        ;; off += 8
        (call $step
          (i32.const 8)
        )
      )
      (else
        ;; шагаем на n символов
        ;; ch == '\n' || ch == ' ' => +1
        ;; ch == ','               => +2
        (call $step
          (i32.add
            (i32.wrap_i64
              (local.get $n)
            )
            (i32.add
              (i32.const 1)
              (i64.eq
                (local.get $ch)
                (i64.const 0x2C)
              )
            )
          )
        )
        (return)
      )
    )

Поскольку CPU достаточно сильно не любит ветвления, этот способ позволяет отыграть еще 2мс.

Прокачиваем алгоритм (SWAR)

Вспомним, что мы считываем число "посимвольно", а все развитие процессорной отрасли последние пару десятилетий говорит о том, что обрабатывать надо как можно больше данных за единственную операцию - то есть SIMD-подход в чистом виде.

Воспользуемся техникой, описанной в статье про парсинг double - зачем перебирать "поцифирно", если можно обрабатывать сразу 8 последовательных символов, считав их в регистр.

Основных сложностей тут две:

  • надо четко понимать, где/чем число заканчивается - пробелом (0x20), запятой (0x2C) или концом строки (0x0A), поскольку от этого зависит, через сколько символов дальше надо "перешагнуть"

  • число может не умещаться в 8 байт

Узнаем длину числа

В нашем случае число '12345' с пробелом на конце будет считано в регистр как 0x..203534333231 .... - и тут можно легко заметить, что любой из стоп-символов не превосходит 0x2F, в то время как цифры - заведомо не меньше 0x30.

Поэтому определить количество цифр в числе можно достаточно легко:

  • все цифры при наложении маски & 0x10 дают ненулевой результат, а стоп-символы - нулевой

  • находим первый с конца ненулевой байт с помощью инструкции .ctz, определяющей количество нулевых бит "в хвосте" числа

  • незначимое для нас битовое смещение внутри байта (ctz = 44) усечется при делении на 8 с помощью битового сдвига

 #5        #4        #3        #2        #1        #0        - старший <- младший
0x20      0x35      0x34      0x33      0x32      0x31       = qword:i64
0010 0000 0011 0101 0011 0100 0011 0011 0011 0010 0011 0001
1101 1111 1100 1010 1100 0100 1100 1100 1100 1101 1100 1110  = not(qword) = qword ^ -1

0001 0000 0001 0000 0001 0000 0001 0000 0001 0000 0001 0000  = mask:i64

0001 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000  = not(qword) & mask
   ^                                                         = .ctz >> 3 = 5 byte
    ;; позиция "нецифрового" символа
    ;; n = ctz >> 3
    (local.set $n
      (i64.shr_u
        (i64.ctz
          (i64.and ;; digit mask == not(d) & 0x10
            (i64.xor ;; not(d) = d ^ -1
              (local.get $qword)
              (i64.const 0xFFFFFFFFFFFFFFFF)
            )
            (i64.const 0x1010101010101010) ;; digit mask
          )
        )
        (i64.const 3)
      )
    )

Определяем стоп-символ

Чтобы получить значение первого нецифрового символа, сдвинем наше число на n << 3 бит вправо - самый младший байт и будет искомым:

    ;; nb = n << 3 // num bits
    (local.set $nb
      (i64.shl
        (local.get $n)
        (i64.const 3)
      )
    )

    ;; ch = line[off + n]
    (local.set $ch
      (i32.wrap_i64
        (i64.and
          (i64.shr_u
            (local.get $qword)
            (local.get $nb)
          )
          (i64.const 0xFF) ;; младший байт
        )
      )
    )

Выравниваем число

Теперь, чтобы SWAR-хинт отработал как надо, необходимо обеспечить выравнивание числа в максимальную позицию:

    ;; top-align
    ;; qword <<= 64 - nb
    (local.set $qword
      (i64.shl
        (local.get $qword)
        (i64.sub
          (i64.const 64)
          (local.get $nb)
        )
      )
    )

SWAR-конвертация

    ;; SWAR
    ;; https://habr.com/ru/company/ruvds/blog/542640/

    ;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x0F0F0F0F0F0F0F0F)
          )
          (i64.const 2561)
        )
        (i64.const 8)
      )
    )
    ;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x00FF00FF00FF00FF)
          )
          (i64.const 6553601)
        )
        (i64.const 16)
      )
    )
    ;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x0000FFFF0000FFFF)
          )
          (i64.const 42949672960001)
        )
        (i64.const 32)
      )
    )

Решение для 64bit-чисел

В подавляющем большинстве ситуаций, числа будут умещаться в 7 знаков, а из 8-го мы извлечем стоп-символ. Но значения длиной 8 и более знаков тоже встречаются - для этого, вместо организации цикла, просто повторим SWAR для следующего 8-символьного блока.

Только надо не забыть, что второй блок может содержать 0 цифр, в отличие от первого.

В результате, мы получим в val число из первых 8 символов, а в qword - из следующего блока. Чтобы получить итоговый результат, надо val домножить на соответствующую количеству цифр во втором блоке количество цифр:

'123456789' = '12345678' | '9'
val = 12345678, qword = 9, n = 1
val = val * 10 ^ n + qword

Чтобы не вычислять коэффициенты домножения каждый раз, занесем их сразу в память по смещению 128 (32 ячейки по 4 байта):

  // 10^N для домножения первого 32bit-сегмента
  let pow10 = 1;
  for (let i = 0; i < 8; i++) {
    prj32[i + 32] = pow10;
    pow10 *= 10;
  }
    ;; val = val * memory[128 + (n << 2)] + qword
    (global.set $val
      (i64.add
        (i64.mul
          (global.get $val)
          (i64.extend_i32_u
            (i32.load
              (i32.add
                (i32.shl
                  (i32.wrap_i64
                    (local.get $n)
                  )
                  (i32.const 2)
                )
                (i32.const 128)
              )
            )
          )
        )
        (local.get $qword)
      )
    )
Полный код buffers-test.js
const { readFileSync } = require('fs');

const fn = 'buffers-test';

const run = async () => {
  const buffer = readFileSync(`${fn}.wasm`);
  const module = await WebAssembly.compile(buffer);

  const memory = new WebAssembly.Memory({initial : 1});

  const data = readFileSync('buffers.txt');
  memory.grow((data.byteLength >> 16) + 1);
  const prj32 = new Uint32Array(memory.buffer);
  const prj8 = new Uint8Array(memory.buffer);

  // единственный раз передадим исходное смещение через переменную
  const off = new WebAssembly.Global({value : 'i32', mutable : true}, 1 << 16);

  data.copy(prj8, off.value);
  // дописываем отсутствующий '\n' у последней строки
  prj8[off.value + data.length] = 0x0A;

  const importObject = {
    js : {
      memory
    , off
    }
  };

  // предзаписываем 10^N в виде i64-ячеек для домножения первого блока
  let pow10 = 1;
  for (let i = 0; i < 8; i++) {
    prj32[(i << 1) + 32] = pow10;
    pow10 *= 10;
  }

  const instance = await WebAssembly.instantiate(module, importObject);
  const parseBuffersWASM = instance.exports.parseBuffers;

  const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
  const m32to64 = 0x100000000;

  const parseBuffers = () => {
    let mask = parseBuffersWASM();
    switch (mask) {
      case 1:
        return {'shared-hit'  : prj32[0] + prj32[1] * m32to64};
      case 2:
        return {'shared-read' : prj32[2] + prj32[3] * m32to64};
      case 3:
        return {
          'shared-hit'  : prj32[0] + prj32[1] * m32to64
        , 'shared-read' : prj32[2] + prj32[3] * m32to64
        };
      default:
        let rv = {};
        for (let i = 0; mask; mask >>= 1, i++) {
          if (mask & 1) {
            let off = i << 1;
            rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
          }
        }
        return rv;
    }
  };

  const hrb = process.hrtime();
  // -- 8< --
  // в дальнейшем текущее смещение "курсора" вычитываем из памяти
  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }
  // -- 8< --
  const hre = process.hrtime(hrb);
  const usec = hre[0] * 1e+9 + hre[1];
  console.log(usec);
};

run();
Полный код buffers-test.wat
(module
  (import "js" "memory" (memory 1))

  ;; текущее смещение "курсора"
  (global $off (import "js" "off") (mut i32))

  ;; текущее возвращаемое значение
  (global $val (mut i64)
    (i64.const 0)
  )
  (global $mask (mut i32)
    (i32.const 0)
  )
  (global $state (mut i32)
    (i32.const 0)
  )
  (global $key (mut i32)
    (i32.const 0)
  )

  ;; таблица косвенной адресации функций
  (table 128 anyfunc)
  (elem (i32.const 0)
    $null ;; 00
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 10
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 20
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 30
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 40
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 50
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 60
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $step_d ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $step_h ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $step_l ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 70
    $null ;;  1
    $step_r ;;  2
    $step_s ;;  3
    $step_t ;;  4
    $null ;;  5
    $null ;;  6
    $step_w ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
  )

  ;; обновляем маску атрибутов и значение
  (func $setData
    ;; buf[key << 3] = val
    (i64.store
      (i32.shl
        (global.get $key)
        (i32.const 3)
      )
      (global.get $val)
    )
    ;; mask |= 1 << key
    (global.set $mask
      (i32.or
        (global.get $mask)
        (i32.shl
          (i32.const 1)
          (global.get $key)
        )
      )
    )
  )

  (func $step (param $off_v i32)
    ;; $off += $off_v
    (global.set $off
      (i32.add
        (global.get $off)
        (local.get $off_v)
      )
    )
  )

  ;; разбор числа - SWAR
  (func $parseNumber
    (local $qword i64)
    (local $n i64)
    (local $nb i64)
    (local $ch i64)

    (global.set $val
      (i64.const 0)
    )

    ;; qword = line[off..+7]
    (local.set $qword
      (i64.load align=4
        (global.get $off)
      )
    )

    ;; позиция "нецифрового" символа
    ;; n = num digits
    (local.set $n
      (i64.shr_u
        (i64.ctz
          (i64.and ;; digit mask == not(d) & 0x10
            (i64.xor
              (local.get $qword)
              (i64.const 0xFFFFFFFFFFFFFFFF)
            )
            (i64.const 0x1010101010101010) ;; digit mask
          )
        )
        (i64.const 3)
      )
    )

    ;; nb = n << 3 // num bits
    (local.set $nb
      (i64.shl
        (local.get $n)
        (i64.const 3)
      )
    )

    ;; ch = line[off + n]
    (local.set $ch
      (i64.and
        (i64.shr_u
          (local.get $qword)
          (local.get $nb)
        )
        (i64.const 0xFF)
      )
    )

    ;; top-align
    ;; qword <<= 64 - nb
    (local.set $qword
      (i64.shl
        (local.get $qword)
        (i64.sub
          (i64.const 64)
          (local.get $nb)
        )
      )
    )

    ;; SWAR
    ;; https://habr.com/ru/company/ruvds/blog/542640/

    ;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x0F0F0F0F0F0F0F0F)
          )
          (i64.const 2561)
        )
        (i64.const 8)
      )
    )
    ;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x00FF00FF00FF00FF)
          )
          (i64.const 6553601)
        )
        (i64.const 16)
      )
    )
    ;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
    (local.set $qword
      (i64.shr_u
        (i64.mul
          (i64.and
            (local.get $qword)
            (i64.const 0x0000FFFF0000FFFF)
          )
          (i64.const 42949672960001)
        )
        (i64.const 32)
      )
    )

    (global.set $val
      (local.get $qword)
    )

    (if
      (i64.eq ;; n == 8
        (local.get $n)
        (i64.const 8)
      )
      (then
        ;; off += 8
        (call $step
          (i32.const 8)
        )
      )
      (else
        ;; шагаем на n символов
        ;; ch == '\n' || ch == ' ' => +1
        ;; ch == ','               => +2
        (call $step
          (i32.add
            (i32.wrap_i64
              (local.get $n)
            )
            (i32.add
              (i32.const 1)
              (i64.eq
                (local.get $ch)
                (i64.const 0x2C)
              )
            )
          )
        )
        (return)
      )
    )

    ;; qword = line[off..+7]
    (local.set $qword
      (i64.load
        (global.get $off)
      )
    )

    ;; n = (ctz - 4) / 8 == ctz >> 3 // num digits
    (local.set $n
      (i64.shr_u
        (i64.ctz
          (i64.and
            (i64.xor ;; digit mask == not(d) & 0x10
              (local.get $qword)
              (i64.const 0xFFFFFFFFFFFFFFFF)
            )
            (i64.const 0x1010101010101010) ;; digit mask
          )
        )
        (i64.const 3)
      )
    )

    ;; nb = n << 3 // num bits
    (local.set $nb
      (i64.shl
        (local.get $n)
        (i64.const 3)
      )
    )

    ;; ch = line[off + n]
    (local.set $ch
      (i64.and
        (i64.shr_u
          (local.get $qword)
          (local.get $nb)
        )
        (i64.const 0xFF)
      )
    )

    ;; if (n) ...
    (if
      (i32.wrap_i64
        (local.get $n)
      )
      (then
        ;; top-align
        ;; qword <<= 64 - nb
        (local.set $qword
          (i64.shl
            (local.get $qword)
            (i64.sub
              (i64.const 64)
              (local.get $nb)
            )
          )
        )

        ;; SWAR
        ;; https://habr.com/ru/company/ruvds/blog/542640/

        ;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
        (local.set $qword
          (i64.shr_u
            (i64.mul
              (i64.and
                (local.get $qword)
                (i64.const 0x0F0F0F0F0F0F0F0F)
              )
              (i64.const 2561)
            )
            (i64.const 8)
          )
        )
        ;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
        (local.set $qword
          (i64.shr_u
            (i64.mul
              (i64.and
                (local.get $qword)
                (i64.const 0x00FF00FF00FF00FF)
              )
              (i64.const 6553601)
            )
            (i64.const 16)
          )
        )
        ;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
        (local.set $qword
          (i64.shr_u
            (i64.mul
              (i64.and
                (local.get $qword)
                (i64.const 0x0000FFFF0000FFFF)
              )
              (i64.const 42949672960001)
            )
            (i64.const 32)
          )
        )

        ;; val = val * 10 ^ memory[128 + (n << 3)]:i32 + qword
        (global.set $val
          (i64.add
            (i64.mul
              (global.get $val)
              (i64.load align=8
                (i32.wrap_i64
                  (i64.add
                    (i64.shl
                      (local.get $n)
                      (i64.const 3)
                    )
                    (i64.const 128)
                  )
                )
              )
            )
            (local.get $qword)
          )
        )
      )
    )

    ;; шагаем на n символов
    ;; ch == '\n' || ch == ' ' => +1
    ;; ch == ','               => +2
    (call $step
      (i32.add
        (i32.wrap_i64
          (local.get $n)
        )
        (i32.add
          (i32.const 1)
          (i64.eq
            (local.get $ch)
            (i64.const 0x2C)
          )
        )
      )
    )
  )

  ;; [state, off] = [state_v, off + off_v]
  (func $iterate0 (param $state_v i32) (param $off_v i32)
    ;; state = state_v
    (global.set $state
      (local.get $state_v)
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  ;; [key, off] = [state + state_v, off + off_v]
  (func $iterate1 (param $state_v i32) (param $off_v i32)
    ;; key = state + state_v
    (global.set $key
      (i32.add
        (global.get $state)
        (local.get $state_v)
      )
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  (func $null (result i32)
    i32.const 0
  )

  (func $step_s (result i32) ;; shared
    ;; $state = 0
    ;; $off += 7
    (call $iterate0
      (i32.const 0)
      (i32.const 7)
    )

    i32.const 1
  )

  (func $step_l (result i32) ;; local
    ;; $state = 4
    ;; $off += 6
    (call $iterate0
      (i32.const 4)
      (i32.const 6)
    )

    i32.const 1
  )

  (func $step_t (result i32) ;; temp
    ;; $state = 8
    ;; $off += 5
    (call $iterate0
      (i32.const 8)
      (i32.const 5)
    )

    i32.const 1
  )

  (func $step_h (result i32) ;; hit
    ;; key = state + 0;
    ;; $off += 4
    (call $iterate1
      (i32.const 0)
      (i32.const 4)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_r (result i32) ;; read
    ;; key = state + 1;
    ;; $off += 5
    (call $iterate1
      (i32.const 1)
      (i32.const 5)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_d (result i32) ;; dirtied
    ;; key = state + 2;
    ;; $off += 8
    (call $iterate1
      (i32.const 2)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_w (result i32) ;; written
    ;; key = state + 3;
    ;; $off += 8
    (call $iterate1
      (i32.const 3)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func (export "parseBuffers") (result i32)
    ;; mask = 0
    (global.set $mask
      (i32.const 0)
    )
    ;; off += 'Buffers: '.length
    (global.set $off
      (i32.add
        (global.get $off)
        (i32.const 9)
      )
    )
    ;; for (...)
    (block
      (loop
        ;; if (table[line[off]]()) continue;
        (br_if 0
          (call_indirect (result i32)
            (i32.load8_u
              (global.get $off)
            )
          )
        )
        ;; break, end loop
      )
    )

    ;; сохраняем текущее смещение
    (i32.store
      (i32.const 0xFFFC) ;; 16383 * 4
      (global.get $off)
    )
    ;; возвращаем маску
    global.get $mask
  )
)

Проверяем - 32ms!

Влияние оптимизирующего компилятора

Поскольку наш код должен работать "на потоке", давайте посмотрим, какое время продемонстрирует он при повторном прогоне:

  // первый прогон
  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }

  // сбрасываем смещения в начало файла
  off.value = 1 << 16;
  prj32[16383] = off.value;

  const hrb = process.hrtime();
  // -- 8< --
  // замеряемый второй прогон
  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }
  // -- 8< --

Хм… почему-то стало 26ms. Приятно, что меньше, но - почему? Для ответа на этот вопрос посмотрим генерируемый байткод:

node --print-wasm-code buffers-test.js >_bytecode

В теле сформировавшегося файла поищем уникальную для нашего WASM-кода сигнатуру - например, "маску цифр" 1010101010101010.

Что интересно, она найдется дважды:

--- WebAssembly code ---
index: 2
kind: wasm function
compiler: Liftoff
...
75  REX.W movq rdx,[rax+rcx*1]      ; rdx = qword

79  REX.W movq rax,rdx              ; rax = rdx
7c  REX.W xorq rax,0xff             ; rax = rax ^ 0xFF
80  REX.W movq rcx,1010101010101010 ; rcx = digit mask
8a  REX.W andq rax,rcx              ; rax = rax & rcx
8d  REX.W bsfq rax,rax              ; rax = ctz(rax)
--- WebAssembly code ---
index: 2
kind: wasm function
compiler: TurboFan
...
42  REX.W movq rbx,[rbx+r9*1]      ; rbx = qword

46  REX.W movq rdx,rbx             ; rdx = rbx
49  REX.W notq rdx                 ; rdx = not(rdx)    <--
4c  REX.W movq r9,1010101010101010 ; r9 = digit mask
56  REX.W andq rdx,r9              ; rdx = rdx & r9
59  REX.W bsfq rdx,rdx             ; rdx = ctz(rdx)

Мы видим два примера ассемблерного кода, полученного из одного и того же WASM-кода. Например, среди WASM-инструкций банально отсутствует i32.not(v), которую вполне официально предлагается реализовывать через i32.xor(v, -1).

Собственно, именно это мы и видим в нашем участке кода. Компилятор Liftoff, хоть и считается более продвинутым, превратил такой хинт "по написанному" в xorq rax,0xff, а TurboFan использовал более эффективный вариант notq rdx.

А это только одна из множества мелких деталей, поэтому хорошая производительность результирующего ассемблерного кода в V8 - результат не только правильного алгоритма, но и все-таки определенной доли везения.

В следующей части посмотрим, как можно использовать вместо SWAR-хинта нативные SIMD-инструкции в WebAssembly, и сильно ли это поможет конкретно нам.


Материалы: