В тот момент, когда вы набираете что-то на клавиатуре, читаете файл с диска или скачиваете его через Интернет, поток информации (биты) проходит через различные устройства и приложения.

Если вы научитесь работать с этими потоками битов, то сможете создавать высокопроизводительные и полноценные приложения. Например, вспомните, как просматривается видео на YouTube. Необязательно ждать, пока загрузится весь видеоролик. Как только в буфер попадает небольшой фрагмент, он начинает воспроизводиться, а остальное продолжает загружаться по мере просмотра.

Node.js включает встроенный модуль stream, который позволяет нам работать с потоковыми данными. В данной статье на нескольких простых примерах мы объясним, как можно использовать этот модуль. Мы также опишем, как можно построить пайплайны, склеивая различные потоки вместе, чтобы создавать высокопроизводительные приложения для сложных сценариев использования.

Перед тем как перейти к созданию приложений, важно понять, какие возможности предоставляет Node.js модуль stream.

Давайте начнем!

Типы Node.js .Streams

Node.js streams предоставляет четыре типа потоков:

  • Потоки для чтения (Readable Streams).

  • Потоки для записи (Writable Streams).

  • Дуплексные потоки (Duplex Streams).

  • Потоки преобразования (Transform Streams).

Более подробную информацию о типах потоков можно найти в официальных документах Node.js.

Давайте рассмотрим каждый тип потока на высоком уровне.

Readable Streams

Readable stream может считывать данные из определенного источника, чаще всего из файловой системы. Другие распространенные виды использования readable streams в приложениях Node.js:

  • process.stdin - для чтения пользовательского ввода через stdin в терминальном приложении.

  • http.IncomingMessage - для чтения содержимого входящего запроса в HTTP-сервере или для чтения HTTP-ответа сервера в HTTP-клиенте.

Writable Streams

Вы используете writable streams, чтобы записывать данные из приложения в определенное место назначения, например, в файл.

Поток process.stdout может использоваться для записи данных в стандартный вывод и используется внутри console.log.

Далее следуют duplex и transform streams, которые можно определить как "гибридные" типы потоков, построенные на readable и writable streams.

Duplex Streams

Duplex stream - это комбинация readable и writable streams. Он обеспечивает возможность записи данных в определенное место назначения и чтения данных из источника. Наиболее распространенным примером duplex stream является net.Socket, используемый для чтения/записи данных в/из сокета.

Важно знать, что в duplex stream стороны readable и writeable работают независимо друг от друга. Данные не перетекают с одной на другую.

Transform Streams

Transform stream немного похож на duplex stream, но здесь readable-сторона соединена с writable в transform stream.

Хорошим примером может служить класс crypto.Cipher, который имплементирует поток шифрования. Используя поток crypto.Cipher, приложение может записывать простые текстовые данные во writeable часть потока и считывать зашифрованный шифрованный текст из readable части потока. Трансформационная природа этого типа потоков является причиной того, что они называются "transform streams".

Примечание: Другой transform stream - stream.PassThrough, который передает данные с записываемой стороны на читаемую без какого-либо преобразования. Хотя это может показаться тривиальным, Passthrough очень полезны для создания кастомных имплементаций потоков и пайплайнов (например, создание нескольких копий данных одного потока).

Чтение данных из Readable Node.js Streams 

После того как readable stream 'connected' (подключен) к источнику, генерирующему данные (например, файлу), существует несколько способов считывания данных через поток.

Сначала создадим образец текстового файла с именем myfile, содержащий 85 байт текста 'lorem ipsum':

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.

Теперь давайте рассмотрим два различных метода чтения данных из readable stream.

1. Прослушивание событий 'data'

Наиболее распространенным способом чтения данных из readable stream является прослушивание событий 'data', испускаемых потоком. Ниже приведена программа, демонстрирующая этот подход:

const fs = require("fs");
const readable = fs.createReadStream("./myfile", { highWaterMark: 20 });
 
readable.on("data", (chunk) => {
  console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
});

Свойство HighWaterMark, передаваемое в качестве опции в fs.createReadStream, определяет, сколько данных буферизируется внутри потока. Затем данные сбрасываются в механизм чтения (в данном случае в наш data хэндлер). По умолчанию у fs потоков, доступных для чтения, HighWaterMark установлен на 64 кБ. Мы намеренно переопределили его на 20 байт, чтобы инициировать несколько data событий.

При запуске программы, которая описана выше, она прочитает 85 байт из myfile за пять итераций. В консоли вы увидите следующий вывод:

Read 20 bytes
"Lorem ipsum dolor si"

Read 20 bytes
"t amet, consectetur "

