Web Streams API#

稳定性:2 - 稳定

WHATWG Streams 标准的实现。

概述#

WHATWG Streams 标准(或称“Web Streams”)定义了一个用于处理流式数据的 API。它与 Node.js 的 Streams API 类似,但出现得更晚,并已成为跨多个 JavaScript 环境处理流式数据的“标准”API。

主要有三种类型的对象

  • ReadableStream - 表示流式数据的来源。
  • WritableStream - 表示流式数据的目的地。
  • TransformStream - 表示转换流式数据的算法。

ReadableStream 示例#

此示例创建了一个简单的 ReadableStream,它会每秒推送一次当前的 performance.now() 时间戳,并无限持续。一个异步可迭代对象被用来从流中读取数据。

import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

Node.js 流的互操作性#

Node.js 流可以通过 stream.Readablestream.Writablestream.Duplex 对象上的 toWebfromWeb 方法与 Web Streams 进行相互转换。

更多详情请参阅相关文档

API#

类: ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#
  • underlyingSource <Object>
    • start <Function> 一个用户定义的函数,在 ReadableStream 创建时立即被调用。
    • pull <Function> 一个用户定义的函数,当 ReadableStream 内部队列未满时会重复调用。此操作可以是同步或异步的。如果是异步的,直到前一个返回的 promise 被兑现后,该函数才会再次被调用。
    • cancel <Function> 一个用户定义的函数,当 ReadableStream 被取消时调用。
      • reason <any>
      • 返回:一个解析为 undefined 的 promise。
    • type <string> 必须是 'bytes'undefined
    • autoAllocateChunkSize <number> 仅在 type 等于 'bytes' 时使用。当设置为非零值时,会自动为 ReadableByteStreamController.byobRequest 分配一个视图缓冲区。未设置时,必须使用流的内部队列通过默认读取器 ReadableStreamDefaultReader 来传输数据。
  • strategy <Object>
    • highWaterMark <number> 在施加背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于确定每个数据块的大小。
readableStream.locked#

readableStream.locked 属性默认为 false,当有活动的读取器消耗流数据时,会切换为 true

readableStream.cancel([reason])#
  • reason <any>
  • 返回:一个在取消操作完成后解析为 undefined 的 promise。
readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

使 readableStream.locked 变为 true

readableStream.pipeThrough(transform[, options])#
  • transform <Object>
    • readable <ReadableStream> transform.writable 会将从该 ReadableStream 接收到的可能已修改的数据推送到此 ReadableStream 中。
    • writable <WritableStream>ReadableStream 的数据将被写入的 WritableStream
  • options <Object>
    • preventAbort <boolean> 当为 true 时,此 ReadableStream 中的错误不会导致 transform.writable 被中止。
    • preventCancel <boolean> 当为 true 时,目标 transform.writable 中的错误不会导致此 ReadableStream 被取消。
    • preventClose <boolean> 当为 true 时,关闭此 ReadableStream 不会导致 transform.writable 被关闭。
    • signal <AbortSignal> 允许使用 <AbortController> 来取消数据传输。
  • 返回:<ReadableStream> 来自 transform.readable

将此 <ReadableStream> 连接到 transform 参数中提供的一对 <ReadableStream><WritableStream>,使得此 <ReadableStream> 的数据被写入 transform.writable,可能经过转换,然后推送到 transform.readable。一旦管道配置完成,将返回 transform.readable

在管道操作期间,使 readableStream.locked 变为 true

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: Aconst {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();
readableStream.pipeTo(destination[, options])#
  • destination <WritableStream>ReadableStream 的数据将被写入的 <WritableStream>
  • options <Object>
    • preventAbort <boolean> 当为 true 时,此 ReadableStream 中的错误不会导致 destination 被中止。
    • preventCancel <boolean> 当为 true 时,destination 中的错误不会导致此 ReadableStream 被取消。
    • preventClose <boolean> 当为 true 时,关闭此 ReadableStream 不会导致 destination 被关闭。
    • signal <AbortSignal> 允许使用 <AbortController> 来取消数据传输。
  • 返回:一个解析为 undefined 的 promise

在管道操作期间,使 readableStream.locked 变为 true

readableStream.tee()#

返回一对新的 <ReadableStream> 实例,此 ReadableStream 的数据将被转发到这两个实例中。每个实例将接收相同的数据。

使 readableStream.locked 变为 true

readableStream.values([options])#

创建并返回一个可用于消费此 ReadableStream 数据的异步迭代器。

在异步迭代器活动期间,使 readableStream.locked 变为 true

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString()); 
异步迭代#

