В предыдущей части мы научились эффективно передавать данные вспомогательным потокам из основного через разделяемую память, используя Atomics-операции и блокировки.

Но мы рассматривали все-таки идеальную ситуацию, когда основной поток больше ничем не занимался, кроме обмена с "подчиненными" уже заранее готовыми данными. В реальных же приложениях такое встречается достаточно редко - обычно эти самые данные приходится готовить непосредственно перед передачей. И, бывает, в этом участвует существенная доля синхронного кода, что для JavaScript крайне неприятно, но иногда неизбежно - например, при вычислении регулярных выражений.

Давайте оценим, насколько синхронные операции "роняют" производительность нашего тестового приложения. И узнаем, как можно в разы улучшить ее, "скрестив ужа с ежом", используя выделенный поток-координатор из позапрошлой части статьи совместно с разделяемой памятью.

Добавим немного синхронности

Возьмем тот же самый тест из предыдущей части про разделяемую память и вместо массива сохраним все "сообщения" во временный каталог:

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });

А уже в ходе теста будем их синхронно вычитывать с помощью readFileSync, намеренно "притормаживая" EventLoop в основном потоке приложения:

      // получаем список всех сообщений в папке
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);

      remain = fns.length;
      tsh = hrtime();
      tfs = 0n; // длительность синхронных операций

      fns.forEach((fn, id) => {
        const ts = hrtime();
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        tfs += hrtime() - ts;             // ... и его продолжительность
        pool.postMessage({id, data});
      });
Миримся с нестабильностью времени чтения с диска

При проведении подобного теста достаточно сильно может влиять аспект кэширования чтения.

То есть если нам "повезло", и данные конкретного файла ОС еще не успела вытеснить и держит в pagecache, то уменьшится время синхронного выполнения для все того же readSync того же файла в следующем прогоне, и поэтому может меньше повлиять на деградацию общего времени теста.

Тут мы замеряем как общее время отработки теста, так и отдельно длительность синхронных операций. Их разность даст нам, конечно, не само время обработки (пока основной поток синхронно читает, остальные-то работают), но некоторую оценку общей потери производительности.

Полный код и результаты тестов
const {
  Worker
, isMainThread
, parentPort
, workerData
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const THREAD_FREE = -1;

const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
  #queue;
  #workersPool;

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workersPool = [...workersPool];
  }

  #shareMessage(worker, {id, data}) {
    worker.data.set(data, 0);
    worker.lock[0] = id;
    Atomics.notify(worker.lock, 0, 1);

    const lock = Atomics.waitAsync(worker.lock, 0, id);
    if (lock.value === 'not-equal') {
      this.#onMessage(worker);
    }
    else {
      lock.value.then(result => {
        this.#onMessage(worker);
      });
    }
  }

  #onMessage(worker) {
    const msg = this.#queue.shift();
    if (msg) {
      this.#shareMessage(worker, msg);
    }
    else {
      this.#workersPool.push(worker);
    }
  }

  postMessage(msg) {
    const worker = this.#workersPool.pop();
    if (worker) {
      this.#shareMessage(worker, msg);
    }
    else {
      this.#queue.push(msg);
    }
  }
}