Read 20 bytes
"adipiscing elit. Cur"

Read 20 bytes
"abitur nec mauris tu"

Read 5 bytes
"rpis."

2. Использование асинхронных итераторов

Альтернативным способом чтения данных из readable stream является использование асинхронных итераторов:

const fs = require("fs");
const readable = fs.createReadStream("./myfile", { highWaterMark: 20 });
 
(async () => {
  for await (const chunk of readable) {
    console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
  }
})();

Если вы запустите эту программу, то получите тот же вывод, что и в предыдущем примере.

Состояние readable Node.js stream 

Когда к событиям 'data' readable stream подключается слушатель, поток переходит в  состояние 'flowing' (если только он не приостановлен явным образом). Проверить состояние потока можно с помощью свойства readableFlowing объекта потока.

Мы можем продемонстрировать это с помощью немного измененной версии нашего предыдущего примера с 'data' хэндлером:

const fs = require("fs");
const readable = fs.createReadStream("./myfile", { highWaterMark: 20 });
 
let bytesRead = 0;
 
console.log(
  `before attaching 'data' handler. is flowing: ${readable.readableFlowing}`
);
readable.on("data", (chunk) => {
  console.log(`Read ${chunk.length} bytes`);
  bytesRead += chunk.length;
 
  // Pause the readable stream after reading 60 bytes from it.
  if (bytesRead === 60) {
    readable.pause();
    console.log(`after pause() call. is flowing: ${readable.readableFlowing}`);
 
    // resume the stream after waiting for 1s.
    setTimeout(() => {
      readable.resume();
      console.log(
        `after resume() call. is flowing: ${readable.readableFlowing}`
      );
    }, 1000);
  }
});
console.log(
  `after attaching 'data' handler. is flowing: ${readable.readableFlowing}`
);

В этом примере мы осуществляем чтение из myfile через readable stream, но временно выполняем режим 'paused' для потока данных на 1 с после чтения 60 байт из файла. Регистрируем значение свойства readableFlowing в разные моменты времени, чтобы понять, как оно меняется.

При запуске приведенной выше программы на выходе вы получается следующее:

before attaching 'data' handler. is flowing: null
after attaching 'data' handler. is flowing: true
Read 20 bytes
Read 20 bytes
Read 20 bytes
after pause() call. is flowing: false
after resume() call. is flowing: true
Read 20 bytes
Read 5 bytes

Мы можем объяснить результат так:

  • Когда наша программа запускается, readableFlowing имеет значение null, потому что мы не предоставляем никакого механизма потребления из потока.

  • После подключения хэндлера 'data' readable stream переходит в режим 'flowing', и readableFlowing меняется на true.

  • После считывания 60 байт поток устанавливается на 'paused' с помощью вызова pause(), который, в свою очередь, меняет readableFlowing на false.

  • После ожидания в течение 1 с поток снова переключается в режим 'flowing' путем вызова resume(), изменяя readableFlowing на true. После этого оставшееся содержимое файла проходит через поток.

Обработка больших объемов данных с помощью потоков Node.js

Благодаря потокам, приложениям не нужно хранить в памяти большие объемы информации: небольшие фрагменты данных можно обрабатывать по мере их получения.

В этом разделе мы объединим различные потоки для создания реального приложения, способного обрабатывать большие объемы данных. Используем небольшую программу-утилиту, которая генерирует SHA-256 заданного файла.

Но сначала давайте создадим большой файл-макет размером 4 ГБ для тестирования. Это можно сделать с помощью небольшой shell-команды, как показано ниже:

  • На macOS: mkfile -n 4g 4gb_file

  • В Linux: xfs_mkfile 4096m 4gb_file

После создания нашего муляжа 4gb_file, давайте сгенерируем хэш SHA-256 файла без использования stream модуля:

const fs = require("fs");
const crypto = require("crypto");
 
fs.readFile("./4gb_file", (readErr, data) => {
  if (readErr) return console.log(readErr);
  const hash = crypto.createHash("sha256").update(data).digest("base64");
  fs.writeFile("./checksum.txt", hash, (writeErr) => {
    writeErr && console.error(err);
  });
});

Если выполнить приведенный выше код, может возникнуть следующая ошибка:

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB
at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) {
code: 'ERR_FS_FILE_TOO_LARGE'
}

Приведенная выше ошибка возникает потому, что среда выполнения JavaScript не может обрабатывать произвольно большие буферы. Максимальный размер буфера, с которым может справиться среда выполнения, зависит от архитектуры вашей операционной системы. Это можно проверить с помощью переменной buffer.constants.MAX_LENGTH во встроенном модуле buffer.

