В этой статье мы разберем особенности мониторинга многопоточного Node.JS приложения на примере нашего коллектора для сервиса мониторинга и анализа логов серверов PostgreSQL.



Для масштабирования коллектора мы используем многопроцессный подход, с одним управляющим и несколькими рабочими процессами, межпроцессное взаимодействие происходит только между рабочим и управляющим процессом.

Рабочие процессы выполняют одинаковые задачи — сбор, обработка и запись логов с серверов PostgreSQL. При этом сбор и запись — это по сути IO-задачи, в которых nodejs очень хороша. А вот обработка и парсинг планов запросов — это довольно CPU-емкая задача, блокирующая event-loop. Поэтому такие задачи лучше выносить в отдельный воркер или пул воркеров, передавая им данные на обработку посредством обмена IPC-сообщениями.


Раньше, для задачи обработки и парсинга планов запросов мы использовали именно такой подход. Но у него есть недостаток — большие объемы передаваемых данных по IPC могут привести к значительному увеличению затрат на сериализацию в JSON и обратно.

Например при передаче по IPC буфера, в которой содержится строка 'test' происходит передача строки:

'{"type":"Buffer","data":[116,101,115,116]}'

При большом количестве передаваемых данных накладные расходы могут стать такими:


Решением для нас стало использование worker_threads, появившихся в Node.JS 10.5.0, работающих в рамках одного процесса и позволяющих использовать новые методы коммуникации между потоками.

Архитектура изменилась:


А вместе с ней и подходы к мониторингу. Например, использование CPU внутри worker_thread традиционным способом измерить не получится.

Т.е. раньше, для каждого процесса-воркера, мы измеряли CPU-usage с помощью process.cpuUsage() и process.hrtime() примерно так:


let startCpuUsage = process.cpuUsage();
let startTime = process.hrtime();
let n = 1000;
while (n--) Math.sin(n);
let {user, system} = process.cpuUsage(startCpuUsage); // время в микросекундах
let time = process.hrtime(startTime); // наносекунды
let cpuUsage = 100 * 1000 * (user + system) / (time[0] * 1e9 + time[1]);

Но для процесса с worker_threads вызов process.cpuUsage() выдает процессорное время для всего процесса в целом, суммируя все его потоки. И такой же результат мы получим, если вызовем process.cpuUsage() изнутри worker_thread.

Почему так происходит?

Дело в то что process.cpuUsage() использует вызов uv_getrusage, а тот в ОС Linux выполняет системный вызов getrusage с параметром RUSAGE_SELF, т.е. возвращает статистику для вызывающего процесса как сумму по всем его потокам, при этом не важно из какого потока мы делаем этот вызов — во всех потоках он будет возвращать одинаковые значения.

Как же получить CPU-usage для worker_threads и почему в Node.JS нет встроенных методов для профилирования CPU worker_threrads?

Здесь есть ответ разработчика worker_threads.

Предложено два варианта — либо с помощью системного вызова gettid() получить tid для worker_thread и просматривать данные в /proc/${tid}, либо использовать getrusage() с параметром RUSAGE_THREAD, позволяющим получать статистику только для вызывающего потока.

Кстати, таким же образом можно получать метрики использования CPU и для основного потока процесса, без учета всех дополнительных потоков и worker_threads.

Итак, разобравшись с этим вопросом, мы стали искать готовые модули для мониторинга worker_threads, и не нашли… Поэтому сделали свой , заодно добавив в него выдачу всех остальных метрик для мониторинга Node.JS приложения. Серверные метрики мы уже получаем с помощью своей системы сбора метрик .

Мониторинг CPU


Для анализа использования CPU мы берем метрики от воркеров и worker_threads, а также метрики общей загруженности CPU и в разрезе ядер:

  • Для воркеров в целом:


  • Для основных потоков воркеров:


  • Для worker_threads (по первым из пула, но полезным будет и суммарный в разрезе воркера):


  • Общая загрузка CPU на сервере:


С метриками CPU разобрались, а что насчет профайлинга worker_threads?

Проверим, запустив этот небольшой тест с параметром node --prof

Код теста

const { Worker, isMainThread} = require('worker_threads');
const crypto = require('crypto');

function mainTest() {
  let cnt = 10000;
  while (cnt--) crypto.randomBytes(cnt);
}

function threadTest() {
  let cnt = 10000;
  while (cnt--) crypto.randomBytes(cnt);
}