if (isMainThread) {
  const taskSize = 1 << 16;
  
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;
  let tfs;

  process
    .on('test:start', () => {
      hashes.fill();

      const Pow2Buffer = require('./Pow2Buffer');
      pool = new WorkersPool({
        queue : new Pow2Buffer(8, 16)
      , workersPool : workers.slice(0, active)
      });

      // получаем список всех сообщений в папке
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);

      remain = fns.length;
      tsh = hrtime();
      tfs = 0n; // длительность синхронных операций

      fns.forEach((fn, id) => {
        const ts = hrtime();
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        tfs += hrtime() - ts;             // ... и его продолжительность
        pool.postMessage({id, data});
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| fs.read'
      , (Number(tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| total'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms'
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // выделяем сегменты разделяемой памяти
      const sharedBufferData = new SharedArrayBuffer(taskSize);
      const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
      // инициализируем "пустое" состояние блокировки
      const lock = new Int32Array(sharedBufferLock);
      lock.fill(THREAD_FREE);
      // передаем в поток ссылки на сегменты разделяемой памяти
      const worker = new Worker(__filename,
        {
          workerData : {
            data : sharedBufferData
          , lock : sharedBufferLock
          }
        }
      );
      worker.data = new Uint8Array(sharedBufferData);
      worker.lock = lock;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
    .then(result => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  const {data, lock} = workerData;
  const sharedData = new Uint8Array(data);
  const sharedLock = new Int32Array(lock);

  do {
    const lock = Atomics.wait(sharedLock, 0, THREAD_FREE);
    parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')});
    sharedLock[0] = THREAD_FREE;
    Atomics.notify(sharedLock, 0, 1);
  }
  while (true);
}
generated: 1959 ms
hashed  1: 1237 ms | fs.read 4152 ms | total 5390 ms
hashed  2:  616 ms | fs.read  555 ms | total 1171 ms
hashed  3:  411 ms | fs.read  667 ms | total 1079 ms
hashed  4:  344 ms | fs.read  801 ms | total 1145 ms
hashed  5:  326 ms | fs.read  560 ms | total  887 ms
hashed  6:  307 ms | fs.read  619 ms | total  926 ms
hashed  7:  336 ms | fs.read  574 ms | total  911 ms
hashed  8:  315 ms | fs.read  572 ms | total  887 ms
hashed  9:  315 ms | fs.read  908 ms | total 1224 ms
hashed 10:  305 ms | fs.read  904 ms | total 1209 ms
hashed 11:  343 ms | fs.read  580 ms | total  924 ms
hashed 12:  300 ms | fs.read  598 ms | total  899 ms
hashed 13:  326 ms | fs.read  597 ms | total  924 ms
hashed 14:  308 ms | fs.read  584 ms | total  893 ms
hashed 15:  315 ms | fs.read  580 ms | total  895 ms
hashed 16:  310 ms | fs.read  579 ms | total  890 ms

Синхронный код основного потока снижает общую производительность
Синхронный код основного потока снижает общую производительность

В точке максимальной эффективности "4 потока на 4 ядрах", которую мы вычислили в прошлый раз, условно "чистое" время ухудшилось на 15% - а ведь мы всего лишь на некоторое время заняли основной поток.

К сожалению, отреагировать на освобождение вспомогательного потока и дать ему следующую задачу основной может не раньше, чем завершит текущую синхронную операцию:

Ожидание вспомогательного потока из-за синхронного кода в основном
Ожидание вспомогательного потока из-за синхронного кода в основном

Поток-координатор

Но давайте тогда процесс "раздачи заданий" вытащим в отдельный поток, описанный во второй части нашего цикла - пусть в нем будет минимум синхронного кода и его реакции ничем не затормаживаются:

Из основного потока передавать информацию в координатор мы будем "по ссылке", без копирования, с помощью transferList:

            fns.forEach((fn, id) => {
              const ts = hrtime();
              const data = fs.readFileSync(fn); // тяжелый синхронный код
              tfs += hrtime() - ts;             // ... и его продолжительность
              coordinator.postMessage(
                {id, data}
              , [data.buffer] // передача по ссылке через transferList
              );
            });

... а уже между координатором и вспомогательными потоками - копировать через разделяемую память, которую передадим при увязке дочерних потоков:

// ...
        Promise.all(
          workers.slice(0, active)
            .flatMap(worker => {
              const shared = { // сегменты разделяемой памяти вспомогательного процесса
                data : worker.data
              , lock : worker.lock
              };
              return [{worker}, {worker : coordinator, shared}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, shared} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(shared); // shared попадет только в координатор
            }))
        )
// ...

Но если мы оставим в потоке бесконечный синхронный цикл ожидания блокировки, у нас пройдет только первый тест, и все встанет - поскольку "слушать" информацию от нового экземпляра координатора будет уже некому.

Поэтому переведем основной рабочий код тоже на Atomics.waitAsync, заодно сможем вернуть и оценку загрузки EventLoop:

// ...
    case 'worker':
      const {data, lock} = workerData;
      const sharedData = new Uint8Array(data);
      const sharedLock = new Int32Array(lock);

      const processMessage = () => {
        // обрабатываем сообщение
        parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
        // уведомляем координатор о своей доступности
        sharedLock[0] = THREAD_FREE;
        Atomics.notify(sharedLock, 0, 1);
        // ... и возвращаемся к ожиданию блокировки
        wait();
      };

      const wait = () => {
        const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          processMessage();
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            processMessage();
          });
        }
      };
      // сигнализируем готовность и начинаем ждать
      signalPort.on('message', () => {
        signalPort.postMessage(undefined);
        wait();
      });
      break;
// ...
Полный код и результаты тестов
const {
  Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const THREAD_FREE = -1;

if (isMainThread) {
  const taskSize = 1 << 16;
  
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;
  let tfs;

  let coordinator;
  process
    .on('test:start', () => {
      hashes.fill();

      const channel = new MessageChannel();
      coordinator = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'coordinator'
          }
        , transferList : [channel.port1]
        }
      );
      coordinator.signalPort = channel.port2;
      coordinator.signalPort.setMaxListeners(0);

      coordinator.on('online', () => {
        Promise.all(
          workers.slice(0, active)
            .flatMap(worker => {
              const shared = { // сегменты разделяемой памяти вспомогательного процесса
                data : worker.data
              , lock : worker.lock
              };
              return [{worker}, {worker : coordinator, shared}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, shared} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(shared); // shared попадет только в координатор
            }))
        )
          .then(result => {
            // получаем список всех сообщений
            const fns = fs.readdirSync(dir)
              .sort()
              .map(fn => dir + sep + fn);
            remain = fns.length;

            [coordinator, ...workers].forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
            tsh = hrtime();
            tfs = 0n;

            fns.forEach((fn, id) => {
              const ts = hrtime();
              const data = fs.readFileSync(fn); // тяжелый синхронный код
              tfs += hrtime() - ts;             // ... и его продолжительность
              coordinator.postMessage(
                {id, data}
              , [data.buffer] // передача по ссылке через transferList
              );
            });
          });
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      [coordinator, ...workers].forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| fs.read'
      , (Number(tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| total'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , (coordinator.util * 100 | 0).toString().padStart(3) + ' c'
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(3)
        ).join(' ')
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // выделяем сегменты разделяемой памяти
      const sharedBufferData = new SharedArrayBuffer(taskSize);
      const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
      // инициализируем "пустое" состояние блокировки
      const lock = new Int32Array(sharedBufferLock);
      lock.fill(THREAD_FREE);

      const channel = new MessageChannel();
      // передаем в поток ссылки на сегменты разделяемой памяти
      const worker = new Worker(__filename,
        {
          workerData : {
            signalPort : channel.port1
          , workerType : 'worker'
          , data : sharedBufferData
          , lock : sharedBufferLock
          }
        , transferList : [channel.port1]
        }
      );
      worker.signalPort = channel.port2;
      worker.data = new Uint8Array(sharedBufferData);
      worker.lock = lock;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
    .then(result => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  const {signalPort, workerType} = workerData;
  switch (workerType) {
    case 'worker':
      const {data, lock} = workerData;
      const sharedData = new Uint8Array(data);
      const sharedLock = new Int32Array(lock);

      const processMessage = () => {
        // обрабатываем сообщение
        parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
        // уведомляем координатор о своей доступности
        sharedLock[0] = THREAD_FREE;
        Atomics.notify(sharedLock, 0, 1);
        // ... и возвращаемся к ожиданию блокировки
        wait();
      };

      const wait = () => {
        const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          processMessage();
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            processMessage();
          });
        }
      };
      // сигнализируем готовность и начинаем ждать
      signalPort.on('message', () => {
        signalPort.postMessage(undefined);
        wait();
      });
      break;
    case 'coordinator':
      const pool = [];
      const queue = new (require('./Pow2Buffer'))(8, 16);

      const shareMessage = (worker, {id, data}) => {
        worker.data.set(data, 0);
        worker.lock[0] = id;
        Atomics.notify(worker.lock, 0, 1);

        const lock = Atomics.waitAsync(worker.lock, 0, id);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          onMessage(worker);
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            onMessage(worker);
          });
        }
      }

      const onMessage = worker => {
        const msg = queue.shift();
        if (msg) {
          shareMessage(worker, msg);
        }
        else {
          pool.push(worker);
        }
      }

      // по сигнальному каналу передаем порт worker'а
      signalPort.on('message', worker => {
        // добавляем в пул и подписываемся на обработку сигнала готовности от worker'а
        pool.push(worker);
        signalPort.postMessage(undefined);
      });
      // обработка входящего сообщения координатору
      parentPort.on('message', message => {
        const worker = pool.pop();
        if (worker) {
          shareMessage(worker, message);
        }
        else {
          queue.push(message);
        }
      });
      break;
  }
}
generated: 2043 ms
hashed  1:  373 ms | fs.read 2387 ms | total 2761 ms | 42 |   10 c  42   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  2:  118 ms | fs.read  602 ms | total  720 ms | 89 |   39 c  98  79   0   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  3:  119 ms | fs.read  570 ms | total  690 ms | 80 |   33 c 100  99  40   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  4:   75 ms | fs.read  603 ms | total  679 ms | 77 |   23 c 100 100  96  12   0   0   0   0   0   0   0   0   0   0   0   0
hashed  5:  204 ms | fs.read  931 ms | total 1135 ms | 81 |   27 c 100 100 100  98  11   0   0   0   0   0   0   0   0   0   0   0
hashed  6:  146 ms | fs.read  994 ms | total 1141 ms | 82 |   12 c 100 100 100 100  88   9   0   0   0   0   0   0   0   0   0   0
hashed  7:  260 ms | fs.read  952 ms | total 1212 ms | 81 |   22 c 100 100 100 100 100  67   5   0   1   0   0   0   1   1   0   1
hashed  8:  142 ms | fs.read 1433 ms | total 1576 ms | 89 |   16 c 100 100 100 100 100 100 100  13   0   0   0   0   0   0   0   0
hashed  9:  253 ms | fs.read 1554 ms | total 1807 ms | 86 |    9 c 100 100 100 100 100 100 100  74   0   0   0   0   0   0   0   0
hashed 10:  196 ms | fs.read 1587 ms | total 1784 ms | 80 |    7 c 100 100 100 100 100 100 100 100   6   2   0   0   0   0   0   0
hashed 11:  311 ms | fs.read 1948 ms | total 2259 ms | 87 |   21 c 100 100 100 100 100 100 100 100  81  81   4   0   0   0   0   0
hashed 12:  237 ms | fs.read 1973 ms | total 2210 ms | 83 |    7 c 100 100 100 100 100 100 100 100 100 100   0   0   0   0   0   0
hashed 13:  362 ms | fs.read 2014 ms | total 2377 ms | 78 |   14 c 100 100 100 100 100 100 100 100 100 100   5   5   8   0   0   0
hashed 14:   73 ms | fs.read 2280 ms | total 2354 ms | 77 |    6 c 100 100 100 100 100 100 100 100 100 100   7  38  37   5   0   0
hashed 15:  413 ms | fs.read 2513 ms | total 2927 ms | 83 |   12 c 100 100 100 100 100 100 100 100 100 100  52 100 100   3   1   0
hashed 16:  241 ms | fs.read 2619 ms | total 2861 ms | 86 |   21 c 100 100 100 100 100 100 100 100 100 100 100 100 100  52  11  12