Даже если у нас не возникло вышеописанной ошибки, хранение больших файлов в памяти проблематично. Имеющаяся физическая память будет ограничивать объем памяти, который может использовать наше приложение. Значительное потребление памяти также может привести к снижению производительности приложения с точки зрения использования CPU, поскольку сборка мусора становится затратной.

Сокращайте используемый объем памяти вашего приложения с помощью pipeline()

Теперь давайте рассмотрим, как мы можем модифицировать наше приложение для использования потоков и избежать появления данной ошибки:

const fs = require("fs");
const crypto = require("crypto");
const { pipeline } = require("stream");
 
const hashStream = crypto.createHash("sha256");
hashStream.setEncoding("base64");
 
const inputStream = fs.createReadStream("./4gb_file");
const outputStream = fs.createWriteStream("./checksum.txt");
 
pipeline(inputStream, hashStream, outputStream, (err) => {
  err && console.error(err);
});

В этом примере мы используем потоковый подход, предоставляемый функцией crypto.createHash. Она возвращает объект "transform stream" hashStream, генерируя хэши для произвольно больших файлов.

Чтобы передать содержимое файла в этот transform stream, мы создали readable stream - inputStream - для 4gb_file с помощью fs.createReadStream. Вывод из из transform stream hashStream мы направляем в writeable outputStream и файл checksum.txt, созданный с помощью fs.createWriteStream.

Если вы запустите приведенное выше приложение, то обнаружите, что файл checksum.txt заполняется хэшем SHA-256 нашего файла размером 4 ГБ.

Использование pipeline() по сравнению с pipe() для потоков

В нашем предыдущем примере мы использовали функцию pipeline для соединения нескольких потоков. Альтернативным общепринятым подходом является использование функции .pipe(), как показано ниже:

inputStream.pipe(hashStream).pipe(outputStream);

Однако использование .pipe() в рабочих приложениях не рекомендуется по нескольким причинам. Если один из piped-потоков закрыт или выдает ошибку, pipe() не будет автоматически уничтожать соединенные потоки. Это может привести к утечке памяти в приложениях. Кроме того, pipe() не пересылает автоматически ошибки между потоками, чтобы они обрабатывались в одном месте.

Для решения этих проблем была введена функция pipeline(), поэтому рекомендуется использовать pipeline() вместо pipe() для соединения нескольких потоков. Рассмотренный выше пример pipe() можно переписать для использования функции pipeline() следующим образом:

pipeline(inputStream, hashStream, outputStream, (err) => {
  err && console.error(err);
});

pipeline() принимает функцию обратного вызова в качестве последнего параметра. Любые пересылаемые ошибки от какого-либо из piped-потоков будут вызывать коллбек, поэтому проще обрабатывать ошибки для всех потоков в одном месте.

Подведение итогов: Экономия памяти и повышение производительности с помощью потоков Node.js

Использование потоков в Node.js помогает нам создавать высокопроизводительные приложения, способные обрабатывать большие объемы данных.

В этой статье мы рассмотрели:

  • Четыре типа потоков Node.js (читаемые, записываемые, дуплексные и трансформируемые потоки).

  • Как можно считать данные из readable Node.js streams, прослушивая события 'data' или используя асинхронные итераторы.

  • Сокращение требуемого объема памяти в приложениях за счет использования pipeline для соединения нескольких потоков.

Небольшое предупреждение: Скорее всего, в большинстве случаев необходимость использования потоков отсутствует, а их применение может усложнить ваше приложение. Убедитесь, что преимущества потокового подхода перевешивают возможные неудобства.

Я бы посоветовал вам прочитать официальную документацию по stream Node.js, чтобы узнать больше и изучить дополнительные случаи их использования.


В любом приложении среднего размера разработчик сталкивается с задачей централизованного управления стейтом. В современном Vue 3 мы можем это делать и без Vuex, полагаясь только на hooks + provide/inject. Рассмотрим плюсы и минусы такого подхода в реальном приложении. Также в сообществе широко обсуждается упрощённый стейт-менеджер под названием Pinya. На открытом занятии установим его и научимся пользоваться. Регистрация для всех желающих доступна по ссылке.

Также приглашаем на открытое занятие «Обзор мира микро-фронтенда», на котором мы рассмотрим идеи, архитектуру и сравним существующие решения Micro FrontEnds.

Комментарии (2)


  1. Rehtide
    04.03.2022 19:10

    Pinya (pinia)


  1. acsais-com
    04.03.2022 19:14

    Спасибо, работа с потоковыми данными - очень актуальная для меня тема!