if (isMainThread) {
  let worker = new Worker(__filename);
  setInterval(mainTest, 1000);
} else {
  setInterval(threadTest, 1000);
}


В результате получили два isolate-* файла, для основного потока и для worker_thread.
Далее, с помощью node --prof-process <isolate_file> можем посмотреть нужный профайл.
Кстати, с опцией --no-logfile-per-isolate вместо нескольких isolate* файлов будет один — v8.log с суммарным результатом по всем потокам, включая основной.

И еще — используя опцию node --inspect или послав сигнал SIGUSR1 работающему процессу с целью снять CPU профайл в Chrome DevTools, мы увидим данные только по основному потоку.

Использование памяти


Также как и для CPU, снимая профайл в Chrome DevTools мы получим Heap snapshot только основного потока.

К счастью, с версии node 12.17.0 появилась возможность получить heap snapshot прямо из кода worker_threads с помощью вызова worker.getHeapSnapshot(), а с версии 11.13.0 также для основного потока вызовом v8.getHeapSnapshot().

Попробуем

const { Worker, isMainThread } = require('worker_threads');
const v8 = require('v8');
const fs = require('fs');

if (isMainThread) {
  let worker = new Worker(__filename);
  let mainArray = [];
  function mainTest() {
    let cnt = 100;
    while (cnt--) mainArray.push(`main-msg-${cnt}`);
  }
  process.on('SIGUSR2', () => {
    v8.getHeapSnapshot().pipe(fs.createWriteStream(`process_${process.pid}.heapsnapshot`));
    worker.getHeapSnapshot().then((heapsnapshot) => {
      heapsnapshot.pipe(fs.createWriteStream(`process_${process.pid}_wt_${worker.threadId}.heapsnapshot`));
    })
  });
  setInterval(mainTest, 1000);
} else {
  let threadArray = [];
  function threadTest() {
    let cnt = 100;
    while (cnt--) threadArray.push(`thread-msg-${cnt}`);
  }
  setInterval(threadTest, 1000);
}


Послав сигнал SIGUSR2 процессу, мы получим два heapsnapshot, которые затем можно проанализировать в Chrome DevTools:

  • Основной процесс:


  • worker_thread:



Какие метрики памяти интересны для анализа?

Мы используем те, что выдает process.memoryUsage() — rss, heapTotal, heapUsed, external.

И также v8.getHeapSpaceStatistics(), с его помощью можно получить данные по сегментам Heap — new_space, old_space, code_space, large_object_space.

rss всегда выдается для всего процесса, а остальные метрики — в рамках вызывающего контекста.

  • Суммарный по воркерам:


  • По воркеру:


  • По worker_threads:



Сборка мусора


Т.к. в каждом worker_thread запускается свой инстанс Node.JS с v8/libuv, то и GC у каждого тоже свой и мониторить их надо по отдельности.

Для мониторинга GC нам нужно получать данные об общей продолжительности сборки мусора, а также количество запусков и время выполнения каждого цикла.

Уже довольно давно, с версии 8.5.0, в Node.JS появился механизм PerformanceObserver, позволяющий кроме всего прочего получить всю необходимую информацию по циклам GC.

Например так