Координатор решает!
Координатор решает!

Код нашего приложения уже совсем перестал быть простым, но зато мы все-таки еще почти в 4 раза уменьшили время обработки.

При этом по координатору еще остается запас для другой полезной работы - например, для динамического управления количеством вспомогательных потоков. Но про это - в следующей части.


Комментарии (12)


  1. mayorovp
    04.10.2022 17:57

    А не проще ли вынести из главного потока те самые синхронные задачи, чем придумывать какие-то координаторы?


    1. Kilor Автор
      04.10.2022 18:11

      Не проще, если/когда вы над этим не властны.

      Пример из жизни: процесс принимает стрим текстовых строк, применяет на них регулярку (синхронно, увы!) и, когда совпадение найдено, нарезает текстовый блок до следующего совпадения. Примерно так: ......AAA......BB.... А блок (AAA, BB) отдается как раз вспомогательным потокам на CPU-intensive парсинг.

      Применение регулярок вы в потоки тут особо не засунете, поскольку издержки на саму передачу контента становятся слишком велики.


      1. PaulIsh
        04.10.2022 18:28
        +2

        Offtopic про регулярки. Если их можно заменить на поиск через indexOf или еще как-то, то желательно заменить. Сталкивался с очень серьезным падением производительности (на порядок) в нагруженном сервисе именно на поиске подстрок через регулярки.


        1. Kilor Автор
          04.10.2022 18:35
          +1

          Согласен на 100%. Но многие регулярки так не заменишь - например, банальный поиск первого непробельного символа /\S/.


          1. PaulIsh
            04.10.2022 18:43

            можно сравнить по скорости с поиском через цикл по строке и проверкой на пробел, \t, \r, \n, \f

            А в целом, да, я конечно признаю, что без регулярок местами очень плохо.


            1. Kilor Автор
              04.10.2022 18:54

              Это будет реализация в прикладном коде ровно того же, что делает RE-библиотека в коде скомпилированном. Явно не быстрее.


  1. Format-X22
    04.10.2022 21:32
    -1

    Я в продакшене всё такое в отдельные микросервисы выношу. Апи отдельно, агрегаторы отдельно. Между ними БД. Апи горизонтально масштабируются отдельно от агрегаторов. Получается очень неплохо.


    1. Kilor Автор
      04.10.2022 21:34

      Исключительно вопрос эффективности использования выделенных ресурсов. Очевидно, что передача по сети даже в рамках одного ДЦ медленее, чем передача в поток через память, а ресурсов требует больше.


      1. Format-X22
        04.10.2022 23:30

        Если нужно посчитать один раз и результат всегда уникален - да, вы правы. В остальных случаях мой способ чаще всего будет эффективнее.


        1. Kilor Автор
          04.10.2022 23:57

          А можно чуть подробнее? Вроде если считать надо не один раз, и результат неуникален, то это решается просто кэшированием.


          1. Format-X22
            05.10.2022 00:12

            Не всё можно закешировать. Кейс - есть несколько внешних систем, мы из них данные собираем, после планируем их по апи отдавать. Можно было бы кешированием, но если нам нужны фильтры, сортировка и пагинация по этим данным - кеши раздует очень сильно. Мы решаем что будем данные отдавать из памяти, но тогда у нас блокировка одного потока, пока там какие-нибудь хитрые вычисления - апи не отдаст ничего. Тогда мы можем как в этой статье выносить в отдельные потоки. Но тогда при масштабировании у нас будет много копий данных, если они большие - нужна БД. Если по данным сложные фильтры - документная или реляционная. Но если мы хотим горизонтальное масштабирование - нам нужно будет много инстансоа апи, но агрегаторов совсем не много. И тогда исчезает смысл распаралеливания и всяких хитрых механизмов переключения, получения данных и тп. Пусть за нас это ОС решает, там уже отлажено, к тому же это могут быть разные машины и даже континенты земли. И тогда порезать по паттерну когда апи отдельно, агрегаторы отдельно - уже выходит лучшим вариантом.

            В целом могут быть кейсы когда поток будет лучше, вот тот же вариант когда нужно что-то считать вместе и быстро - потоки будут быстрее, лучше и эффективнее. Но нужно понимать когда реально стоит, потому что это действительно быстрее, а когда это велосипед. Только лишь от этого предостерегаю :)


            1. PaulIsh
              05.10.2022 07:53
              +2

              Плюсы потоков:

              • Потоки дешевле процессов по ресурсам для операционки.

              • В поток не надо передавать данные через сеть как обычно в микросервис.

              • Поток всегда доступен, а микросервис может оказаться не доступен (или очередь куда вы шлете задания микросервису).

              Плюсы микросервисов:

              • Можно докинуть количество микросервисов если очередь задач слишком большая (количество потоков обычно регулирует основной процесс и вмешаться тут сложнее).

              • Можно развернуть на разных машинах (и на разных цодах) и не упереться в вычислительные лимиты.

              • Один микросервис может выполнять задачи для разных заказчиков в то время как поток работает на пользу основного процесса.

              • А также следует учитывать полное разделение кода и возможность применения других языков и фреймворков для микросервиса.

              Возможно, я много чего еще упустил, но как мне кажется, уже исходя из этих плюсов можно смотреть как лучше реализовать ту или иную сложную вычислительную задачу.