Привет, друзья!


Представляю вашему вниманию перевод первой части этой замечательной статьи.


Веб-потоки (web streams) — это стандарт для потоков (streams), который поддерживается всеми основными веб-платформами: веб-браузерами, Node.js и Deno. Потоки — это абстракция для чтения и записи данных последовательно, небольшими частями из любого вида источника — файлов, данных, находящихся на сервере, и т.д.


Например, глобальная функция fetch (которая используется для загрузки онлайн-ресурсов) асинхронно возвращает ответ (Response), содержащий свойство body с веб-потоком.


В данной статье рассматриваются веб-потоки в Node.js, но то, о чем мы будем говорить, применимо к любой поддерживающей их платформе.


Содержание



1. Что такое веб-поток?


Поток — это структура данных (data structure) для доступа к таким данным, как:


  • файлы;
  • данные, находящиеся на сервере;
  • и т.д.

Двумя основными преимуществами потоков являются:


  • возможность работы с большим количеством данных благодаря тому, что потоки разделяют их на небольшие части (называемые "чанками"/chunks), которые могут обрабатываться по одному за раз;
  • возможность использования одной структуры данных для обработки разных данных, что облегчает повторное использование кода.

Веб-потоки ("веб" часто опускается) — это относительно новый стандарт, изначально поддерживаемый браузерами, но теперь поддерживаемый также Node.js и Deno, как показано в этой таблице.


Чанки бывают 2 видов:


  • текстовые потоки (text streams): строки;
  • бинарные потоки (потоки байтов) (binary streams): Uint8Array (разновидность TypedArray (типизированных массивов)).

1.1. Виды потоков


Существует 3 основных вида потоков:


  • RS (поток для чтения) (далее — RS) используется для чтения данных из источника (source). Код, который это делает, называется потребителем (consumer);
  • WS (поток для записи) (далее — WS) используется для записи данных в приемник (sink). Код, который это делает, называется производителем (producer);
  • TS (поток для преобразования) (далее — TS) состоит из 2 потоков:
    • он получает данные от записывающей стороны (стороны для записи) (writable side), WS;
    • он отправляет данные читающей стороне (стороне для чтения) (readable side), RS.

Основная идея TS состоит в преобразовании данных, проходящих через туннель (конвейер) (pipe). Другими словами, данные записываются на стороне для записи и после преобразования читаются на стороне для чтения. Следующие TS встроены в большинство платформ, поддерживающих JavaScript:


  • поскольку строки в JS имеют кодировку UTF-16, данные в кодировке UTF-8 обрабатываются как двоичные. TextDecoderStream конвертирует такие данные в строки;
  • TextEncoderStream конвертирует строки в данные в кодировке UTF-8;
  • CompressionStream сжимает двоичные данные в GZIP и другие форматы сжатия;
  • DecompressionStream извлекает данные из GZIP и других форматов.

RS, WS и TS могут применяться для передачи текстовых или бинарных данных. В статье мы будем в основном говорить о текстовых данных. Байтовые потоки для бинарных данных кратко упоминаются в конце.


1.2. Конвейер


"Туннелирование" (piping) — это операция, позволяющая объединять (pipe) RS и WS: до тех пор, пока RS производит данные, данная операция читает их и записывает в WS. При объединении 2 потоков мы получаем надежный способ передачи данных из одной локации в другую (например, для копирования файла). Однако, можно объединить больше 2 потоков и получить конвейер для обработки данных разными способами. Пример конвейера:


  • начинается с RS;
  • затем следует TS;
  • цепочка (chain) заканчивается WS.

1.3. Противодавление


Одной из проблем конвейера может стать ситуация, когда один из звеньев цепочки получает больше данных, чем можем обработать в данный момент. Противодавление (обратное давление) (backpressure) позволяет решить эту задачу: получатель сообщает отправителю о необходимости временно прекратить передачу данных во избежание перегрузки (переполнения).


Другими словами, противодавление — это сигнал, передающийся от перегруженного звена к началу цепочки. Представим, что у нас имеется такая цепочка:


