Привет, друзья!


Представляю вашему вниманию перевод второй части этой замечательной статьи.


Ссылка на первую часть.


Веб-потоки (web streams) — это стандарт для потоков (streams), который поддерживается всеми основными веб-платформами: веб-браузерами, Node.js и Deno. Потоки — это абстракция для чтения и записи данных последовательно, небольшими частями из любого вида источника — файлов, данных, находящихся на сервере, и т.д.


Например, глобальная функция fetch (которая используется для загрузки онлайн-ресурсов) асинхронно возвращает ответ (Response), содержащий свойство body с веб-потоком.


В данной статье рассматриваются веб-потоки в Node.js, но то, о чем мы будем говорить, применимо к любой поддерживающей их платформе.


Содержание



5. Превращение приемников данных в WS с помощью упаковки


Для того, чтобы иметь возможность записывать данные во внешний приемник через WS, этот источник необходимо обернуть в объект-адаптер, который называется базовым приемником (underlying sink):


new WritableStream(underlyingSink?, queuingStrategy?);

Базовые приемники имеют следующую сигнатуру:


interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;

  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;

  close?(): void | Promise<void>;

  abort?(reason?: any): void | Promise<void>;
}

Свойства:


  • start(controller): вызывается сразу после вызова конструктора WS. Для асинхронных операций можно возвращать промис. Данный метод позволяет подготовиться к записи;
  • write(chunk, controller): вызывается при готовности нового чанка к записи в приемник. Здесь можно отслеживать противодавление, возвращая промис, который разрешается при деактивации противодавления;
  • close(): вызывается после вызова writeStream.close() и записи всех чанков из очереди. Данный метод позволяет выполнять очистку после записи;
  • abort(reason): вызывается в случае вызова writeStream.abort() или writer.abort(). reason — это значение, переданное этим методам.

Параметр controller методов start и write позволяет переводить WS в состояние ошибки. Он имеет следующую сигнатуру:


interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}

  • signal: AbortSignal, позволяющий прерывать запись или закрывать операцию при закрытии потока;
  • error(err): закрывает WS, последующие взаимодействия с ним буду проваливаться со значением err.

5.1. Пример: трассировка RS


В следующем примере RS подключается к WS для отслеживания того, как RS производит чанки:


const readableStream = new RS({
  start(controller) {
    controller.enqueue("Первый чанк");
    controller.enqueue("Второй чанк");
    controller.close();
  },
});

await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log("ЗАПИСЬ " + JSON.stringify(chunk));
    },

    close() {
      console.log("ЗАКРЫТИЕ");
    },

    abort(err) {
      console.log("ПРЕРЫВАНИЕ " + err);
    },
  })
);
/**
 * ЗАПИСЬ "Первый чанк"
 * ЗАПИСЬ "Второй чанк"
 * ЗАКРЫТИЕ
*/

5.2. Пример: формирование строки из записываемых чанков


В следующем примере мы создаем подкласс WS, который собирает все записываемые чанки в строку. Метод getString обеспечивает доступ к строке:


class WritableStringStream extends WS {
  #string = "";

