Node.js v26.0.0 文档
- Node.js v26.0.0
- 目录
- 索引
- 关于本文档
- 用法与示例
- 断言测试
- 异步上下文跟踪
- 异步钩子
- 缓冲区
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入器 API
- 子进程
- 集群
- 命令行选项
- 控制台
- 加密
- 调试器
- 已弃用的 API
- 诊断通道
- DNS
- 域
- 环境变量
- 错误
- 事件
- 文件系统
- 全局对象
- HTTP
- HTTP/2
- HTTPS
- 检查器
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:moduleAPI - 模块:包
- 模块:TypeScript
- 网络
- 可迭代流 API
- 操作系统
- 路径
- 性能钩子
- 权限
- 进程
- Punycode
- 查询字符串
- 逐行读取
- REPL
- 报告
- 单一可执行文件应用
- SQLite
- 流
- 字符串解码器
- 测试运行器
- 定时器
- TLS/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用工具
- V8
- 虚拟机
- WASI
- Web Crypto API
- Web Streams API
- 工作线程
- Zlib
- Zlib 可迭代压缩
- 其他版本
- 选项
可迭代流#
稳定性:1 - 实验性
node:stream/iter 模块提供了一种基于可迭代对象的流式 API,而不是基于事件驱动的 Readable/Writable/Transform 类层次结构,或者是 Web Streams 的 ReadableStream/WritableStream/TransformStream 接口。
此模块仅在启用 --experimental-stream-iter CLI 标志时可用。
流表示为 AsyncIterable<Uint8Array[]>(异步)或 Iterable<Uint8Array[]>(同步)。没有可扩展的基类——任何实现了可迭代协议的对象都可以参与。转换可以是纯函数,也可以是具有 transform 方法的对象。
数据以批次(每次迭代为 Uint8Array[])的形式流动,以摊销异步操作的成本。
import { from, pull, text } from 'node:stream/iter'; import { compressGzip, decompressGzip } from 'node:zlib/iter'; // Compress and decompress a string const compressed = pull(from('Hello, world!'), compressGzip()); const result = await text(pull(compressed, decompressGzip())); console.log(result); // 'Hello, world!'const { from, pull, text } = require('node:stream/iter'); const { compressGzip, decompressGzip } = require('node:zlib/iter'); async function run() { // Compress and decompress a string const compressed = pull(from('Hello, world!'), compressGzip()); const result = await text(pull(compressed, decompressGzip())); console.log(result); // 'Hello, world!' } run().catch(console.error);
import { open } from 'node:fs/promises'; import { text, pipeTo } from 'node:stream/iter'; import { compressGzip, decompressGzip } from 'node:zlib/iter'; // Read a file, compress, write to another file const src = await open('input.txt', 'r'); const dst = await open('output.gz', 'w'); await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true })); await src.close(); // Read it back const gz = await open('output.gz', 'r'); console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));const { open } = require('node:fs/promises'); const { text, pipeTo } = require('node:stream/iter'); const { compressGzip, decompressGzip } = require('node:zlib/iter'); async function run() { // Read a file, compress, write to another file const src = await open('input.txt', 'r'); const dst = await open('output.gz', 'w'); await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true })); await src.close(); // Read it back const gz = await open('output.gz', 'r'); console.log(await text(gz.pull(decompressGzip(), { autoClose: true }))); } run().catch(console.error);
概念#
字节流#
此 API 中的所有数据都表示为 Uint8Array 字节。当传递给 from()、push() 或 pipeTo() 时,字符串会自动进行 UTF-8 编码。这消除了关于编码的歧义,并实现了流与原生代码之间的零拷贝传输。
批处理#
每次迭代都会产生一个批次——即 Uint8Array 数据块的数组(Uint8Array[])。批处理通过将 await 和 Promise 创建的成本摊销到多个数据块上来提高效率。如果消费者一次只处理一个数据块,只需对内部数组进行迭代即可。
for await (const batch of source) { for (const chunk of batch) { handle(chunk); } }async function run() { for await (const batch of source) { for (const chunk of batch) { handle(chunk); } } }
转换#
转换有两种形式:
-
无状态 —— 一个函数
(chunks, options) => result,每个批次调用一次。接收Uint8Array[](或作为刷新信号的null)以及一个options对象。返回Uint8Array[]、null或数据块的可迭代对象。 -
有状态 —— 一个对象
{ transform(source, options) },其中transform是一个生成器(同步或异步),接收整个上游可迭代对象和一个options对象,并生成输出。此形式用于压缩、加密以及任何需要在批次间进行缓冲的转换。
这两种形式都会接收一个包含以下属性的 options 参数:
options.signal<AbortSignal>一个 AbortSignal,当管道被取消、遇到错误或消费者停止读取时触发。转换可以检查signal.aborted或监听'abort'事件来执行早期清理。
刷新信号(null)在源结束之后发送,让转换有机会发出尾部数据(例如压缩页脚)。
// Stateless: uppercase transform
const upper = (chunks) => {
if (chunks === null) return null; // flush
return chunks.map((c) => new TextEncoder().encode(
new TextDecoder().decode(c).toUpperCase(),
));
};
// Stateful: line splitter
const lines = {
transform: async function*(source) {
let partial = '';
for await (const chunks of source) {
if (chunks === null) {
if (partial) yield [new TextEncoder().encode(partial)];
continue;
}
for (const chunk of chunks) {
const str = partial + new TextDecoder().decode(chunk);
const parts = str.split('\n');
partial = parts.pop();
for (const line of parts) {
yield [new TextEncoder().encode(`${line}\n`)];
}
}
}
},
};
拉取与推送#
此 API 支持两种模型:
-
拉取 (Pull) —— 数据按需流动。
pull()和pullSync()创建惰性管道,仅在消费者进行迭代时才从源读取数据。 -
推送 (Push) —— 数据被显式写入。
push()创建了一个带有背压(backpressure)的写入者/可读对。写入者将数据推入;可读端作为异步可迭代对象被消费。
背压#
拉取流具有自然的背压 —— 消费者驱动节奏,因此源的读取速度绝不会超过消费者的处理速度。推送流需要显式背压,因为生产者和消费者是独立运行的。push()、broadcast() 和 share() 上的 highWaterMark 和 backpressure 选项控制此机制。
双缓冲区模型#
推送流使用两部分缓冲系统。可以将其想象成通过软管(挂起的写入)填充桶(槽位),并带有一个在桶满时关闭的浮动阀门。
highWaterMark (e.g., 3)
|
Producer v
| +---------+
v | |
[ write() ] ----+ +--->| slots |---> Consumer pulls
[ write() ] | | | (bucket)| for await (...)
[ write() ] v | +---------+
+--------+ ^
| pending| |
| writes | float valve
| (hose) | (backpressure)
+--------+
^
|
'strict' mode limits this too!
-
槽位(桶) —— 准备好供消费者使用的数据,上限为
highWaterMark。当消费者拉取时,它会将所有槽位一次性排空到一个批次中。 -
挂起的写入(软管) —— 等待槽位空间的写入。消费者排空后,挂起的写入被提升到空出来的槽位中,它们的 Promise 得到解决。
各策略如何使用这些缓冲区:
| 策略 | 槽位限制 | 挂起写入限制 |
|---|---|---|
'strict' |
highWaterMark |
highWaterMark |
'block' |
highWaterMark |
无限制 |
'drop-oldest' |
highWaterMark |
N/A(从不等待) |
'drop-newest' |
highWaterMark |
N/A(从不等待) |
严格模式(默认)#
严格模式捕获了“即发即忘”模式,即生产者调用 write() 而不等待,这会导致内存无限制增长。它将槽位缓冲区和挂起写入队列都限制为 highWaterMark。
如果您适当地 await 每次写入,则在任何时候只能有一个挂起的写入(您自己的),因此永远不会达到挂起写入限制。未等待的写入会在挂起队列中累积,一旦溢出就会抛出错误。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 16 }); // Consumer must run concurrently -- without it, the first write // that fills the buffer blocks the producer forever. const consuming = text(readable); // GOOD: awaited writes. The producer waits for the consumer to // make room when the buffer is full. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming);const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 16 }); // Consumer must run concurrently -- without it, the first write // that fills the buffer blocks the producer forever. const consuming = text(readable); // GOOD: awaited writes. The producer waits for the consumer to // make room when the buffer is full. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming); } run().catch(console.error);
忘记 await 最终会导致抛出错误。
// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"
阻塞模式#
阻塞模式将槽位限制为 highWaterMark,但对挂起写入队列不设限制。已等待的写入会阻塞,直到消费者腾出空间,这与严格模式类似。区别在于未等待的写入会静默地无限排队,而不是抛出错误 —— 如果生产者忘记 await,这可能导致内存泄漏。
这是现有的 Node.js 经典流和 Web Streams 默认的模式。当您控制生产者并确保其正确 await 时,或者在从这些 API 迁移代码时使用它。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 16, backpressure: 'block', }); const consuming = text(readable); // Safe -- awaited writes block until the consumer reads. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming);const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 16, backpressure: 'block', }); const consuming = text(readable); // Safe -- awaited writes block until the consumer reads. for (const item of dataset) { await writer.write(item); } await writer.end(); console.log(await consuming); } run().catch(console.error);
丢弃最旧#
写入永不等待。当槽位缓冲区已满时,最旧的缓冲块会被逐出,以便为传入的写入腾出空间。消费者总是看到最新的数据。适用于实时馈送、遥测或任何旧数据价值低于当前数据的场景。
import { push } from 'node:stream/iter'; // Keep only the 5 most recent readings const { writer, readable } = push({ highWaterMark: 5, backpressure: 'drop-oldest', });const { push } = require('node:stream/iter'); // Keep only the 5 most recent readings const { writer, readable } = push({ highWaterMark: 5, backpressure: 'drop-oldest', });
丢弃最新#
写入永不等待。当槽位缓冲区已满时,传入的写入会被静默丢弃。消费者处理已经缓冲的内容,而不会被新数据压垮。适用于速率限制或在高压下卸载。
import { push } from 'node:stream/iter'; // Accept up to 10 buffered items; discard anything beyond that const { writer, readable } = push({ highWaterMark: 10, backpressure: 'drop-newest', });const { push } = require('node:stream/iter'); // Accept up to 10 buffered items; discard anything beyond that const { writer, readable } = push({ highWaterMark: 10, backpressure: 'drop-newest', });
写入者接口#
写入者是任何符合 Writer 接口的对象。只需 write() 方法;其他所有方法均为可选。
每个异步方法都有一个对应的同步 *Sync 方法,设计用于“尝试-回退”模式:首先尝试快速的同步路径,只有当同步调用表明无法完成时,才回退到异步版本。
if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err); // Always synchronous, no fallback needed
writer.desiredSize#
达到高水位线之前可用的缓冲区槽位数。如果写入者已关闭或消费者已断开连接,则返回 null。
该值总是非负的。
writer.end([options])#
options<Object>signal<AbortSignal>仅取消此操作。该信号仅取消挂起的end()调用;它不会使写入者本身失败。
- 返回:{Promise
} 总写入字节数。
发出不再有数据写入的信号。
writer.endSync()#
- 返回:
<number>总写入字节数,如果写入者未打开则为-1。
writer.end() 的同步变体。如果写入者已关闭或出错,则返回 -1。可用作“尝试-回退”模式。
const result = writer.endSync();
if (result < 0) {
writer.end();
}
writer.fail(reason)#
reason<any>
使写入者进入终结错误状态。如果写入者已关闭或出错,此操作无效。与 write() 和 end() 不同,fail() 是无条件同步的,因为使写入者失败是一个纯状态转换,没有异步工作需要执行。
writer.write(chunk[, options])#
chunk<Uint8Array>|<string>options<Object>signal<AbortSignal>仅取消此写入操作。该信号仅取消挂起的write()调用;它不会使写入者本身失败。
- 返回:{Promise
}
写入一个数据块。当缓冲区空间可用时,Promise 解析。
writer.writeSync(chunk)#
chunk<Uint8Array>|<string>- 返回:
<boolean>如果写入被接受则为true,如果缓冲区已满则为false。
同步写入。不阻塞;如果背压生效,返回 false。
writer.writev(chunks[, options])#
chunks<Uint8Array[]>|<string[]>options<Object>signal<AbortSignal>仅取消此写入操作。该信号仅取消挂起的writev()调用;它不会使写入者本身失败。
- 返回:{Promise
}
以单个批次写入多个数据块。
writer.writevSync(chunks)#
chunks<Uint8Array[]>|<string[]>- 返回:
<boolean>如果写入被接受则为true,如果缓冲区已满则为false。
同步批次写入。
stream/iter 模块#
所有函数既可以作为命名导出,也可以作为 Stream 命名空间对象的属性使用。
// Named exports import { from, pull, bytes, Stream } from 'node:stream/iter'; // Namespace access Stream.from('hello');// Named exports const { from, pull, bytes, Stream } = require('node:stream/iter'); // Namespace access Stream.from('hello');
在模块说明符上包含 node: 前缀是可选的。
数据源#
from(input)#
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<AsyncIterable>|<Object>不能为null或undefined。- 返回:{AsyncIterable<Uint8Array[]>}
从给定输入创建一个异步字节流。字符串以 UTF-8 编码。ArrayBuffer 和 ArrayBufferView 值被包装为 Uint8Array。数组和可迭代对象被递归展平并规范化。
实现 Symbol.for('Stream.toAsyncStreamable') 或 Symbol.for('Stream.toStreamable') 的对象将通过这些协议进行转换。toAsyncStreamable 协议优先于 toStreamable,后者优先于迭代协议(Symbol.asyncIterator, Symbol.iterator)。
import { Buffer } from 'node:buffer'; import { from, text } from 'node:stream/iter'; console.log(await text(from('hello'))); // 'hello' console.log(await text(from(Buffer.from('hello')))); // 'hello'const { Buffer } = require('node:buffer'); const { from, text } = require('node:stream/iter'); async function run() { console.log(await text(from('hello'))); // 'hello' console.log(await text(from(Buffer.from('hello')))); // 'hello' } run().catch(console.error);
fromSync(input)#
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<Object>不能为null或undefined。- 返回:{Iterable<Uint8Array[]>}
from() 的同步版本。返回一个同步可迭代对象。不能接受异步可迭代对象或 Promise。实现 Symbol.for('Stream.toStreamable') 的对象将通过该协议转换(优先于 Symbol.iterator)。toAsyncStreamable 协议被完全忽略。
import { fromSync, textSync } from 'node:stream/iter'; console.log(textSync(fromSync('hello'))); // 'hello'const { fromSync, textSync } = require('node:stream/iter'); console.log(textSync(fromSync('hello'))); // 'hello'
管道#
pipeTo(source[, ...transforms], writer[, options])#
source<AsyncIterable>|<Iterable>数据源。...transforms<Function>|<Object>零个或多个要应用的转换。writer<Object>具有write(chunk)方法的目标。options<Object>signal<AbortSignal>中止管道。preventClose<boolean>如果为true,则在源结束时不要调用writer.end()。默认值:false。preventFail<boolean>如果为true,则在错误时不要调用writer.fail()。默认值:false。
- 返回:{Promise
} 总写入字节数。
通过转换将源管入写入者。如果写入者具有 writev(chunks) 方法,则整个批次会在单个调用中传入(实现分散/收集 I/O)。
如果写入者实现了可选的 *Sync 方法(writeSync、writevSync、endSync),pipeTo() 将尝试首先使用同步方法作为快速路径,仅当同步方法表明它们无法完成时(例如背压或等待下一个滴答)才回退到异步版本。fail() 总是同步调用的。
import { from, pipeTo } from 'node:stream/iter'; import { compressGzip } from 'node:zlib/iter'; import { open } from 'node:fs/promises'; const fh = await open('output.gz', 'w'); const totalBytes = await pipeTo( from('Hello, world!'), compressGzip(), fh.writer({ autoClose: true }), );const { from, pipeTo } = require('node:stream/iter'); const { compressGzip } = require('node:zlib/iter'); const { open } = require('node:fs/promises'); async function run() { const fh = await open('output.gz', 'w'); const totalBytes = await pipeTo( from('Hello, world!'), compressGzip(), fh.writer({ autoClose: true }), ); } run().catch(console.error);
pipeToSync(source[, ...transforms], writer[, options])#
source<Iterable>同步数据源。...transforms<Function>|<Object>零个或多个同步转换。writer<Object>具有write(chunk)方法的目标。options<Object>- 返回:
<number>总写入字节数。
pipeTo() 的同步版本。source、所有转换和 writer 必须是同步的。不能接受异步可迭代对象或 Promise。
要实现此功能,writer 必须具有 *Sync 方法(writeSync、writevSync、endSync)和 fail()。
pull(source[, ...transforms][, options])#
source<AsyncIterable>|<Iterable>数据源。...transforms<Function>|<Object>零个或多个要应用的转换。options<Object>signal<AbortSignal>中止管道。
- 返回:{AsyncIterable<Uint8Array[]>}
创建一个惰性异步管道。除非返回的可迭代对象被消费,否则不会从 source 读取数据。转换按顺序应用。
import { from, pull, text } from 'node:stream/iter'; const asciiUpper = (chunks) => { if (chunks === null) return null; return chunks.map((c) => { for (let i = 0; i < c.length; i++) { c[i] -= (c[i] >= 97 && c[i] <= 122) * 32; } return c; }); }; const result = pull(from('hello'), asciiUpper); console.log(await text(result)); // 'HELLO'const { from, pull, text } = require('node:stream/iter'); const asciiUpper = (chunks) => { if (chunks === null) return null; return chunks.map((c) => { for (let i = 0; i < c.length; i++) { c[i] -= (c[i] >= 97 && c[i] <= 122) * 32; } return c; }); }; async function run() { const result = pull(from('hello'), asciiUpper); console.log(await text(result)); // 'HELLO' } run().catch(console.error);
使用 AbortSignal
import { pull } from 'node:stream/iter'; const ac = new AbortController(); const result = pull(source, transform, { signal: ac.signal }); ac.abort(); // Pipeline throws AbortError on next iterationconst { pull } = require('node:stream/iter'); const ac = new AbortController(); const result = pull(source, transform, { signal: ac.signal }); ac.abort(); // Pipeline throws AbortError on next iteration
pullSync(source[, ...transforms])#
source<Iterable>同步数据源。...transforms<Function>|<Object>零个或多个同步转换。- 返回:{Iterable<Uint8Array[]>}
pull() 的同步版本。所有转换必须是同步的。
推送流#
push([...transforms][, options])#
...transforms<Function>|<Object>应用于可读侧的可选转换。options<Object>highWaterMark<number>在应用背压之前缓冲槽位的最大数量。必须 >= 1;小于 1 的值被截断为 1。默认值:4。backpressure<string>背压策略:'strict'、'block'、'drop-oldest'或'drop-newest'。默认值:'strict'。signal<AbortSignal>中止流。
- 返回:
<Object>writer{PushWriter} 写入侧。readable{AsyncIterable<Uint8Array[]>} 可读侧。
创建带有背压的推送流。写入者推入数据;可读侧作为异步可迭代对象消费。
import { push, text } from 'node:stream/iter'; const { writer, readable } = push(); // Producer and consumer must run concurrently. With strict backpressure // (the default), awaited writes block until the consumer reads. const producing = (async () => { await writer.write('hello'); await writer.write(' world'); await writer.end(); })(); console.log(await text(readable)); // 'hello world' await producing;const { push, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push(); // Producer and consumer must run concurrently. With strict backpressure // (the default), awaited writes block until the consumer reads. const producing = (async () => { await writer.write('hello'); await writer.write(' world'); await writer.end(); })(); console.log(await text(readable)); // 'hello world' await producing; } run().catch(console.error);
由 push() 返回的写入者符合 [写入者接口][]。
双工通道#
duplex([options])#
创建一对连接的双工通道以进行双向通信,类似于 socketpair()。写入一个通道写入者的数据会出现在另一个通道的可读端。
每个通道具有:
writer— 用于向对等方发送数据的 [写入者接口][] 对象。readable— 用于从对等方读取数据的AsyncIterable<Uint8Array[]>。close()— 关闭通道的此端(幂等)。[Symbol.asyncDispose]()— 支持await using的异步处置。
import { duplex, text } from 'node:stream/iter'; const [client, server] = duplex(); // Server echoes back const serving = (async () => { for await (const chunks of server.readable) { await server.writer.writev(chunks); } })(); await client.writer.write('hello'); await client.writer.end(); console.log(await text(server.readable)); // handled by echo await serving;const { duplex, text } = require('node:stream/iter'); async function run() { const [client, server] = duplex(); // Server echoes back const serving = (async () => { for await (const chunks of server.readable) { await server.writer.writev(chunks); } })(); await client.writer.write('hello'); await client.writer.end(); console.log(await text(server.readable)); // handled by echo await serving; } run().catch(console.error);
消费者#
array(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:{Promise<Uint8Array[]>}
将所有数据块收集为 Uint8Array 值的数组(不进行连接)。
arrayBuffer(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:{Promise
}
将所有字节收集到 ArrayBuffer 中。
arrayBufferSync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:
<ArrayBuffer>
arrayBuffer() 的同步版本。
arraySync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:
<Uint8Array[]>
array() 的同步版本。
bytes(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>signal<AbortSignal>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:{Promise
}
将流中的所有字节收集到单个 Uint8Array 中。
import { from, bytes } from 'node:stream/iter'; const data = await bytes(from('hello')); console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]const { from, bytes } = require('node:stream/iter'); async function run() { const data = await bytes(from('hello')); console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ] } run().catch(console.error);
bytesSync(source[, options])#
source{Iterable<Uint8Array[]>}options<Object>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:
<Uint8Array>
bytes() 的同步版本。
text(source[, options])#
source{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}options<Object>encoding<string>文本编码。默认值:'utf-8'。signal<AbortSignal>limit<number>要消费的最大字节数。如果收集的总字节数超过限制,则抛出ERR_OUT_OF_RANGE错误。
- 返回:{Promise
}
收集所有字节并解码为文本。
import { from, text } from 'node:stream/iter'; console.log(await text(from('hello'))); // 'hello'const { from, text } = require('node:stream/iter'); async function run() { console.log(await text(from('hello'))); // 'hello' } run().catch(console.error);
textSync(source[, options])#
text() 的同步版本。
工具#
ondrain(drainable)#
drainable<Object>实现 drainable 协议的对象。- 返回:{Promise
|null}
等待 drainable 写入者的背压清除。返回一个 Promise,当写入者可以接受更多数据时解析为 true;如果对象没有实现 drainable 协议,则返回 null。
import { push, ondrain, text } from 'node:stream/iter'; const { writer, readable } = push({ highWaterMark: 2 }); writer.writeSync('a'); writer.writeSync('b'); // Start consuming so the buffer can actually drain const consuming = text(readable); // Buffer is full -- wait for drain const canWrite = await ondrain(writer); if (canWrite) { await writer.write('c'); } await writer.end(); await consuming;const { push, ondrain, text } = require('node:stream/iter'); async function run() { const { writer, readable } = push({ highWaterMark: 2 }); writer.writeSync('a'); writer.writeSync('b'); // Start consuming so the buffer can actually drain const consuming = text(readable); // Buffer is full -- wait for drain const canWrite = await ondrain(writer); if (canWrite) { await writer.write('c'); } await writer.end(); await consuming; } run().catch(console.error);
merge(...sources[, options])#
...sources{AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} 两个或更多可迭代对象。options<Object>signal<AbortSignal>
- 返回:{AsyncIterable<Uint8Array[]>}
通过按时间顺序(哪个源先产生数据就先产生哪个)产生批次来合并多个异步可迭代对象。所有源被并发消费。
import { from, merge, text } from 'node:stream/iter'; const merged = merge(from('hello '), from('world')); console.log(await text(merged)); // Order depends on timingconst { from, merge, text } = require('node:stream/iter'); async function run() { const merged = merge(from('hello '), from('world')); console.log(await text(merged)); // Order depends on timing } run().catch(console.error);
tap(callback)#
callback<Function>(chunks) => void对每个批次调用。- 返回:
<Function>无状态转换。
创建一个传递转换,在不修改批次的情况下观察它们。适用于日志记录、指标或调试。
import { from, pull, text, tap } from 'node:stream/iter'; const result = pull( from('hello'), tap((chunks) => console.log('Batch size:', chunks.length)), ); console.log(await text(result));const { from, pull, text, tap } = require('node:stream/iter'); async function run() { const result = pull( from('hello'), tap((chunks) => console.log('Batch size:', chunks.length)), ); console.log(await text(result)); } run().catch(console.error);
tap() 故意不阻止 tapping 回调对数据块进行原地修改;但返回值会被忽略。
tapSync(callback)#
callback<Function>- 返回:
<Function>
tap() 的同步版本。
多消费者#
broadcast([options])#
options<Object>highWaterMark<number>槽位中的缓冲区大小。必须 >= 1;小于 1 的值被截断为 1。默认值:16。backpressure<string>'strict'、'block'、'drop-oldest'或'drop-newest'。默认值:'strict'。signal<AbortSignal>
- 返回:
<Object>writer{BroadcastWriter}broadcast{Broadcast}
创建推送模型的多消费者广播通道。单个写入者向多个消费者推送数据。每个消费者都有一个进入共享缓冲区的独立游标。
import { broadcast, text } from 'node:stream/iter'; const { writer, broadcast: bc } = broadcast(); // Create consumers before writing const c1 = bc.push(); // Consumer 1 const c2 = bc.push(); // Consumer 2 // Producer and consumers must run concurrently. Awaited writes // block when the buffer fills until consumers read. const producing = (async () => { await writer.write('hello'); await writer.end(); })(); const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' await producing;const { broadcast, text } = require('node:stream/iter'); async function run() { const { writer, broadcast: bc } = broadcast(); // Create consumers before writing const c1 = bc.push(); // Consumer 1 const c2 = bc.push(); // Consumer 2 // Producer and consumers must run concurrently. Awaited writes // block when the buffer fills until consumers read. const producing = (async () => { await writer.write('hello'); await writer.end(); })(); const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' await producing; } run().catch(console.error);
broadcast.bufferSize#
当前缓冲的数据块数量。
broadcast.cancel([reason])#
reason<Error>
取消广播。所有消费者都会收到错误。
broadcast.consumerCount#
活跃消费者的数量。
broadcast.push([...transforms][, options])#
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- 返回:{AsyncIterable<Uint8Array[]>}
创建新消费者。每个消费者从订阅点开始接收广播中写入的所有数据。可选转换应用于此消费者的数据视图。
broadcast[Symbol.dispose]()#
broadcast.cancel() 的别名。
Broadcast.from(input[, options])#
input<AsyncIterable>|<Iterable>options<Object>与broadcast()相同。- 返回:
<Object>{ writer, broadcast }
从现有源创建 {Broadcast}。源被自动消费并推送到所有订阅者。
share(source[, options])#
source<AsyncIterable>要共享的源。options<Object>- 返回:{Share}
创建拉取模型的多消费者共享流。与 broadcast() 不同,源仅在消费者拉取时读取。多个消费者共享单个缓冲区。
import { from, share, text } from 'node:stream/iter'; const shared = share(from('hello')); const c1 = shared.pull(); const c2 = shared.pull(); // Consume concurrently to avoid deadlock with small buffers. const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello'const { from, share, text } = require('node:stream/iter'); async function run() { const shared = share(from('hello')); const c1 = shared.pull(); const c2 = shared.pull(); // Consume concurrently to avoid deadlock with small buffers. const [r1, r2] = await Promise.all([text(c1), text(c2)]); console.log(r1); // 'hello' console.log(r2); // 'hello' } run().catch(console.error);
share.bufferSize#
当前缓冲的数据块数量。
share.cancel([reason])#
reason<Error>
取消共享。所有消费者都会收到错误。
share.consumerCount#
活跃消费者的数量。
share.pull([...transforms][, options])#
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- 返回:{AsyncIterable<Uint8Array[]>}
创建共享源的新消费者。
share[Symbol.dispose]()#
share.cancel() 的别名。
Share.from(input[, options])#
input<AsyncIterable>options<Object>与share()相同。- 返回:{Share}
从现有源创建 {Share}。
shareSync(source[, options])#
source<Iterable>要共享的同步源。options<Object>- 返回:{SyncShare}
share() 的同步版本。
SyncShare.fromSync(input[, options])#
input<Iterable>options<Object>- 返回:{SyncShare}
压缩与解压缩转换#
用于 pull()、pullSync()、pipeTo() 和 pipeToSync() 的压缩和解压缩转换可通过 node:zlib/iter 模块获得。详情请参阅 node:zlib/iter 文档。
协议符号#
这些众所周知的符号允许第三方对象参与流式传输协议,而无需直接从 node:stream/iter 导入。
Stream.broadcastProtocol#
- 值:
Symbol.for('Stream.broadcastProtocol')
该值必须是一个函数。当被 Broadcast.from() 调用时,它接收传递给 Broadcast.from() 的选项,并必须返回符合 {Broadcast} 接口的对象。实现完全自定义 —— 它可以以任何方式管理消费者、缓冲和背压。
import { Broadcast, text } from 'node:stream/iter'; // This example defers to the built-in Broadcast, but a custom // implementation could use any mechanism. class MessageBus { #broadcast; #writer; constructor() { const { writer, broadcast } = Broadcast(); this.#writer = writer; this.#broadcast = broadcast; } [Symbol.for('Stream.broadcastProtocol')](options) { return this.#broadcast; } send(data) { this.#writer.write(new TextEncoder().encode(data)); } close() { this.#writer.end(); } } const bus = new MessageBus(); const { broadcast } = Broadcast.from(bus); const consumer = broadcast.push(); bus.send('hello'); bus.close(); console.log(await text(consumer)); // 'hello'const { Broadcast, text } = require('node:stream/iter'); // This example defers to the built-in Broadcast, but a custom // implementation could use any mechanism. class MessageBus { #broadcast; #writer; constructor() { const { writer, broadcast } = Broadcast(); this.#writer = writer; this.#broadcast = broadcast; } [Symbol.for('Stream.broadcastProtocol')](options) { return this.#broadcast; } send(data) { this.#writer.write(new TextEncoder().encode(data)); } close() { this.#writer.end(); } } const bus = new MessageBus(); const { broadcast } = Broadcast.from(bus); const consumer = broadcast.push(); bus.send('hello'); bus.close(); text(consumer).then(console.log); // 'hello'
Stream.drainableProtocol#
- 值:
Symbol.for('Stream.drainableProtocol')
实现此以使写入者与 ondrain() 兼容。该方法应返回一个在背压清除时解析的 Promise,如果无背压则返回 null。
import { ondrain } from 'node:stream/iter'; class CustomWriter { #queue = []; #drain = null; #closed = false; [Symbol.for('Stream.drainableProtocol')]() { if (this.#closed) return null; if (this.#queue.length < 3) return Promise.resolve(true); this.#drain ??= Promise.withResolvers(); return this.#drain.promise; } write(chunk) { this.#queue.push(chunk); } flush() { this.#queue.length = 0; this.#drain?.resolve(true); this.#drain = null; } close() { this.#closed = true; } } const writer = new CustomWriter(); const ready = ondrain(writer); console.log(ready); // Promise { true } -- no backpressureconst { ondrain } = require('node:stream/iter'); class CustomWriter { #queue = []; #drain = null; #closed = false; [Symbol.for('Stream.drainableProtocol')]() { if (this.#closed) return null; if (this.#queue.length < 3) return Promise.resolve(true); this.#drain ??= Promise.withResolvers(); return this.#drain.promise; } write(chunk) { this.#queue.push(chunk); } flush() { this.#queue.length = 0; this.#drain?.resolve(true); this.#drain = null; } close() { this.#closed = true; } } const writer = new CustomWriter(); const ready = ondrain(writer); console.log(ready); // Promise { true } -- no backpressure
Stream.shareProtocol#
- 值:
Symbol.for('Stream.shareProtocol')
该值必须是一个函数。当被 Share.from() 调用时,它接收传递给 Share.from() 的选项,并必须返回符合 {Share} 接口的对象。实现完全自定义 —— 它可以以任何方式管理共享源、消费者、缓冲和背压。
import { share, Share, text } from 'node:stream/iter'; // This example defers to the built-in share(), but a custom // implementation could use any mechanism. class DataPool { #share; constructor(source) { this.#share = share(source); } [Symbol.for('Stream.shareProtocol')](options) { return this.#share; } } const pool = new DataPool( (async function* () { yield 'hello'; })(), ); const shared = Share.from(pool); const consumer = shared.pull(); console.log(await text(consumer)); // 'hello'const { share, Share, text } = require('node:stream/iter'); // This example defers to the built-in share(), but a custom // implementation could use any mechanism. class DataPool { #share; constructor(source) { this.#share = share(source); } [Symbol.for('Stream.shareProtocol')](options) { return this.#share; } } const pool = new DataPool( (async function* () { yield 'hello'; })(), ); const shared = Share.from(pool); const consumer = shared.pull(); text(consumer).then(console.log); // 'hello'
Stream.shareSyncProtocol#
- 值:
Symbol.for('Stream.shareSyncProtocol')
该值必须是一个函数。当被 SyncShare.fromSync() 调用时,它接收传递给 SyncShare.fromSync() 的选项,并必须返回符合 {SyncShare} 接口的对象。实现完全自定义 —— 它可以以任何方式管理共享源、消费者和缓冲。
import { shareSync, SyncShare, textSync } from 'node:stream/iter'; // This example defers to the built-in shareSync(), but a custom // implementation could use any mechanism. class SyncDataPool { #share; constructor(source) { this.#share = shareSync(source); } [Symbol.for('Stream.shareSyncProtocol')](options) { return this.#share; } } const encoder = new TextEncoder(); const pool = new SyncDataPool( function* () { yield [encoder.encode('hello')]; }(), ); const shared = SyncShare.fromSync(pool); const consumer = shared.pull(); console.log(textSync(consumer)); // 'hello'const { shareSync, SyncShare, textSync } = require('node:stream/iter'); // This example defers to the built-in shareSync(), but a custom // implementation could use any mechanism. class SyncDataPool { #share; constructor(source) { this.#share = shareSync(source); } [Symbol.for('Stream.shareSyncProtocol')](options) { return this.#share; } } const encoder = new TextEncoder(); const pool = new SyncDataPool( function* () { yield [encoder.encode('hello')]; }(), ); const shared = SyncShare.fromSync(pool); const consumer = shared.pull(); console.log(textSync(consumer)); // 'hello'
Stream.toAsyncStreamable#
- 值:
Symbol.for('Stream.toAsyncStreamable')
该值必须是一个将对象转换为可流式值的函数。当在流式管道中的任何位置(作为传递给 from() 的源,或作为转换返回的值)遇到对象时,会调用此方法来产生实际数据。它可以返回(或解析为)任何可流式值:字符串、Uint8Array、AsyncIterable、Iterable 或另一个可流式对象。
import { from, text } from 'node:stream/iter'; class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toAsyncStreamable')]() { return `hello ${this.#name}`; } } const stream = from(new Greeting('world')); console.log(await text(stream)); // 'hello world'const { from, text } = require('node:stream/iter'); class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toAsyncStreamable')]() { return `hello ${this.#name}`; } } const stream = from(new Greeting('world')); text(stream).then(console.log); // 'hello world'
Stream.toStreamable#
- 值:
Symbol.for('Stream.toStreamable')
该值必须是一个同步将对象转换为可流式值的函数。当在流式管道中的任何位置(作为传递给 fromSync() 的源,或作为同步转换返回的值)遇到对象时,会调用此方法来产生实际数据。它必须同步返回一个可流式值:字符串、Uint8Array 或 Iterable。
import { fromSync, textSync } from 'node:stream/iter'; class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toStreamable')]() { return `hello ${this.#name}`; } } const stream = fromSync(new Greeting('world')); console.log(textSync(stream)); // 'hello world'const { fromSync, textSync } = require('node:stream/iter'); class Greeting { #name; constructor(name) { this.#name = name; } [Symbol.for('Stream.toStreamable')]() { return `hello ${this.#name}`; } } const stream = fromSync(new Greeting('world')); console.log(textSync(stream)); // 'hello world'