RS -> TS -> WS

Путь противодавления будет следующим:


  • WS сигнализирует, что не справляется с обработкой данных;
  • конвейер прекращает читать данные из TS;
  • данные аккумулируются (накапливаются) внутри TS (это называется буферизацией/buffering);
  • TS сигнализирует о заполненности;
  • конвейер перестает читать данные из RS.

Мы достигли начала цепочки. Пока данные накапливаются внутри RS, у WS есть время на восстановление. После восстановления WS сигнализирует о готовности к получению данных. Этот сигнал также передается в начало цепочки, и обработка данных возобновляется.


1.4. Поддержка потоков в Node.js


В Node.js потоки доступны из 2 источников:


  • из модуля node:stream/web;
  • через глобальные переменные (как в браузере).

На данный момент только один API напрямую поддерживает потоки в Node.jsFetch API:


const response = await fetch("https://exmple.com");
const readableStream = response.body;

Для всего остального следует использовать один из следующих статических методов модуля node:stream для преобразования Node.js-потока в веб-поток, и наоборот:


  • Readable:
    • Readable.toWeb(nodeReadable);
    • Readable.fromWeb(webReadableStream, options?);
  • Writable:
    • Writable.toWeb(nodeWritable);
    • Writable.fromWeb(webWritableStream, options);
  • Duplex:
    • Duplex.toWeb(nodeDuplex);
    • Duplex.fromWeb(webTransformStream, options?).

FileHandle — еще один API, частично поддерживающий потоки через метод readableWebStream.


2. Чтение из RS


RS позволяют читать чанки данных из разных источников. Они имеют следующую сигнатуру:


interface RS<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;

  readonly locked: boolean;

  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WS<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;

  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): RS<TChunk2>;

  // Не рассматривается в статье
  tee(): [RS<TChunk>, RS<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

Свойства:


  • getReader(): возвращает Reader — объект, позволяющий читать из RS. Readers похожи на итераторы, возвращаемые перебираемыми сущностями;
  • locked: одновременно может использоваться только один Reader для одного RS. RS блокируется на время использования Reader, getReader() в этот период вызываться не может;
  • [Symbol.asyncIterator](): данный метод делает RS асинхронно перебираемыми. В настоящее время он реализован только для некоторых платформ;
  • cancel(): отменяет поток, поскольку потребитель больше в нем не заинтересован. reason (причина) передается в базовый источник (underlying source) RS (об этом позже). Возвращаемый промис разрешается после выполнения этой операции;
  • pipeTo(): передает содержимое RS в WS. Возвращаемый промис разрешается после выполнения этой операции. pipeTo() обеспечивает корректную передачу противодавления, сигналов закрытия, ошибок и т.п. по цепочке. В качестве второго параметра он принимает следующие настройки:
    • signal: позволяет передавать AbortSignal для прерывания цепочки с помощью AbortController;
    • preventClose: если имеет значение true, предотвращает закрытие WS при закрытии RS. Может быть полезным при подключении нескольких RS к одному WS;
    • остальные настройки в статье не рассматриваются. Почитать о них можно здесь;
  • pipeThrough(): подключает RS к ReadableWritablePair (по сути, TS, об этом позже). Возвращает результирующий RS (сторону для чтения ReadableWritablePair).

Существует 2 способа потребления RS:


  • Readers;
  • асинхронный перебор.

2.1. Потребление RS через Readers


Для чтения данных из RS могут использоваться Readers. Они имеют следующую сигнатуру:


interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}

interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

Свойства:


  • closed: данный промис разрешается после закрытия потока. Он отклоняется при возникновении ошибки или в случае, когда блокировка Reader снимается до закрытия потока;
  • cancel(): в активном Reader данный метод отменяет соответствующий RS;
  • releaseLock(): деактивирует Reader и разблокирует поток;
  • read(): возвращает промис для ReadableStreamReadResult (обертка для чанка) с 2 свойствами:
    • done: логическое значение — false, если чанки могут читаться, true после последнего чанка;
    • value: чанк или undefined после последнего чанка.