  constructor() {
    super({
      // Нам нужен доступ к `this` `WritableStringStream`,
      // поэтому мы используем стрелочную функцию
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }

  getString() {
    return this.#string;
  }
}

const stringStream = new WritableStringStream();

const writer = stringStream.getWriter();

try {
  await writer.write("Как");
  await writer.write(" твои ");
  await writer.write(" дела?");
  await writer.close();
} finally {
  writer.releaseLock()
}

assert.equal(
  stringStream.getString(),
  "Как твои дела?"
);

Недостатком данного подхода является смешение API: WS API и API нашего строкового потока. Альтернативой является делегирование ответственности WS вместо его расширения:


function createWritableStringStream() {
  let string = "";

  return {
    stream: new WS({
      write(chunk) {
        string += chunk;
      },
    }),

    getString() {
      return string;
    },
  };
}

const stringStream = createWritableStringStream();

const writer = stringStream.stream.getWriter();

try {
  await writer.write("Как");
  await writer.write(" твои ");
  await writer.write(" дела?");
  await writer.close();
} finally {
  writer.releaseLock()
}

assert.equal(
  stringStream.getString(),
  "Как твои дела?"
);

Эта функциональность также может быть реализована с помощью класса (вместо фабричной функции для объектов).


6. Использование TS


TS:


  • получает входные данные через сторону для записи (writable side), WS;
  • может преобразовывать входные данные;
  • позволяет читать результат через сторону для чтения (readable side), RS.

Основным способом применения TS является пропускание через них данных (pipe through) для преобразования:


const transformedStream = readableStream.pipeThrough(transformStream);

pipeThrough() подключает RS к стороне для записи TS и возвращает его сторону для чтения. Другими словами, создается новый RS, который является преобразованной версией RS.


pipeThrough() принимает не только TS, но также любой объект, соответствующий такому контракту:


interface ReadableWritablePair<RChunk, WChunk> {
  readable: RS<RChunk>;
  writable: WS<WChunk>;
}

6.1. Стандартные TS


Node.js поддерживает следующие стандартные TS:



6.1.1. Пример: декодирование потока байтов в кодировке UTF-8


const response = await fetch("https://example.com");
const readableByteStream = response.body;

const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream("utf-8"));

for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body — это ReadableByteStream (доступный для чтения поток байтов), чанки которого являются экземплярами Uint8Array. Мы пропускаем этот поток через TextDecoderStream для получения потока, содержащего строковые чанки.


Обратите внимание: декодирование каждого байтового чанка по отдельности (например, с помощью TextDecoder) работать не будет, поскольку байты единицы кода могут находиться в разных чанках.


6.1.2. Пример: создание доступного для чтения строкового потока для стандартного входа


Следующий модуль Node.js выводит в терминал все, что получает через стандартный вход:


import { Readable } from "node:stream";

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream("utf-8"));

for await (const chunk of webStream) {
  console.log(">>>", chunk);
}

Доступ к стандартному входу можно получить через поток, хранящийся в process.stdin. Без указания кодировки для потока и его преобразования с помощью Readable.toWeb() создается байтовый поток. Он пропускается через TextDecoderStream для получения текстового потока.


Обратите внимание: мы обрабатываем стандартный вход инкрементально — как только очередной чанк становится доступным, он выводится в терминал. Другими словами, нам не нужно ждать завершения ввода. Это может быть полезным при большом количестве данных или в случае, когда данные приходят с перерывами.


7. Реализация кастомных TS


Кастомный TS можно реализовать путем передачи объекта Transformer в конструктор TS. Данный объект имеет следующую сигнатуру:


interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;

  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;

  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

Свойства:


  • start(controller): вызывается сразу после вызова конструктора TS. Данный метод позволяет подготовиться к преобразованиям;
  • transform(chunk, controller): выполняет преобразования. Получает входной чанк и может использовать его параметр controller для помещения в очередь одного или нескольких преобразованных выходных чанков. Также может ничего не помещать в очередь;
  • flush(controller): вызывается после успешного преобразования всех чанков. Данный метод позволяет выполнять очистку.

Каждый из этих методов может возвращать промис, до разрешения которого выполнение кода приостанавливается. Это может быть полезным при выполнении асинхронных операций.


Параметр controller имеет следующую сигнатуру:


interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;

  readonly desiredSize: number | null;

  terminate(): void;

  error(err?: any): void;
}

Свойства:


  • enqueue(chunk): помещает чанк в сторону для чтения (вывод) TS;
  • desiredSize: возвращает желаемый размер внутренней очереди стороны для чтения (вывода) TS;
  • terminate(): закрывает сторону для чтения (вывод) и переводит сторону для записи (вход) TS в состояние ошибки. Может использоваться в случае, когда преобразователь не заинтересован в оставшихся чанках стороны для записи (входа) и хочет их пропустить;
  • error(err): переводит TS в состояние ошибки, последующие взаимодействия с ним будут проваливаться со значением err.

Что насчет противодавления? Противодавление передается от стороны для чтения (выхода) к стороне для записи (вход). Обычно, преобразования не сильно влияют на количество данных. Поэтому по умолчанию преобразователи игнорируют противодавление. Тем не менее, оно может быть обнаружено через transformStreamDefaultController.desiredSize и передано посредством возврата промиса из transformer.transform().


7.1. Пример: преобразование потока произвольных чанков в поток строк


Следующий подкласс TS преобразует поток произвольных чанков в поток, где каждый чанк занимает ровно одну строку. Каждый чанк, за исключением последнего, заканчивается символом \n в Unix и символами \r\n в Windows:


class ChunksToLinesTransformer {
  #previous = "";

