Node.js v25.0.0 文档
- Node.js v25.0.0
-
目录
- Web Streams API
- 概述
- API
- 类:
ReadableStreamnew ReadableStream([underlyingSource [, strategy]])readableStream.lockedreadableStream.cancel([reason])readableStream.getReader([options])readableStream.pipeThrough(transform[, options])readableStream.pipeTo(destination[, options])readableStream.tee()readableStream.values([options])- 异步迭代
- 使用
postMessage()传输
ReadableStream.from(iterable)- 类:
ReadableStreamDefaultReader - 类:
ReadableStreamBYOBReader - 类:
ReadableStreamDefaultController - 类:
ReadableByteStreamController - 类:
ReadableStreamBYOBRequest - 类:
WritableStream - 类:
WritableStreamDefaultWriternew WritableStreamDefaultWriter(stream)writableStreamDefaultWriter.abort([reason])writableStreamDefaultWriter.close()writableStreamDefaultWriter.closedwritableStreamDefaultWriter.desiredSizewritableStreamDefaultWriter.readywritableStreamDefaultWriter.releaseLock()writableStreamDefaultWriter.write([chunk])
- 类:
WritableStreamDefaultController - 类:
TransformStream - 类:
TransformStreamDefaultController - 类:
ByteLengthQueuingStrategy - 类:
CountQueuingStrategy - 类:
TextEncoderStream - 类:
TextDecoderStream - 类:
CompressionStream - 类:
DecompressionStream - 实用工具消费者
- 类:
- Web Streams API
-
索引
- 断言测试
- 异步上下文跟踪
- 异步钩子
- 缓冲区
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入器 API
- 子进程
- 集群
- 命令行选项
- 控制台
- 加密
- 调试器
- 已弃用的 API
- 诊断通道
- DNS
- 域
- 环境变量
- 错误
- 事件
- 文件系统
- 全局对象
- HTTP
- HTTP/2
- HTTPS
- 检查器
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:moduleAPI - 模块:包
- 模块:TypeScript
- 网络
- 操作系统
- 路径
- 性能钩子
- 权限
- 进程
- Punycode
- 查询字符串
- 逐行读取
- REPL
- 报告
- 单一可执行文件应用
- SQLite
- 流
- 字符串解码器
- 测试运行器
- 定时器
- TLS/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用工具
- V8
- 虚拟机
- WASI
- Web Crypto API
- Web Streams API
- 工作线程
- Zlib
- 其他版本
- 选项
Web Streams API#
概述#
WHATWG Streams 标准(或称“Web Streams”)定义了一个用于处理流式数据的 API。它与 Node.js 的 Streams API 类似,但出现得更晚,并已成为跨多个 JavaScript 环境处理流式数据的“标准”API。
主要有三种类型的对象
ReadableStream- 表示流式数据的来源。WritableStream- 表示流式数据的目的地。TransformStream- 表示转换流式数据的算法。
ReadableStream 示例#
此示例创建了一个简单的 ReadableStream,它会每秒推送一次当前的 performance.now() 时间戳,并无限持续。一个异步可迭代对象被用来从流中读取数据。
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);const {
ReadableStream,
} = require('node:stream/web');
const {
setInterval: every,
} = require('node:timers/promises');
const {
performance,
} = require('node:perf_hooks');
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
(async () => {
for await (const value of stream)
console.log(value);
})();
Node.js 流的互操作性#
Node.js 流可以通过 stream.Readable、stream.Writable 和 stream.Duplex 对象上的 toWeb 和 fromWeb 方法与 Web Streams 进行相互转换。
更多详情请参阅相关文档
API#
类: ReadableStream#
new ReadableStream([underlyingSource [, strategy]])#
underlyingSource<Object>start<Function> 一个用户定义的函数,在ReadableStream创建时立即被调用。controller<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:
undefined或一个解析为undefined的 promise。
pull<Function> 一个用户定义的函数,当ReadableStream内部队列未满时会重复调用。此操作可以是同步或异步的。如果是异步的,直到前一个返回的 promise 被兑现后,该函数才会再次被调用。controller<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:一个解析为
undefined的 promise。
cancel<Function> 一个用户定义的函数,当ReadableStream被取消时调用。reason<any>- 返回:一个解析为
undefined的 promise。
type<string> 必须是'bytes'或undefined。autoAllocateChunkSize<number> 仅在type等于'bytes'时使用。当设置为非零值时,会自动为ReadableByteStreamController.byobRequest分配一个视图缓冲区。未设置时,必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader来传输数据。
strategy<Object>highWaterMark<number> 在施加背压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
readableStream.locked#
- 类型: <boolean> 如果此 <ReadableStream> 有一个活动的读取器,则设置为
true。
readableStream.locked 属性默认为 false,当有活动的读取器消耗流数据时,会切换为 true。
readableStream.getReader([options])#
options<Object>mode<string>'byob'或undefined
- 返回:<ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());const { ReadableStream } = require('node:stream/web');
const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
使 readableStream.locked 变为 true。
readableStream.pipeThrough(transform[, options])#
transform<Object>readable<ReadableStream>transform.writable会将从该ReadableStream接收到的可能已修改的数据推送到此ReadableStream中。writable<WritableStream> 此ReadableStream的数据将被写入的WritableStream。
options<Object>preventAbort<boolean> 当为true时,此ReadableStream中的错误不会导致transform.writable被中止。preventCancel<boolean> 当为true时,目标transform.writable中的错误不会导致此ReadableStream被取消。preventClose<boolean> 当为true时,关闭此ReadableStream不会导致transform.writable被关闭。signal<AbortSignal> 允许使用 <AbortController> 来取消数据传输。
- 返回:<ReadableStream> 来自
transform.readable。
将此 <ReadableStream> 连接到 transform 参数中提供的一对 <ReadableStream> 和 <WritableStream>,使得此 <ReadableStream> 的数据被写入 transform.writable,可能经过转换,然后推送到 transform.readable。一旦管道配置完成,将返回 transform.readable。
在管道操作期间,使 readableStream.locked 变为 true。
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: Aconst {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: A
})();
readableStream.pipeTo(destination[, options])#
destination<WritableStream> 此ReadableStream的数据将被写入的 <WritableStream>。options<Object>preventAbort<boolean> 当为true时,此ReadableStream中的错误不会导致destination被中止。preventCancel<boolean> 当为true时,destination中的错误不会导致此ReadableStream被取消。preventClose<boolean> 当为true时,关闭此ReadableStream不会导致destination被关闭。signal<AbortSignal> 允许使用 <AbortController> 来取消数据传输。
- 返回:一个解析为
undefined的 promise
在管道操作期间,使 readableStream.locked 变为 true。
readableStream.tee()#
返回一对新的 <ReadableStream> 实例,此 ReadableStream 的数据将被转发到这两个实例中。每个实例将接收相同的数据。
使 readableStream.locked 变为 true。
readableStream.values([options])#
options<Object>preventCancel<boolean> 当为true时,可防止在异步迭代器突然终止时关闭 <ReadableStream>。默认值:false。
创建并返回一个可用于消费此 ReadableStream 数据的异步迭代器。
在异步迭代器活动期间,使 readableStream.locked 变为 true。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
异步迭代#
<ReadableStream> 对象支持使用 for await 语法的异步迭代器协议。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
异步迭代器将消费 <ReadableStream> 直到其终止。
默认情况下,如果异步迭代器提前退出(通过 break、return 或 throw),<ReadableStream> 将被关闭。为防止自动关闭 <ReadableStream>,请使用 readableStream.values() 方法获取异步迭代器,并将 preventCancel 选项设置为 true。
<ReadableStream> 必须未被锁定(即,它不能有已存在的活动读取器)。在异步迭代期间,<ReadableStream> 将被锁定。
使用 postMessage() 传输#
一个 <ReadableStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
ReadableStream.from(iterable)#
iterable<Iterable> 实现Symbol.asyncIterator或Symbol.iterator可迭代协议的对象。
一个从可迭代对象创建新的 <ReadableStream> 的实用工具方法。
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'const { ReadableStream } = require('node:stream/web');
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
(async () => {
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'
})();
要将生成的 <ReadableStream> 管道传输到一个 <WritableStream>,<Iterable> 应该产生一个由 <Buffer>、<TypedArray> 或 <DataView> 对象组成的序列。
import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
await stream.pipeTo(createWritableStreamSomehow());const { ReadableStream } = require('node:stream/web');
const { Buffer } = require('node:buffer');
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
(async () => {
await stream.pipeTo(createWritableStreamSomehow());
})();
类: ReadableStreamDefaultReader#
默认情况下,不带参数调用 readableStream.getReader() 将返回一个 ReadableStreamDefaultReader 的实例。默认读取器将通过流的数据块视为不透明的值,这使得 <ReadableStream> 可以处理几乎任何 JavaScript 值。
new ReadableStreamDefaultReader(stream)#
stream<ReadableStream>
创建一个新的 <ReadableStreamDefaultReader>,并锁定到给定的 <ReadableStream>。
readableStreamDefaultReader.cancel([reason])#
reason<any>- 返回:一个解析为
undefined的 promise。
取消 <ReadableStream> 并返回一个在底层流被取消时兑现的 promise。
readableStreamDefaultReader.closed#
- 类型: <Promise> 当关联的 <ReadableStream> 关闭时,解析为
undefined;如果流出错或在流完成关闭前读取器的锁被释放,则被拒绝。
readableStreamDefaultReader.read()#
从底层的 <ReadableStream> 请求下一个数据块,并返回一个在数据可用时解析为该数据的 promise。
readableStreamDefaultReader.releaseLock()#
释放此读取器对底层 <ReadableStream> 的锁定。
类: ReadableStreamBYOBReader#
ReadableStreamBYOBReader 是面向字节的 <ReadableStream>(即在创建 ReadableStream 时 underlyingSource.type 设置为 'bytes' 的流)的另一种消费者。
BYOB 是“bring your own buffer”(自带缓冲区)的缩写。这是一种模式,可以更高效地读取面向字节的数据,避免不必要的复制。
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)#
stream<ReadableStream>
创建一个新的 ReadableStreamBYOBReader,并锁定到给定的 <ReadableStream>。
readableStreamBYOBReader.cancel([reason])#
reason<any>- 返回:一个解析为
undefined的 promise。
取消 <ReadableStream> 并返回一个在底层流被取消时兑现的 promise。
readableStreamBYOBReader.closed#
- 类型: <Promise> 当关联的 <ReadableStream> 关闭时,解析为
undefined;如果流出错或在流完成关闭前读取器的锁被释放,则被拒绝。
readableStreamBYOBReader.read(view[, options])#
view<Buffer> | <TypedArray> | <DataView>options<Object>min<number> 设置后,返回的 promise 只有在至少有min个元素可用时才会被兑现。未设置时,promise 在至少有一个元素可用时兑现。
- 返回:一个解析为对象的 promise
value<TypedArray> | <DataView>done<boolean>
从底层的 <ReadableStream> 请求下一个数据块,并返回一个在数据可用时解析为该数据的 promise。
不要将池化的 <Buffer> 对象实例传递给此方法。池化的 Buffer 对象是使用 Buffer.allocUnsafe() 或 Buffer.from() 创建的,或者通常由各种 node:fs 模块的回调返回。这些类型的 Buffer 使用共享的底层 <ArrayBuffer> 对象,该对象包含所有池化 Buffer 实例的数据。当一个 Buffer、<TypedArray> 或 <DataView> 传递到 readableStreamBYOBReader.read() 中时,视图的底层 ArrayBuffer 会被分离,这会使该 ArrayBuffer 上的所有现有视图失效。这可能会对您的应用程序造成灾难性的后果。
readableStreamBYOBReader.releaseLock()#
释放此读取器对底层 <ReadableStream> 的锁定。
类: ReadableStreamDefaultController#
每个 <ReadableStream> 都有一个控制器,负责流的内部状态和队列管理。ReadableStreamDefaultController 是非面向字节的 ReadableStream 的默认控制器实现。
readableStreamDefaultController.close()#
关闭与此控制器关联的 <ReadableStream>。
类: ReadableByteStreamController#
每个 <ReadableStream> 都有一个控制器,负责流的内部状态和队列管理。ReadableByteStreamController 用于面向字节的 ReadableStream。
readableByteStreamController.byobRequest#
readableByteStreamController.close()#
关闭与此控制器关联的 <ReadableStream>。
readableByteStreamController.enqueue(chunk)#
chunk<Buffer> | <TypedArray> | <DataView>
将一个新的数据块附加到 <ReadableStream> 的队列中。
类: ReadableStreamBYOBRequest#
在面向字节的流中使用 ReadableByteStreamController,并且使用 ReadableStreamBYOBReader 时,readableByteStreamController.byobRequest 属性提供对 ReadableStreamBYOBRequest 实例的访问,该实例代表当前的读取请求。此对象用于访问为读取请求提供的 ArrayBuffer/TypedArray,并提供用于通知数据已提供的方法。
readableStreamBYOBRequest.respond(bytesWritten)#
bytesWritten<number>
通知已有 bytesWritten 数量的字节被写入到 readableStreamBYOBRequest.view。
readableStreamBYOBRequest.respondWithNewView(view)#
view<Buffer> | <TypedArray> | <DataView>
通知请求已通过写入字节到一个新的 Buffer、TypedArray 或 DataView 中来完成。
readableStreamBYOBRequest.view#
- 类型:<Buffer> | <TypedArray> | <DataView>
类: WritableStream#
WritableStream 是流数据发送的目的地。
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])#
underlyingSink<Object>start<Function> 一个用户定义的函数,在WritableStream创建时立即被调用。controller<WritableStreamDefaultController>- 返回:
undefined或一个解析为undefined的 promise。
write<Function> 一个用户定义的函数,当一个数据块被写入WritableStream时调用。chunk<any>controller<WritableStreamDefaultController>- 返回:一个解析为
undefined的 promise。
close<Function> 一个用户定义的函数,当WritableStream关闭时调用。- 返回:一个解析为
undefined的 promise。
- 返回:一个解析为
abort<Function> 一个用户定义的函数,用于突然关闭WritableStream。reason<any>- 返回:一个解析为
undefined的 promise。
type<any>type选项为将来使用保留,并且必须为 undefined。
strategy<Object>highWaterMark<number> 在施加背压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
writableStream.abort([reason])#
reason<any>- 返回:一个解析为
undefined的 promise。
突然终止 WritableStream。所有排队的写入都将被取消,并且其关联的 promise 将被拒绝。
writableStream.getWriter()#
创建并返回一个新的写入器实例,可用于将数据写入 WritableStream。
writableStream.locked#
- 类型:<boolean>
writableStream.locked 属性默认为 false,当有活动的写入器附加到此 WritableStream 时,会切换为 true。
使用 postMessage() 传输#
一个 <WritableStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
类: WritableStreamDefaultWriter#
new WritableStreamDefaultWriter(stream)#
stream<WritableStream>
创建一个新的 WritableStreamDefaultWriter,并锁定到给定的 WritableStream。
writableStreamDefaultWriter.abort([reason])#
reason<any>- 返回:一个解析为
undefined的 promise。
突然终止 WritableStream。所有排队的写入都将被取消,并且其关联的 promise 将被拒绝。
writableStreamDefaultWriter.closed#
- 类型: <Promise> 当关联的 <WritableStream> 关闭时,解析为
undefined;如果流出错或在流完成关闭前写入器的锁被释放,则被拒绝。
writableStreamDefaultWriter.releaseLock()#
释放此写入器对底层 <ReadableStream> 的锁定。
writableStreamDefaultWriter.write([chunk])#
chunk<any>- 返回:一个解析为
undefined的 promise。
将一个新的数据块附加到 <WritableStream> 的队列中。
类: WritableStreamDefaultController#
WritableStreamDefaultController 管理 <WritableStream> 的内部状态。
writableStreamDefaultController.error([error])#
error<any>
由用户代码调用,以通知在处理 WritableStream 数据时发生错误。调用时,<WritableStream> 将被中止,当前待处理的写入将被取消。
writableStreamDefaultController.signal#
- 类型: <AbortSignal> 一个
AbortSignal,可用于在 <WritableStream> 被中止时取消待处理的写入或关闭操作。
类: TransformStream#
一个 TransformStream 由一个 <ReadableStream> 和一个 <WritableStream> 组成,它们被连接起来,使得写入 WritableStream 的数据在被推送到 ReadableStream 的队列之前被接收并可能被转换。
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
transformer<Object>start<Function> 一个用户定义的函数,在TransformStream创建时立即被调用。controller<TransformStreamDefaultController>- 返回:
undefined或一个解析为undefined的 promise
transform<Function> 一个用户定义的函数,接收并可能修改写入transformStream.writable的数据块,然后将其转发到transformStream.readable。chunk<any>controller<TransformStreamDefaultController>- 返回:一个解析为
undefined的 promise。
flush<Function> 一个用户定义的函数,在TransformStream的可写侧关闭之前立即调用,标志着转换过程的结束。controller<TransformStreamDefaultController>- 返回:一个解析为
undefined的 promise。
readableType<any>readableType选项为将来使用保留,并且必须为undefined。writableType<any>writableType选项为将来使用保留,并且必须为undefined。
writableStrategy<Object>highWaterMark<number> 在施加背压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
readableStrategy<Object>highWaterMark<number> 在施加背压之前的最大内部队列大小。size<Function> 一个用户定义的函数,用于确定每个数据块的大小。
transformStream.readable#
- 类型: <ReadableStream>
transformStream.writable#
- 类型: <WritableStream>
使用 postMessage() 传输#
一个 <TransformStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
类: TransformStreamDefaultController#
TransformStreamDefaultController 管理 TransformStream 的内部状态。
transformStreamDefaultController.error([reason])#
reason<any>
向可读和可写两侧发送信号,通知在处理转换数据时发生错误,导致两侧都被突然关闭。
transformStreamDefaultController.terminate()#
关闭传输的可读侧,并导致可写侧因错误而突然关闭。
类: ByteLengthQueuingStrategy#
byteLengthQueuingStrategy.size#
- 类型: <Function>
类: CountQueuingStrategy#
countQueuingStrategy.size#
- 类型: <Function>
类: TextEncoderStream#
new TextEncoderStream()#
创建一个新的 TextEncoderStream 实例。
textEncoderStream.readable#
- 类型: <ReadableStream>
textEncoderStream.writable#
- 类型: <WritableStream>
类: TextDecoderStream#
new TextDecoderStream([encoding[, options]])#
创建一个新的 TextDecoderStream 实例。
textDecoderStream.readable#
- 类型: <ReadableStream>
textDecoderStream.writable#
- 类型: <WritableStream>
类: CompressionStream#
compressionStream.readable#
- 类型: <ReadableStream>
compressionStream.writable#
- 类型: <WritableStream>
类: DecompressionStream#
decompressionStream.readable#
- 类型: <ReadableStream>
decompressionStream.writable#
- 类型: <WritableStream>
实用工具消费者#
实用工具消费者函数提供了消费流的常用选项。
它们通过以下方式访问
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 解析为一个包含流完整内容的
ArrayBuffer。
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
});
streamConsumers.blob(stream)#
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 解析为一个包含流完整内容的 <Blob>。
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27const { blob } = require('node:stream/consumers');
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
});
streamConsumers.buffer(stream)#
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 解析为一个包含流完整内容的 <Buffer>。
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});
streamConsumers.json(stream)#
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 解析为将流内容解析为 UTF-8 编码字符串,然后通过
JSON.parse()处理后的结果。
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
});
streamConsumers.text(stream)#
stream<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回:<Promise> 解析为将流内容解析为 UTF-8 编码字符串的结果。
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});