RS похожи на итерируемые сущности, Readers — на итераторы, а ReadableStreamReadResult — на объекты, возвращаемые методом next итераторов.


Код, демонстрирующий протокол использования Readers:


const reader = readableStream.getReader(); // 1

assert.equal(readableStream.locked, true); // 2

try {
  while (true) {
    const { done, value: chunk } = await reader.read(); // 3

    if (done) break;
    // Используем `chunk`
  }
} finally {
  reader.releaseLock(); // 4
}

Получение Reader. Мы можем читать прямо из RS. Сначала получаем Reader (1). Каждый RS может иметь только один Reader. После получения Reader RS блокируется (2). Перед следующим вызовом getReader() необходимо вызвать releaseLock() (4).


Чтение чанков. read() возвращает промис для объекта со свойствами done и value (3). После чтения последнего чанка done принимает значение true. Это похоже на то, как в JS работает асинхронный перебор.


2.1.1. Пример: чтение файла через RS


В следующем примере читаются чанки (строки) из текстового файла data.txt:


import * as fs from "node:fs";
import { Readable } from "node:stream";

const nodeReadable = fs.createReadStream(
  "data.txt",
  { encoding: "utf-8" }
);

const webReadableStream = Readable.toWeb(nodeReadable); // 1

const reader = webReadableStream.getReader();

try {
  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    console.log(value);
  }
} finally {
  reader.releaseLock();
}

Поток Node.js конвертируется в веб-поток. Затем приведенный выше протокол используется для чтения чанков.


2.1.2. Пример: формирование строки из содержимого RS


В следующем примере чанки из RS объединяются в возвращаемую строку:


async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();

  try {
    let result = "";

    while (true) {
      const { done, value } = await reader.read();

      if (done) {
        return result; // 1
      }

      result += value;
    }
  } finally {
    reader.releaseLock(); // 2
  }
}

Блок finally выполняется всегда, независимо от результата блока try. Поэтому блокировка корректно снимается (2) после возвращения результата (1).


2.2. Потребление RS с помощью асинхронного перебора


RS можно потреблять с помощью асинхронного перебора:


const iterator = readableStream[Symbol.iterator]();

let exhaustive = false;

try {
  while (true) {
    let chunk;

    ({ done: exhaustive, value: chunk } = await iterator.next());

    if (exhaustive) break;

    console.log(chunk);
  }
} finally {
  // Если цикл был прерван до окончательного (exhaustive) перебора
  // (через исключение или `return`), мы должны вызвать `iterator.return`
  if (!exhaustive) {
    iterator.return();
  }
}

К счастью, цикл for-await-of обрабатывает все детали асинхронного перебора автоматически:


for await (const chunk of readableStream) {
  console.log(chunk);
}

2.2.1. Пример: использование асинхронного перебора для чтения потока


import * as fs from "node:fs";
import { Readable } from "node:stream";

const nodeReadable = fs.createReadStream(
  "data.txt",
  { encoding: "utf-8" }
);

const webReadableStream = Readable.toWeb(nodeReadable);

for await (const chunk of webReadableStream) {
  console.log(chunk);
}

2.2.2. В настоящее время браузеры не поддерживают асинхронный перебор RS


В данный момент асинхронный перебор RS поддерживается Node.js и Deno, но не браузерами. Соответствующее обращение GitHub.


Обертка из предложения в отчете о багах Chromium:


async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) return;

      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

2.3. Создание конвейера


RS предоставляют 2 метода для создания конвейера:


  • readableStream.pipeTo(writeableStream): синхронно возвращает промис p. Он асинхронно читает все чанки RS и записывает их в WS. После завершения чтения p разрешается.

Мы увидим примеры использования pipeTo(), когда будем говорить о WS.


  • readableStream.pipeThrough(transformStream) подключает RS к transformStream.writable и возвращает transformStream.readable (каждый TS содержит эти свойства и ссылается на записывающую и читающую стороны). Другими словами, данная операция представляет собой создание нового RS посредством подключения TS к RS.