  transform(chunk, controller) {
    let startSearch = this.#previous.length;

    this.#previous += chunk;

    while (true) {
      // Работает для EOL === "\n" и EOL === "\r\n"
      const eolIndex = this.#previous.indexOf("\n", startSearch);

      if (eolIndex < 0) break;

      // Строка включает EOL
      const line = this.#previous.slice(0, eolIndex + 1);

      controller.enqueue(line);

      this.#previous = this.#previous.slice(eolIndex + 1);

      startSearch = 0;
    }
  }

  flush(controller) {
    // Выполняем очистку и помещаем в очередь оставшийся текст
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}

class ChunksToLinesStream extends TS {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new RS({
  async start(controller) {
    controller.enqueue("несколько\nстрок\nтекста");
    controller.close();
  },
});

const transformStream = new ChunksToLinesStream();

const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log(">>>", JSON.stringify(line));
}
/**
 * >>> "несколько\n"
 * >>> "строк\n"
 * >>> "текста"
*/

Обратите внимание: Deno имеет встроенный TextLineStream со схожим функционалом.


7.2. Асинхронные генераторы также отлично подходят для преобразования потока


Поскольку RS являются асинхронно перебираемыми, мы можем использовать асинхронные генераторы для их преобразования. Это делает код очень элегантным:


const stream = new RS({
  async start(controller) {
    controller.enqueue("раз");
    controller.enqueue("два");
    controller.enqueue("три");
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield "> " + chunk;
  }
}

const transformedAsyncIterable = prefixChunks("> ", stream);

for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}
/**
 * > раз
 * > два
 * > три
*/

8. Детали противодавления


Рассмотрим такой конвейер:


rs.pipeThrough(ts).pipeTo(ws);

Подключения, создаваемые этим выражением (pipeThrough() использует pipeTo() для подключения rs к стороне для записи ts):


rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

Наблюдения:


  • базовый источник rs может быть представлен как элемент цепочки, предшествующий rs;
  • базовый приемник ws может быть представлен как элемент цепочки, следующий за ws;
  • каждый поток имеет внутренний буфер: буферы RS следуют за базовыми источниками, буферы WS находятся перед базовыми приемниками.

Предположим, что базовый приемник ws является медленным, что привело к заполнению буфера ws. В этом случае происходит следующее:


  • ws сигнализирует о заполненности;
  • pipeTo прекращает чтение данных из ts.readable;
  • ts.readable сигнализирует о заполненности;
  • ts прекращает передачу чанков из ts.writable в ts.readable;
  • ts.writable сигнализирует о заполненности;
  • pipeTo прекращает читать данные из rs;
  • rs сообщает о заполненности базовому источнику;
  • базовый источник приостанавливается.

Этот пример показывает, что нам требуется 2 вида функционала:


  • сущности, получающие данные, должны иметь возможность посылать сигнал противодавления;
  • сущности, отправляющие данные, должны реагировать на сигналы противодавления.

Посмотрим, как данный функционал реализуется в веб-потоках.


8.1. Сигнализация противодавления


Сигнал о противодавлении посылается сущностями, которые получают данные. Веб-потоки имеют 2 таких сущности:


  • WS получает данные через метод write объекта Writer;
  • RS получает данные при вызове метода enqueue объекта ReadableStreamDefaultController базового источника.

В обоих случаях входные данные буферизуются с помощью очередей. Сигнал о противодавлении возникает при заполнении очереди. Как его можно обнаружить?


Вот где находятся очереди:


  • очередь WS хранится внутри WritableStreamDefaultController;
  • очередь RS хранится внутри ReadableStreamDefaultController.

Желаемый размер (desired size) — это число, означающее количество свободного места (пространства, комнаты) (room) в очереди:


  • если свободное место имеется, число положительное;
  • если места нет, число равно 0;
  • если очередь переполнена, число отрицательное.

Поэтому противодавление применяется, когда желаемый размер <= 0. Размер доступен через геттер desiredSize объекта, содержащего очередь.


Как вычисляется желаемый размер? Через объект, определяющий так называемую стратегию помещения в очередь (queuing strategy). RS и WS имеют дефолтные стратегии помещения в очередь, которые могут быть перезаписаны через опциональный параметр их конструкторов. Интерфейс QueuingStrategy содержит 2 свойства:


  • метод size(chunk) возвращает размер chunk;
    • текущий размер очереди — это сумма размеров содержащихся в ней чанков;
  • свойство highWaterMark (верхняя отметка) определяет максимальный размер очереди.

Желаемый размер очереди — это верхняя отметка минус текущий размер очереди:


desiredSize = highWaterMark - [chunks].reduce((x, y) => x + y, 0);

8.2. Обработка противодавления


Сущности, отправляющие данные, должны реагировать на противодавление.


8.2.1. Код, записывающий данные в WS через Writer


  • Мы можем ждать разрешения промиса в writer.ready. В это время мы заблокированы, и достигается противодавление. Промис разрешается при появлении свободного места в очереди. Разрешение запускается, когда значение writer.desiredSize становится пложительным;
  • в качестве альтернативы, можно ждать разрешения промиса, возвращаемого writer.write(). В это время очередь не заполняется.

Также имеется возможность указывать размер чанков в writer.desiredSize.


8.2.2. Базовый источник RS


Объект базового источника, который передается RS, оборачивает внешний источник. Он является звеном цепочки, располагающимся перед RS.


  • Базовые pull-источники запрашивают новые данные при появлении комнаты в очереди. При отсутствии комнаты противодавление выполняется автоматически, поскольку данные не запрашиваются;
  • базовые push-источники должны проверять controller.desiredSize после помещения чего-либо в очередь: если желаемый размер <=0, они должны выполнять противодавление путем приостановки их внешних источников.

8.2.3. Базовый приемник WS


Объект базового приемника, передаваемый в WS, оборачивает внешний приемник. Он является звеном цепочки, располагающимся после WS.


Каждый внешний источник сообщает о противодавлении по-разному (в некоторых случаях этого вообще не происходит). Базовый источник может оказывать противодавление путем возврата промиса из метода write, который разрешается после завершения записи. В стандарте веб-потоков имеется пример того, как это работает.


8.2.4. TS (writable-readable)


TS подключает сторону для записи к стороне для чтения путем реализации базового приемника для первой и базового источника для второй. Он имеет внутренний слот [[backpressure]], который является индикатором активности противодавления.