<ReadableStream> 对象支持使用 for await 语法的异步迭代器协议。

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString()); 

异步迭代器将消费 <ReadableStream> 直到其终止。

默认情况下,如果异步迭代器提前退出(通过 breakreturnthrow),<ReadableStream> 将被关闭。为防止自动关闭 <ReadableStream>,请使用 readableStream.values() 方法获取异步迭代器,并将 preventCancel 选项设置为 true

<ReadableStream> 必须未被锁定(即,它不能有已存在的活动读取器)。在异步迭代期间,<ReadableStream> 将被锁定。

使用 postMessage() 传输#

一个 <ReadableStream> 实例可以使用 <MessagePort> 进行传输。

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]); 

ReadableStream.from(iterable)#

  • iterable <Iterable> 实现 Symbol.asyncIteratorSymbol.iterator 可迭代协议的对象。

一个从可迭代对象创建新的 <ReadableStream> 的实用工具方法。

import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();

要将生成的 <ReadableStream> 管道传输到一个 <WritableStream><Iterable> 应该产生一个由 <Buffer><TypedArray><DataView> 对象组成的序列。

import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

await stream.pipeTo(createWritableStreamSomehow());const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');

async function* asyncIterableGenerator() {
  yield Buffer.from('a');
  yield Buffer.from('b');
  yield Buffer.from('c');
}

const stream = ReadableStream.from(asyncIterableGenerator());

(async () => {
  await stream.pipeTo(createWritableStreamSomehow());
})();

类: ReadableStreamDefaultReader#

默认情况下,不带参数调用 readableStream.getReader() 将返回一个 ReadableStreamDefaultReader 的实例。默认读取器将通过流的数据块视为不透明的值,这使得 <ReadableStream> 可以处理几乎任何 JavaScript 值。

new ReadableStreamDefaultReader(stream)#

创建一个新的 <ReadableStreamDefaultReader>,并锁定到给定的 <ReadableStream>

readableStreamDefaultReader.cancel([reason])#
  • reason <any>
  • 返回:一个解析为 undefined 的 promise。

取消 <ReadableStream> 并返回一个在底层流被取消时兑现的 promise。

readableStreamDefaultReader.closed#
  • 类型: <Promise> 当关联的 <ReadableStream> 关闭时,解析为 undefined;如果流出错或在流完成关闭前读取器的锁被释放,则被拒绝。
readableStreamDefaultReader.read()#
  • 返回:一个解析为对象的 promise

从底层的 <ReadableStream> 请求下一个数据块,并返回一个在数据可用时解析为该数据的 promise。

readableStreamDefaultReader.releaseLock()#

释放此读取器对底层 <ReadableStream> 的锁定。

类: ReadableStreamBYOBReader#

ReadableStreamBYOBReader 是面向字节的 <ReadableStream>(即在创建 ReadableStreamunderlyingSource.type 设置为 'bytes' 的流)的另一种消费者。

BYOB 是“bring your own buffer”(自带缓冲区)的缩写。这是一种模式,可以更高效地读取面向字节的数据,避免不必要的复制。

import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString()); 
new ReadableStreamBYOBReader(stream)#

创建一个新的 ReadableStreamBYOBReader,并锁定到给定的 <ReadableStream>

readableStreamBYOBReader.cancel([reason])#
  • reason <any>
  • 返回:一个解析为 undefined 的 promise。

取消 <ReadableStream> 并返回一个在底层流被取消时兑现的 promise。

readableStreamBYOBReader.closed#
  • 类型: <Promise> 当关联的 <ReadableStream> 关闭时,解析为 undefined;如果流出错或在流完成关闭前读取器的锁被释放,则被拒绝。
readableStreamBYOBReader.read(view[, options])#

从底层的 <ReadableStream> 请求下一个数据块,并返回一个在数据可用时解析为该数据的 promise。

不要将池化的 <Buffer> 对象实例传递给此方法。池化的 Buffer 对象是使用 Buffer.allocUnsafe()Buffer.from() 创建的,或者通常由各种 node:fs 模块的回调返回。这些类型的 Buffer 使用共享的底层 <ArrayBuffer> 对象,该对象包含所有池化 Buffer 实例的数据。当一个 Buffer<TypedArray><DataView> 传递到 readableStreamBYOBReader.read() 中时,视图的底层 ArrayBuffer 会被分离,这会使该 ArrayBuffer 上的所有现有视图失效。这可能会对您的应用程序造成灾难性的后果。