Мы увидим примеры использования pipeThrough(), когда будем говорить о TS.


3. Превращение источников данных в RS с помощью упаковки


Для чтения внешнего источника с помощью RS, его можно обернуть в объект-адаптер и передать в конструктор RS. Объект-адаптер называется базовым источником (underlying source) RS (стратегии помещения в очередь (queuing strategies) рассматриваются ниже, в разделе, посвященном противодавлению):


new ReadableStream(underlyingSource?, queuingStrategy?)

Интерфейс базовых источников:


interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;

  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;

  cancel?(reason?: any): void | Promise<void>;

  // Используются только в байтовых потоках и в данном разделе не рассматриваются
  type: "bytes" | undefined;
  autoAllocateChunkSize: bigint;
}

Эти методы вызываются в следующих случаях:


  • start(controller): вызывается сразу после вызова конструктора RS;
  • pull(controller): вызывается при появлении свободного места (пространства, комнаты) (room) во внутренней очереди RS. Он вызывается снова и снова до тех пор, пока очередь не будет заполнена. Данный метод вызывается только после завершения start(). Если pull() ничего не помещает в очередь, он больше не вызывается;
  • cancel(reason): вызывается, если потребитель RS вызывает readableStream.cancel() или reader.cancel().reason является значением, переданным этим методам.

Каждый из приведенных методов может вернуть промис, до разрешения которого другие операции выполняться не будут. Это может быть полезным при выполнении асинхронных задач.


controller, передаваемый в start() и pull() позволяет им получать доступ к потоку. Он имеет следующую сигнатуру:


type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  // не рассматривается
  | ReadableByteStreamController<TChunk>;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;

  readonly desiredSize: number | null;

  close(): void;

  error(err?: any): void;
}

Методы (чанки — это строки):


  • enqueue(chunk): помещает chunk во внутреннюю очередь RS;
  • desiredSize: количество свободного пространства очереди, в которое пишет enqueue(). Имеет значение 0 при заполнении очереди и отрицательное значение при превышении максимального размера очереди. Если желаемый размер (desired size) <= 0, помещение в очередь следует прекратить;
    • если поток закрыт, его desiredSize равен 0;
    • если поток находится в состоянии ошибки, его desiredSize равен null;
  • close(): закрывает RS. Потребители буду иметь возможность опустошать очередь, но после этого поток закончится. Важно, чтобы базовый источник вызывал данный метод, в противном случае, чтение потока никогда не закончится;
  • error(err): переводит поток в состояние ошибки — последующие взаимодействия с ним будут проваливаться со значением err.

3.1. Первый пример реализации базового источника


В первом примере реализуется только метод start(). Случаи использования pull() рассматриваются в следующем подразделе:


const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Первая строка\n"); // (1)
    controller.enqueue("Вторая строка\n"); // (2)
    controller.close(); // (3)
  },
});

for await (const chunk of readableStream) {
  console.log(chunk);
}
/**
 * Первая строка\n
 * Вторая строка\n
*/

Мы использует контроллер для создания потока с 2 чанками (1 и 2). Важно закрыть поток (3). В противном случае, цикл for-await-of никогда не завершится.


Обратите внимание: данный способ помещения чанков в очередь не является полностью безопасным — существует риск превышения возможностей внутренней очереди. Скоро мы увидим, как избежать такого риска.


3.2. Использование RS для оборачивания push- или pull-источника


Распространенным сценарием является преобразование push- или pull-источника в RS. Вид источника (push или pull) определяет, как мы обращаемся к RS из UnderlyingSource:


  • push-источник: такой источник уведомляет нас о появлении новых данных. Для регистрации обработчиков и поддержки структур данных используется метод start. Если полученных данных слишком много и желаемый размер перестал быть положительным, источник приостанавливается. При последующем вызове pull() источник может быть возобновлен. Приостановка внешнего источника в ответ на отрицательность желаемого размера называется применением противодавления (applying backpressure);
  • pull-источник: такой источник опрашивается на наличие новых данных — часто асинхронно. Поэтому, как правило, данные извлекаются при вызове pull(), а не при вызове start().