  • Метод write базового приемника стороны для записи асинхронно ожидает завершения внутреннего противодавления перед передачей очередного чанка преобразователю TS (TransformStreamDefaultSinkWriteAlgorithm). Преобразователь может использовать очередь через TransformStreamDefaultController. Обратите внимание: write() возвращает промис, который разрешается после выполнения метода. До разрешения этого промиса WS буферизует входящие запросы на запись с помощью очереди. Поэтому противодавление стороны для записи реализуется через очередь и ее желаемый размер;
  • противодавление TS активируется, если чанк помещается в очередь с помощью TransformStreamDefaultController и очередь стороны для чтения становится полной ((TransformStreamDefaultControllerEnqueue)[https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue]);
  • противодавление TS может быть деактивировано, если что-либо читается из Reader ((ReadableStreamDefaultReaderRead)[https://streams.spec.whatwg.org/#readable-stream-default-reader-read]);

8.2.5. pipeTo() (RS -> WS)


pipeTo() читает чанки из RS через Reader и записывает их в WS через Writer. Он приостанавливается, когда writer.desiredSize <= 0 (шаг 15 (ReadableStreamPipeTo)[https://streams.spec.whatwg.org/#readable-stream-pipe-to]).


9. Потоки байтов


До сих пор мы говорили о текстовых потоках, потоках, чанки которых представляют собой строки. Но Web streams API также поддерживает потоки байтов для двоичных данных, где чанки являются Uint8Arrays (типизированными массивами):


  • RS имеют специальный режим (mode) bytes;
  • для WS неважно, чем являются чанки, строками или Uint8Arrays. Следовательно, каким потоком является экземпляр WS зависит от того, что способен обрабатывать базовый приемник;
  • какой вид чанков способен обрабатывать TS также зависит от его Transformer.

Посмотрим, как создать доступный для чтения поток байтов.


9.1. Доступный для чтения поток байтов


Тип создаваемого конструктором RS потока зависит от опционального свойства type его опционального первого параметра underlyingSource:


  • если type не указан или underlyingSource не передан, создается текстовый поток;
  • если type имеет значение "bytes" (строка), новый экземпляр будет представлять собой поток байтов:

const readableByteStream = new RS({
  type: "bytes",
  async start() {}
  // ...
});

На что влияет режим bytes?


В дефолтном режиме базовый источник может возвращать любой вид чанков. В "байтовом" режиме чанки должны быть ArrayBufferViews, т.е. TypedArrays (такими как Uint8Arrays) или DataViews.


Доступный для чтения поток байтов может создавать 2 вида Reader:


  • getReader() возвращает экземпляр ReadableStreamDefaultReader;
  • getReader({ mode: "byob" }) возвращает экземпляр ReadableStreamBYOBReader.

BYOB расшифровывается как Bring Your Own Buffer (предоставьте собственный буфер) и означает, что мы может передать буфер (ArrayBufferView) в reader.read(). После этого данный ArrayBufferView отключается и больше не используется. Однако read() возвращает данные в виде нового ArrayBufferView, который имеет тот же тип и обращается к той же области того же самого ArrayBuffer.


Кроме того, доступные для чтения потоки байтов имеют разные контроллеры: они являются экземплярами ReadableByteStreamController (а не ReadableStreamDefaultController). Помимо принуждения (forcing) базовых источников помещать в очередь ArrayBufferViews (TypedArrays или DataViews), они поддерживают ReadableStreamBYOBReaders через свойство byobRequest. В стандарте веб-потоков приведено 2 примера использования byobRequest.


9.2. Пример: бесконечный доступный для чтения поток байтов, заполняемый произвольными данными


В следующем примере создается бесконечный доступный для чтения поток байтов, заполняющий чанки произвольными данными:


import { promisify } from "node:util";
import { randomFill } from "node:crypto";

const asyncRandomFill = promisify(randomFill);

const readableByteStream = new RS({
  type: "bytes",

  async pull(controller) {
    const byobRequest = controller.byobRequest;

    await asyncRandomFill(byobRequest.view);

    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({ mode: "byob" });

const buffer = new Uint8Array(10); // (1)

const firstChunk = await reader.read(buffer); // (2)
console.log(firstChunk);

Поскольку readableByteStream является бесконечным, мы не можем перебрать его в цикле. Поэтому читается только его первый чанк (2).


Созданный буфер (1) передается в reader.read() (2) и после этого становится недоступным для чтения.


9.3. Пример: сжатие доступного для чтения потока байтов


В следующем примере создается доступный для чтения поток байтов, который пропускается через поток, сжимающий данные в формат GZIP:


const readableByteStream = new RS({
  type: "bytes",

  start(controller) {
    // 256 нулей
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});

const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream("gzip"));

await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = transformedStream.getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();

      if (done) break;

      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

9.4. Пример: чтение веб-страницы с помощью fetch()


Результат fetch() разрешается объектом ответа, свойство body которого представляет собой доступный для чтения поток байтов. Данный поток преобразуется в текстовый поток с помощью TextDecoderStream:


const response = await fetch("https://example.com");
const readableByteStream = response.body;

const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream("utf-8"));

for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

10. Специфичные для Node.js вспомогательные функции


Node.js — единственная на сегодняшний день платформа, поддерживающая следующие вспомогательные функции, именуемые utility consumers (утилитами потребления?):


import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from "node:stream/consumers";

Эти функции преобразуют веб RS, Readables Node.js и AsyncIterators в промисы, которые разрешаются:


  • ArrayBuffersarrayBuffer();
  • Blobsblob();
  • буферами Node.jsbuffer();
  • объектами JSONjson();
  • строками — text().

Предполагается, что бинарные данные имеют кодировку UTF-8:


import * as streamConsumers from "node:stream/consumers";

const readableByteStream = new RS({
  type: "bytes",

  start(controller) {
    // TextEncoder преобразует строки в  Uint8Arrays в кодировке UTF-8
    const encoder = new TextEncoder();

    const view = encoder.encode(`"????"`);

    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );

    controller.enqueue(view);

    controller.close();
  },
});

const jsonData = await streamConsumers.json(readableByteStream);

assert.equal(jsonData, "????");

Строковые потоки работают, как ожидается:


import * as streamConsumers from "node:stream/consumers";

const readableByteStream = new RS({
  start(controller) {
    controller.enqueue(`"????"`);

    controller.close();
  },
});

const jsonData = await streamConsumers.json(readableByteStream);

assert.equal(jsonData, "????");

В данной статье мы рассмотрели далеко не все аспекты API веб-потоков. Вот несколько ссылок для дальнейшего изучения этого интерфейса.


Надеюсь, вы, как и я, узнали для себя что-то новое и не жалеете потраченного времени.


Благодарю за внимание и happy coding!




Комментарии (0)