const { PerformanceObserver, constants } = require('perf_hooks');
let stats = {};
let gcObserver = new PerformanceObserver((list) => {
  list
    .getEntries()
    .map(({kind, duration}) => {
      stats['gc.time'] += duration;
      switch (kind) {
        case constants.NODE_PERFORMANCE_GC_MINOR:
          stats['gc.Scavenge.count']++;
          stats['gc.Scavenge.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_MAJOR:
          stats['gc.MarkSweepCompact.count']++;
          stats['gc.MarkSweepCompact.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_INCREMENTAL:
          stats['gc.IncrementalMarking.count']++;
          stats['gc.IncrementalMarking.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_WEAKCB:
          stats['gc.ProcessWeakCallbacks.count']++;
          stats['gc.ProcessWeakCallbacks.time'] += duration;
          break;
      }
    })
});

function resetStats() {
  Object.assign(stats, {
    'gc.time': 0,
    'gc.Scavenge.count': 0,
    'gc.Scavenge.time': 0,
    'gc.MarkSweepCompact.count': 0,
    'gc.MarkSweepCompact.time': 0,
    'gc.IncrementalMarking.count': 0,
    'gc.IncrementalMarking.time': 0,
    'gc.ProcessWeakCallbacks.count': 0,
    'gc.ProcessWeakCallbacks.time': 0,
  });
}

resetStats();
gcObserver.observe({entryTypes: ['gc'], buffered: true});

function triggerScavenge() {
  let arr = [];
  for (let i = 0; i < 5000; i++) {
    arr.push({});
  }

  setTimeout(triggerScavenge, 50);
}

let ds = [];

function triggerMarkCompact() {
  for (let i = 0; i < 10000; i++) {
    ds.push(new ArrayBuffer(1024));
  }

  if (ds.length > 40000) {
    ds = [];
  }

  setTimeout(triggerMarkCompact, 200);
}

triggerScavenge();
triggerMarkCompact();

setInterval(() => {
  console.log(stats);
  resetStats();
}, 5000);

Результат:

{
  'gc.time': 158.716144,
  'gc.Scavenge.count': 11,
  'gc.Scavenge.time': 135.690545,
  'gc.MarkSweepCompact.count': 2,
  'gc.MarkSweepCompact.time': 22.96941,
  'gc.IncrementalMarking.count': 2,
  'gc.IncrementalMarking.time': 0.056189,
  'gc.ProcessWeakCallbacks.count': 0,
  'gc.ProcessWeakCallbacks.time': 0
}


Этот метод работает как в основном потоке так и в worker_threads, для нашего коллектора мы получаем графики с метриками за секунду:

  • По воркерам


  • По worker_threads


  • Общее время GC в разрезе воркеров


  • Общее время GC в разрезе worker_threads



Event-loop latency


Для мониторинга задержек event-loop удобно использовать появившийся в версии 11.10.0 monitorEventLoopDelay — тут можно получить не только среднее и предельные значения, но и различные перцентили.

Мы используем max, min, mean, и percentile(99):

  • Суммарный по всем воркерам


  • Суммарный по worker_threads


  • По воркеру


  • По worker_thread


Мониторинг пула worker_threads


Системные показатели работы пула уже приведены выше, а здесь поговорим о метриках производительности многопоточного приложения.

При старте каждый воркер коллектора запускает пул с одним worker_thread, который обрабатывает очередь поступающих планов запросов.

Дополнительные worker_thread запускаются при увеличении размера очереди и при нахождении задач в очереди дольше определенного времени. Также они автоматически завершаются после периода неактивности.

Код обработки очереди задач

  const SPAWN_LAG = 2000;
  this._queue = [];

  assignTask(msg) {
    if (this.mainExplainer.ready === true) {
      this.mainExplainer.ready = false;
      this.mainExplainer.sendMessage(msg);
    } else if (this._idleExplainers.length > 0) {
      let explainer = this._idleExplainers.pop();
      clearTimeout(explainer.timeoutIdle);
      explainer.sendMessage(msg);
    } else {
      this._checkAndStartNew(msg);
    }
  }

  _checkAndStartNew(msg) {
    let ts = Date.now();
    let q = this._queue;
    if (msg && process[hasFreeRAM]) q.push({msg, ts});
    if (this._canCreateExplainer && q.length > this._workersCnt() && q[0].ts + SPAWN_LAG < ts) {
      this._createExplainer();
    }
  }

  explainer.on('explainDone', (msg) => {
    explainer.pulse();
  });

  explainer.pulse = () => {
    if (this._queue.length > explainer.id) {
      explainer.sendMessage(this._queue.shift().msg);
    } else if (this._isMain(explainer)) {
      explainer.ready = true;
    } else {
      this._idleExplainers.push(explainer);
      explainer.unpool();
    }
  };



Важными метриками пула worker_thread являются размер очереди и количество работающих потоков:



Кроме этого мы мониторим скорость и производительность worker_thread и воркеров в целом:
  • Скорость обработки планов запросов:


  • Производительность воркеров по количеству задач:


  • Производительность воркеров по объему данных:


  • Производительность worker_thread по количеству задач:


  • Производительность worker_thread по объему данных:


Заключение


Мы рассмотрели особенности мониторинга многопоточного приложения Node.JS.

Для комплексного анализа работы приложения необходимо отслеживать массу показателей — метрики по серверу в целом, использование приложением системных ресурсов, метрики среды выполнения, а также различные показатели самого приложения. В общем всего, что включает в себя APM.