3.2.1. Пример: создание RS из push-источника с поддержкой противодавления


В следующем примере RS оборачивает сокет, который "пушит" нам данные (вызывает нас):


function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new RS({
    start(controller) {
      socket.ondata = (event) => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // Внутренняя очередь заполнилась, передаем
          // сигнал противодавления базовому источнику
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();

      socket.onerror = () => controller.error(
        new Error("Возникла ошибка!"));
    },

    pull() {
      // Данный метод вызывается, если внутренняя очередь опустела, но
      // потребитель потока хочет еще данных.
      // Возобновляем ранее приостановленный поток данных
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}

3.2.2. Пример: создание RS из pull-источника


Вспомогательная функция iterableToReadableStream принимает итерируемую сущность с чанками и преобразует их в RS:


/**
 * @param iterable итерируемая сущность (асинхронная или синхронная)
 */
 function iterableToReadableStream(iterable) {
  return new RS({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === "function") {

        this.iterator = iterable[Symbol.asyncIterator]();

      } else if (typeof iterable[Symbol.iterator] === "function") {

        this.iterator = iterable[Symbol.iterator]();

      } else {
        throw new Error(iterable + " не является итерируемой сущностью");
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Синхронные итераторы возвращают обычные значение (не промисы)
      // однако для `await` это неважно, значения просто передаются дальше
      const { value, done } = await this.iterator.next();

      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }

      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
      controller.close();
    },
  });
}

Создадим асинхронную функцию-генератор и преобразуем ее в RS:


async function* genAsyncIterable() {
  yield "как";
  yield "твои";
  yield "дела?";
}

const readableStream = iterableToReadableStream(genAsyncIterable());

for await (const chunk of readableStream) {
  console.log(chunk);
}
/**
 * как
 * твои
 * дела?
*/

// iterableToReadableStream() также работает с синхронными сущностями
const syncIterable = ["всем", "привет"];

const readableStream = iterableToReadableStream(syncIterable);

for await (const chunk of readableStream) {
  console.log(chunk);
}
/**
 * всем
 * привет
*/

Возможно, в будущем появится статический вспомогательный метод RS.from(), предоставляющий этот функционал из коробки (см. этот пул-реквест).


4. Запись в WS


WS позволяют записывать чанки данных в разные приемники. Они имеют следующую сигнатуру:


interface WS<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;

  readonly locked: boolean;

  close(): Promise<void>;

  abort(reason?: any): Promise<void>;
}

Свойства:


  • getWriter(): возвращает Writer — объект, через который осуществляется запись в WS;
  • locked: один WS может одновременно иметь одного Writer. До тех пор, пока Writer используется, WS блокируется, а метод getWriter() вызываться не может;
  • close(): закрывает поток:
    • базовый приемник (underlying sink) (об этом позже) продолжает получать чанки из очереди;
    • все попытки записи тихо завершаются ничем (без ошибок);
    • метод возвращает промис, который разрешается после успешной записи в приемник всех чанков из очереди и закрытия приемника. При возникновении ошибки промис отклоняется;
  • abort(): прерывает поток:
    • поток переводится в состояние ошибки;
    • промис разрешается после закрытия приемника и отклоняется при возникновении ошибки.

Существует 2 способа записи данных в WS:


  • с помощью Writers;
  • через подключение к WS.

4.1. Запись в WS с помощью Writers


Writers имеют следующую сигнатуру:


interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;

  readonly ready: Promise<undefined>;

  write(chunk?: TChunk): Promise<void>;

  releaseLock(): void;

  close(): Promise<void>;

  readonly closed: Promise<undefined>;

  abort(reason?: any): Promise<void>;
}