readableStreamBYOBReader.releaseLock()#

释放此读取器对底层 <ReadableStream> 的锁定。

类: ReadableStreamDefaultController#

每个 <ReadableStream> 都有一个控制器,负责流的内部状态和队列管理。ReadableStreamDefaultController 是非面向字节的 ReadableStream 的默认控制器实现。

readableStreamDefaultController.close()#

关闭与此控制器关联的 <ReadableStream>

readableStreamDefaultController.desiredSize#

返回填满 <ReadableStream> 队列所需的剩余数据量。

readableStreamDefaultController.enqueue([chunk])#

将一个新的数据块附加到 <ReadableStream> 的队列中。

readableStreamDefaultController.error([error])#

发出一个错误信号,导致 <ReadableStream> 出错并关闭。

类: ReadableByteStreamController#

每个 <ReadableStream> 都有一个控制器,负责流的内部状态和队列管理。ReadableByteStreamController 用于面向字节的 ReadableStream

readableByteStreamController.byobRequest#
readableByteStreamController.close()#

关闭与此控制器关联的 <ReadableStream>

readableByteStreamController.desiredSize#

返回填满 <ReadableStream> 队列所需的剩余数据量。

readableByteStreamController.enqueue(chunk)#

将一个新的数据块附加到 <ReadableStream> 的队列中。

readableByteStreamController.error([error])#

发出一个错误信号,导致 <ReadableStream> 出错并关闭。

类: ReadableStreamBYOBRequest#

在面向字节的流中使用 ReadableByteStreamController,并且使用 ReadableStreamBYOBReader 时,readableByteStreamController.byobRequest 属性提供对 ReadableStreamBYOBRequest 实例的访问,该实例代表当前的读取请求。此对象用于访问为读取请求提供的 ArrayBuffer/TypedArray,并提供用于通知数据已提供的方法。

readableStreamBYOBRequest.respond(bytesWritten)#

通知已有 bytesWritten 数量的字节被写入到 readableStreamBYOBRequest.view

readableStreamBYOBRequest.respondWithNewView(view)#

通知请求已通过写入字节到一个新的 BufferTypedArrayDataView 中来完成。

readableStreamBYOBRequest.view#

类: WritableStream#

WritableStream 是流数据发送的目的地。

import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World'); 
new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <Object>
    • start <Function> 一个用户定义的函数,在 WritableStream 创建时立即被调用。
    • write <Function> 一个用户定义的函数,当一个数据块被写入 WritableStream 时调用。
    • close <Function> 一个用户定义的函数,当 WritableStream 关闭时调用。
      • 返回:一个解析为 undefined 的 promise。
    • abort <Function> 一个用户定义的函数,用于突然关闭 WritableStream
      • reason <any>
      • 返回:一个解析为 undefined 的 promise。
    • type <any> type 选项为将来使用保留,并且必须为 undefined。
  • strategy <Object>
    • highWaterMark <number> 在施加背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于确定每个数据块的大小。
writableStream.abort([reason])#
  • reason <any>
  • 返回:一个解析为 undefined 的 promise。

突然终止 WritableStream。所有排队的写入都将被取消,并且其关联的 promise 将被拒绝。

writableStream.close()#
  • 返回:一个解析为 undefined 的 promise。

在预计没有更多写入时关闭 WritableStream

writableStream.getWriter()#

创建并返回一个新的写入器实例,可用于将数据写入 WritableStream

writableStream.locked#

writableStream.locked 属性默认为 false,当有活动的写入器附加到此 WritableStream 时,会切换为 true

使用 postMessage() 传输#

一个 <WritableStream> 实例可以使用 <MessagePort> 进行传输。

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]); 

类: WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

创建一个新的 WritableStreamDefaultWriter,并锁定到给定的 WritableStream

writableStreamDefaultWriter.abort([reason])#
  • reason <any>
  • 返回:一个解析为 undefined 的 promise。

突然终止 WritableStream。所有排队的写入都将被取消,并且其关联的 promise 将被拒绝。

writableStreamDefaultWriter.close()#
  • 返回:一个解析为 undefined 的 promise。

在预计没有更多写入时关闭 WritableStream

writableStreamDefaultWriter.closed#
  • 类型: <Promise> 当关联的 <WritableStream> 关闭时,解析为 undefined;如果流出错或在流完成关闭前写入器的锁被释放,则被拒绝。