Свойства:


  • desiredSize: количество свободного места в очереди WS. Если имеет значение 0, значит, очередь заполнена, если имеет отрицательное значение — превышен максимальный размер очереди. Таким образом, если желаемый размер <= 0, запись следует прекратить;
    • если поток закрыт, желаемый размер равен 0;
    • если поток находится в состоянии ошибки, желаемый размер равен null;
  • ready: возвращает промис, который разрешается при получении желаемым размером положительного значения. Это означает, что отсутствует активное противодавление и запись может быть продолжена. Если желаемый размер вновь становится non-positive, создается и возвращается новый ожидающий (pending) промис;
  • write(): записывает чанк в поток. Возвращает промис, который разрешается после записи и отклоняется при ошибке;
  • releaseLock(): снимает блокировку потока;
  • close(): имеет тот же эффект, что закрытие потока;
  • closed: возвращает промис, разрешающийся при закрытии потока;
  • abort(): имеет тот же эффект, что прерывание потока.

Код, демонстрирующий протокол использования Writers:


const writer = writableStream.getWriter(); // (1)

assert.equal(writableStream.locked, true); // (2)

try {
  // Записываем чанки (об этом позже)
} finally {
  writer.releaseLock(); // (3)
}

Мы можем писать прямо в WS. Сначала создается Writer (1). После получения Writer WS блокируется (2). Перед последующим вызовом getWriter() следует вызвать releaseLock() (3).


Существует 3 способа записи чанков.


4.1.1. Подход 1: ожидание write() (неэффективная обработка противодавления)


Первый подход заключается в ожидании каждого результата write():


await writer.write("Чанк 1");
await writer.write("Чанк 2");
await writer.close();

Промис, возвращаемый write(), разрешается после успешной записи переданного чанка. Что означает "успешная запись", зависит от реализации WS, например, в случае с файлом, чанк может быть отправлен в операционную систему, но находиться в кеше, т.е. не быть фактически записанным на диск.


Промис, возвращаемый close(), разрешается после закрытия потока.


Недостаток данного подходя: ожидание успешной записи означает, что очередь не используется. Как следствие, снижается скорость передачи данных.


4.1.2. Подход 2: игнорирование отклонения write() (игнорирование противодавления)


Второй подход предполагает игнорирование промисов, возвращаемых write(), и ожидание только промиса, возвращаемого close():


writer.write("Чанк 1").catch(() => {}); // (1)
writer.write("Чанк 2").catch(() => {}); // (2)
await writer.close(); // Cообщения об ошибках

Синхронный вызов write() помещает чанки во внутреннюю очередь WS. Мы не ждем записи чанков. Однако ожидание close() означает ожидание опустения очереди и успешной записи чанков.


Вызов catch() (1 и 2) необходим, поскольку позволяет избежать предупреждений о необработанных отклонениях промиса при возникновении проблем с записью. Мы может игнорировать сообщения об ошибках, возвращаемые write(), потому что close() возвращает такие же сообщения.


Можно даже реализовать вспомогательную функцию для игнорирования отклонений:


ignoreRejections(
  writer.write("Чанк 1"),
  writer.write("Чанк 2"),
);
await writer.close(); // Сообщения об ошибках

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

Недостаток данного подхода: противодавление игнорируется. Мы просто исходим из предположения, что очередь является достаточно большой для размещения всех данных, которые записываются.


4.1.3. Подход 3: ожидание ready (эффективная обработка противодавления)


Ожидание геттера ready позволяет эффективно обрабатывать противодавление:


await writer.ready; // Сообщения об ошибках
// Сколько осталось свободного пространства?
console.log(writer.desiredSize);
writer.write("Чанк 1").catch(() => {});

await writer.ready; // Сообщения об ошибках
// Сколько осталось свободного пространства?
console.log(writer.desiredSize);
writer.write("Чанк 2").catch(() => {});

await writer.close(); // Сообщения об ошибках

Промис в ready разрешается при переходе потока от состояния наличия противодавления к состоянию его отсутствия.


4.1.4. Пример: запись файла с помощью Writer