writableStreamDefaultWriter.desiredSize#

填满 <WritableStream> 队列所需的数据量。

writableStreamDefaultWriter.ready#
  • 类型: <Promise> 当写入器准备好使用时,解析为 undefined
writableStreamDefaultWriter.releaseLock()#

释放此写入器对底层 <ReadableStream> 的锁定。

writableStreamDefaultWriter.write([chunk])#
  • chunk <any>
  • 返回:一个解析为 undefined 的 promise。

将一个新的数据块附加到 <WritableStream> 的队列中。

类: WritableStreamDefaultController#

WritableStreamDefaultController 管理 <WritableStream> 的内部状态。

writableStreamDefaultController.error([error])#

由用户代码调用,以通知在处理 WritableStream 数据时发生错误。调用时,<WritableStream> 将被中止,当前待处理的写入将被取消。

writableStreamDefaultController.signal#

类: TransformStream#

一个 TransformStream 由一个 <ReadableStream> 和一个 <WritableStream> 组成,它们被连接起来,使得写入 WritableStream 的数据在被推送到 ReadableStream 的队列之前被接收并可能被转换。

import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]); 
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
  • transformer <Object>
    • start <Function> 一个用户定义的函数,在 TransformStream 创建时立即被调用。
    • transform <Function> 一个用户定义的函数,接收并可能修改写入 transformStream.writable 的数据块,然后将其转发到 transformStream.readable
    • flush <Function> 一个用户定义的函数,在 TransformStream 的可写侧关闭之前立即调用,标志着转换过程的结束。
    • readableType <any> readableType 选项为将来使用保留,并且必须undefined
    • writableType <any> writableType 选项为将来使用保留,并且必须undefined
  • writableStrategy <Object>
    • highWaterMark <number> 在施加背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于确定每个数据块的大小。
  • readableStrategy <Object>
    • highWaterMark <number> 在施加背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于确定每个数据块的大小。
transformStream.readable#
transformStream.writable#
使用 postMessage() 传输#

一个 <TransformStream> 实例可以使用 <MessagePort> 进行传输。

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]); 

类: TransformStreamDefaultController#

TransformStreamDefaultController 管理 TransformStream 的内部状态。

transformStreamDefaultController.desiredSize#

填满可读侧队列所需的数据量。

transformStreamDefaultController.enqueue([chunk])#

将一个数据块附加到可读侧的队列中。

transformStreamDefaultController.error([reason])#

向可读和可写两侧发送信号,通知在处理转换数据时发生错误,导致两侧都被突然关闭。

transformStreamDefaultController.terminate()#

关闭传输的可读侧,并导致可写侧因错误而突然关闭。

类: ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(init)#
byteLengthQueuingStrategy.highWaterMark#
byteLengthQueuingStrategy.size#

类: CountQueuingStrategy#

new CountQueuingStrategy(init)#
countQueuingStrategy.highWaterMark#
countQueuingStrategy.size#

类: TextEncoderStream#

new TextEncoderStream()#

创建一个新的 TextEncoderStream 实例。

textEncoderStream.encoding#

TextEncoderStream 实例支持的编码。

textEncoderStream.readable#
textEncoderStream.writable#

类: TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string> 标识此 TextDecoder 实例支持的 encoding默认值: 'utf-8'
  • options <Object>
    • fatal <boolean> 如果解码失败是致命的,则为 true
    • ignoreBOM <boolean> 当为 true 时,TextDecoderStream 将在解码结果中包含字节顺序标记 (BOM)。当为 false 时,字节顺序标记将从输出中移除。此选项仅在 encoding'utf-8''utf-16be''utf-16le' 时使用。默认值: false

创建一个新的 TextDecoderStream 实例。

textDecoderStream.encoding#

TextDecoderStream 实例支持的编码。

textDecoderStream.fatal#

如果解码错误会导致抛出 TypeError,则该值为 true

textDecoderStream.ignoreBOM#

如果解码结果将包含字节顺序标记,则该值为 true

textDecoderStream.readable#
textDecoderStream.writable#

类: CompressionStream#

new CompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 之一。
compressionStream.readable#
compressionStream.writable#

类: DecompressionStream#

new DecompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip''brotli' 之一。
decompressionStream.readable#
decompressionStream.writable#

实用工具消费者#

实用工具消费者函数提供了消费流的常用选项。

它们通过以下方式访问

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});
streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});
streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});
streamConsumers.text(stream)#
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});