import * as fs from "node:fs";
import { Writable } from "node:stream";

const nodeWritable = fs.createWriteStream(
  "data.txt",
  { encoding: "utf-8" }
); // (1)

const webWritableStream = Writable.toWeb(nodeWritable); // (2)

const writer = webWritableStream.getWriter();

try {
  await writer.write("Первая строка\n");
  await writer.write("Вторая строка\n");
  await writer.close();
} finally {
  writer.releaseLock()
}

Создаем поток Node.js для файла data.txt (1). Конвертируем этот поток в веб-поток (2). Используем Writer для записи в него строк.


4.2. Подключение к WS


Вместо использования Writers, в WS можно писать прямо из RS:


await readableStream.pipeTo(writableStream);

Промис из pipeTo() разрешается при успешном подключении.


4.2.1. Подключение происходит асинхронно


Подключение выполняется после завершения или приостановки текущей задачи. Это демонстрирует следующий код:


const readableStream = new RS({ // (1)
  start(controller) {
    controller.enqueue("Первая строка\n");
    controller.enqueue("Вторая строка\n");
    controller.close();
  },
});

const writableStream = new WritableStream({ // (2)
  write(chunk) {
    console.log("ЗАПИСЬ " + JSON.stringify(chunk));
  },

  close() {
    console.log("ЗАКРЫТИЕ WS");
  },
});

console.log("До .pipeTo()");

const promise = readableStream.pipeTo(writableStream); // (3)

promise.then(() => console.log("Промис разрешен"));

console.log("После .pipeTo()");
/**
 * До .pipeTo()
 * После .pipeTo()
 * ЗАПИСЬ "Первая строка\n"
 * ЗАПИСЬ "Вторая строка\n"
 * ЗАКРЫТИЕ WS
 * Промис разрешен
*/

Создаем RS (1). Создаем WS (2).


Видим, что pipeTo() (3) выполняется незамедлительно. В новой задаче чанки читаются и записываются. Затем WS закрывается и promise разрешается.


4.2.2. Пример: подключение к WS для файла


В следующем примере мы создаем WS для файла и подключаем к нему RS:


const webReadableStream = new RS({ // (1)
  async start(controller) {
    controller.enqueue("Первая строка\n");
    controller.enqueue("Вторая строка\n");
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (2)
  "data.txt",
  { encoding: "utf-8" }
);

const webWritableStream = Writable.toWeb(nodeWritable); // (3)

await webReadableStream.pipeTo(webWritableStream); // (4)

Создаем RS (1). Создаем поток Node.js для файла data.txt (2). Конвертируем данный поток в веб-поток (3). Подключаем webReadableStream к RS для файла.


4.2.3. Пример: запись двух RS в один WS


const createReadableStream = (prefix) =>
  new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + "чанк 1");
      controller.enqueue(prefix + "чанк 2");
      controller.close();
    },
  });

const writableStream = new WritableStream({
  write(chunk) {
    console.log("ЗАПИСЬ " + JSON.stringify(chunk));
  },

  close() {
    console.log("ЗАКРЫТИЕ");
  },

  abort(err) {
    console.log("ПРЕРЫВАНИЕ " + err);
  },
});

await createReadableStream("Поток 1: ")
  .pipeTo(writableStream, { preventClose: true }); // (1)

await createReadableStream("Поток 2: ")
  .pipeTo(writableStream, { preventClose: true }); // (2)

await writableStream.close();
/**
 * ЗАПИСЬ "Поток 1: чанк 1"
 * ЗАПИСЬ "Поток 1: чанк 2"
 * ЗАПИСЬ "Поток 2: чанк 1"
 * ЗАПИСЬ "Поток 2: чанк 2"
 * ЗАКРЫТИЕ
*/

Мы указываем pipeTo() не закрывать WS после закрытия RS (1 и 2). Поэтому WS остается открытым, и к нему можно подключать другие RS.


На этом перевод первой части статьи завершен.


Благодарю за внимание и happy coding!




Комментарии (0)