可迭代流#

稳定性:1 - 实验性

node:stream/iter 模块提供了一种基于可迭代对象的流式 API,而不是基于事件驱动的 Readable/Writable/Transform 类层次结构,或者是 Web Streams 的 ReadableStream/WritableStream/TransformStream 接口。

此模块仅在启用 --experimental-stream-iter CLI 标志时可用。

流表示为 AsyncIterable<Uint8Array[]>(异步)或 Iterable<Uint8Array[]>(同步)。没有可扩展的基类——任何实现了可迭代协议的对象都可以参与。转换可以是纯函数,也可以是具有 transform 方法的对象。

数据以批次(每次迭代为 Uint8Array[])的形式流动,以摊销异步操作的成本。

import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Compress and decompress a string
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'
const { from, pull, text } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Compress and decompress a string
  const compressed = pull(from('Hello, world!'), compressGzip());
  const result = await text(pull(compressed, decompressGzip()));
  console.log(result); // 'Hello, world!'
}

run().catch(console.error);
import { open } from 'node:fs/promises';
import { text, pipeTo } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Read a file, compress, write to another file
const src = await open('input.txt', 'r');
const dst = await open('output.gz', 'w');
await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
await src.close();

// Read it back
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
const { open } = require('node:fs/promises');
const { text, pipeTo } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

async function run() {
  // Read a file, compress, write to another file
  const src = await open('input.txt', 'r');
  const dst = await open('output.gz', 'w');
  await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
  await src.close();

  // Read it back
  const gz = await open('output.gz', 'r');
  console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
}

run().catch(console.error);

概念#

字节流#

此 API 中的所有数据都表示为 Uint8Array 字节。当传递给 from()push()pipeTo() 时,字符串会自动进行 UTF-8 编码。这消除了关于编码的歧义,并实现了流与原生代码之间的零拷贝传输。

批处理#

每次迭代都会产生一个批次——即 Uint8Array 数据块的数组(Uint8Array[])。批处理通过将 await 和 Promise 创建的成本摊销到多个数据块上来提高效率。如果消费者一次只处理一个数据块,只需对内部数组进行迭代即可。

for await (const batch of source) {
  for (const chunk of batch) {
    handle(chunk);
  }
}
async function run() {
  for await (const batch of source) {
    for (const chunk of batch) {
      handle(chunk);
    }
  }
}

转换#

转换有两种形式:

  • 无状态 —— 一个函数 (chunks, options) => result,每个批次调用一次。接收 Uint8Array[](或作为刷新信号的 null)以及一个 options 对象。返回 Uint8Array[]null 或数据块的可迭代对象。

  • 有状态 —— 一个对象 { transform(source, options) },其中 transform 是一个生成器(同步或异步),接收整个上游可迭代对象和一个 options 对象,并生成输出。此形式用于压缩、加密以及任何需要在批次间进行缓冲的转换。

这两种形式都会接收一个包含以下属性的 options 参数:

  • options.signal <AbortSignal> 一个 AbortSignal,当管道被取消、遇到错误或消费者停止读取时触发。转换可以检查 signal.aborted 或监听 'abort' 事件来执行早期清理。

刷新信号(null)在源结束之后发送,让转换有机会发出尾部数据(例如压缩页脚)。

// Stateless: uppercase transform
const upper = (chunks) => {
  if (chunks === null) return null; // flush
  return chunks.map((c) => new TextEncoder().encode(
    new TextDecoder().decode(c).toUpperCase(),
  ));
};

// Stateful: line splitter
const lines = {
  transform: async function*(source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};

拉取与推送#

此 API 支持两种模型:

  • 拉取 (Pull) —— 数据按需流动。pull()pullSync() 创建惰性管道,仅在消费者进行迭代时才从源读取数据。

  • 推送 (Push) —— 数据被显式写入。push() 创建了一个带有背压(backpressure)的写入者/可读对。写入者将数据推入;可读端作为异步可迭代对象被消费。

背压#

拉取流具有自然的背压 —— 消费者驱动节奏,因此源的读取速度绝不会超过消费者的处理速度。推送流需要显式背压,因为生产者和消费者是独立运行的。push()broadcast()share() 上的 highWaterMarkbackpressure 选项控制此机制。

双缓冲区模型#

推送流使用两部分缓冲系统。可以将其想象成通过软管(挂起的写入)填充桶(槽位),并带有一个在桶满时关闭的浮动阀门。

                          highWaterMark (e.g., 3)
                                 |
    Producer                     v
       |                    +---------+
       v                    |         |
  [ write() ] ----+    +--->| slots   |---> Consumer pulls
  [ write() ]     |    |    | (bucket)|     for await (...)
  [ write() ]     v    |    +---------+
              +--------+         ^
              | pending|         |
              | writes |    float valve
              | (hose) |    (backpressure)
              +--------+
                   ^
                   |
          'strict' mode limits this too!
  • 槽位(桶) —— 准备好供消费者使用的数据,上限为 highWaterMark。当消费者拉取时,它会将所有槽位一次性排空到一个批次中。

  • 挂起的写入(软管) —— 等待槽位空间的写入。消费者排空后,挂起的写入被提升到空出来的槽位中,它们的 Promise 得到解决。

各策略如何使用这些缓冲区:

策略 槽位限制 挂起写入限制
'strict' highWaterMark highWaterMark
'block' highWaterMark 无限制
'drop-oldest' highWaterMark N/A(从不等待)
'drop-newest' highWaterMark N/A(从不等待)
严格模式(默认)#

严格模式捕获了“即发即忘”模式,即生产者调用 write() 而不等待,这会导致内存无限制增长。它将槽位缓冲区和挂起写入队列都限制为 highWaterMark

如果您适当地 await 每次写入,则在任何时候只能有一个挂起的写入(您自己的),因此永远不会达到挂起写入限制。未等待的写入会在挂起队列中累积,一旦溢出就会抛出错误。

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({ highWaterMark: 16 });

// Consumer must run concurrently -- without it, the first write
// that fills the buffer blocks the producer forever.
const consuming = text(readable);

// GOOD: awaited writes. The producer waits for the consumer to
// make room when the buffer is full.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({ highWaterMark: 16 });

  // Consumer must run concurrently -- without it, the first write
  // that fills the buffer blocks the producer forever.
  const consuming = text(readable);

  // GOOD: awaited writes. The producer waits for the consumer to
  // make room when the buffer is full.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);

忘记 await 最终会导致抛出错误。

// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
  writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"
阻塞模式#

阻塞模式将槽位限制为 highWaterMark,但对挂起写入队列不设限制。已等待的写入会阻塞,直到消费者腾出空间,这与严格模式类似。区别在于未等待的写入会静默地无限排队,而不是抛出错误 —— 如果生产者忘记 await,这可能导致内存泄漏。

这是现有的 Node.js 经典流和 Web Streams 默认的模式。当您控制生产者并确保其正确 await 时,或者在从这些 API 迁移代码时使用它。

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({
  highWaterMark: 16,
  backpressure: 'block',
});

const consuming = text(readable);

// Safe -- awaited writes block until the consumer reads.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');

async function run() {
  const { writer, readable } = push({
    highWaterMark: 16,
    backpressure: 'block',
  });

  const consuming = text(readable);

  // Safe -- awaited writes block until the consumer reads.
  for (const item of dataset) {
    await writer.write(item);
  }
  await writer.end();
  console.log(await consuming);
}

run().catch(console.error);
丢弃最旧#

写入永不等待。当槽位缓冲区已满时,最旧的缓冲块会被逐出,以便为传入的写入腾出空间。消费者总是看到最新的数据。适用于实时馈送、遥测或任何旧数据价值低于当前数据的场景。

import { push } from 'node:stream/iter';

// Keep only the 5 most recent readings
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});
const { push } = require('node:stream/iter');

// Keep only the 5 most recent readings
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});
丢弃最新#

写入永不等待。当槽位缓冲区已满时,传入的写入会被静默丢弃。消费者处理已经缓冲的内容,而不会被新数据压垮。适用于速率限制或在高压下卸载。

import { push } from 'node:stream/iter';

// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});
const { push } = require('node:stream/iter');

// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});

写入者接口#

写入者是任何符合 Writer 接口的对象。只需 write() 方法;其他所有方法均为可选。

每个异步方法都有一个对应的同步 *Sync 方法,设计用于“尝试-回退”模式:首先尝试快速的同步路径,只有当同步调用表明无法完成时,才回退到异步版本。

if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err);  // Always synchronous, no fallback needed

writer.desiredSize#

达到高水位线之前可用的缓冲区槽位数。如果写入者已关闭或消费者已断开连接,则返回 null

该值总是非负的。

writer.end([options])#

  • options <Object>
  • signal <AbortSignal> 仅取消此操作。该信号仅取消挂起的 end() 调用;它不会使写入者本身失败。
  • 返回:{Promise} 总写入字节数。
  • 发出不再有数据写入的信号。

    writer.endSync()#

    • 返回:<number> 总写入字节数,如果写入者未打开则为 -1

    writer.end() 的同步变体。如果写入者已关闭或出错,则返回 -1。可用作“尝试-回退”模式。

    const result = writer.endSync();
    if (result < 0) {
      writer.end();
    }
    

    writer.fail(reason)#

    使写入者进入终结错误状态。如果写入者已关闭或出错,此操作无效。与 write()end() 不同,fail() 是无条件同步的,因为使写入者失败是一个纯状态转换,没有异步工作需要执行。

    writer.write(chunk[, options])#

  • 返回:{Promise}
  • 写入一个数据块。当缓冲区空间可用时,Promise 解析。

    writer.writeSync(chunk)#

    同步写入。不阻塞;如果背压生效,返回 false

    writer.writev(chunks[, options])#

  • 返回:{Promise}
  • 以单个批次写入多个数据块。

    writer.writevSync(chunks)#

    同步批次写入。

    stream/iter 模块#

    所有函数既可以作为命名导出,也可以作为 Stream 命名空间对象的属性使用。

    // Named exports
    import { from, pull, bytes, Stream } from 'node:stream/iter';
    
    // Namespace access
    Stream.from('hello');
    // Named exports
    const { from, pull, bytes, Stream } = require('node:stream/iter');
    
    // Namespace access
    Stream.from('hello');
    

    在模块说明符上包含 node: 前缀是可选的。

    数据源#

    from(input)#

    从给定输入创建一个异步字节流。字符串以 UTF-8 编码。ArrayBufferArrayBufferView 值被包装为 Uint8Array。数组和可迭代对象被递归展平并规范化。

    实现 Symbol.for('Stream.toAsyncStreamable')Symbol.for('Stream.toStreamable') 的对象将通过这些协议进行转换。toAsyncStreamable 协议优先于 toStreamable,后者优先于迭代协议(Symbol.asyncIterator, Symbol.iterator)。

    import { Buffer } from 'node:buffer';
    import { from, text } from 'node:stream/iter';
    
    console.log(await text(from('hello')));       // 'hello'
    console.log(await text(from(Buffer.from('hello')))); // 'hello'
    const { Buffer } = require('node:buffer');
    const { from, text } = require('node:stream/iter');
    
    async function run() {
      console.log(await text(from('hello')));       // 'hello'
      console.log(await text(from(Buffer.from('hello')))); // 'hello'
    }
    
    run().catch(console.error);
    

    fromSync(input)#

    from() 的同步版本。返回一个同步可迭代对象。不能接受异步可迭代对象或 Promise。实现 Symbol.for('Stream.toStreamable') 的对象将通过该协议转换(优先于 Symbol.iterator)。toAsyncStreamable 协议被完全忽略。

    import { fromSync, textSync } from 'node:stream/iter';
    
    console.log(textSync(fromSync('hello'))); // 'hello'
    const { fromSync, textSync } = require('node:stream/iter');
    
    console.log(textSync(fromSync('hello'))); // 'hello'
    

    管道#

    pipeTo(source[, ...transforms], writer[, options])#

    • source <AsyncIterable> | <Iterable> 数据源。
    • ...transforms <Function> | <Object> 零个或多个要应用的转换。
    • writer <Object> 具有 write(chunk) 方法的目标。
    • options <Object>
    • signal <AbortSignal> 中止管道。
    • preventClose <boolean> 如果为 true,则在源结束时不要调用 writer.end()默认值: false
    • preventFail <boolean> 如果为 true,则在错误时不要调用 writer.fail()默认值: false
  • 返回:{Promise} 总写入字节数。
  • 通过转换将源管入写入者。如果写入者具有 writev(chunks) 方法,则整个批次会在单个调用中传入(实现分散/收集 I/O)。

    如果写入者实现了可选的 *Sync 方法(writeSyncwritevSyncendSync),pipeTo() 将尝试首先使用同步方法作为快速路径,仅当同步方法表明它们无法完成时(例如背压或等待下一个滴答)才回退到异步版本。fail() 总是同步调用的。

    import { from, pipeTo } from 'node:stream/iter';
    import { compressGzip } from 'node:zlib/iter';
    import { open } from 'node:fs/promises';
    
    const fh = await open('output.gz', 'w');
    const totalBytes = await pipeTo(
      from('Hello, world!'),
      compressGzip(),
      fh.writer({ autoClose: true }),
    );
    const { from, pipeTo } = require('node:stream/iter');
    const { compressGzip } = require('node:zlib/iter');
    const { open } = require('node:fs/promises');
    
    async function run() {
      const fh = await open('output.gz', 'w');
      const totalBytes = await pipeTo(
        from('Hello, world!'),
        compressGzip(),
        fh.writer({ autoClose: true }),
      );
    }
    
    run().catch(console.error);
    

    pipeToSync(source[, ...transforms], writer[, options])#

  • 返回:<number> 总写入字节数。
  • pipeTo() 的同步版本。source、所有转换和 writer 必须是同步的。不能接受异步可迭代对象或 Promise。

    要实现此功能,writer 必须具有 *Sync 方法(writeSyncwritevSyncendSync)和 fail()

    pull(source[, ...transforms][, options])#

  • 返回:{AsyncIterable<Uint8Array[]>}
  • 创建一个惰性异步管道。除非返回的可迭代对象被消费,否则不会从 source 读取数据。转换按顺序应用。

    import { from, pull, text } from 'node:stream/iter';
    
    const asciiUpper = (chunks) => {
      if (chunks === null) return null;
      return chunks.map((c) => {
        for (let i = 0; i < c.length; i++) {
          c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
        }
        return c;
      });
    };
    
    const result = pull(from('hello'), asciiUpper);
    console.log(await text(result)); // 'HELLO'
    const { from, pull, text } = require('node:stream/iter');
    
    const asciiUpper = (chunks) => {
      if (chunks === null) return null;
      return chunks.map((c) => {
        for (let i = 0; i < c.length; i++) {
          c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
        }
        return c;
      });
    };
    
    async function run() {
      const result = pull(from('hello'), asciiUpper);
      console.log(await text(result)); // 'HELLO'
    }
    
    run().catch(console.error);
    

    使用 AbortSignal

    import { pull } from 'node:stream/iter';
    
    const ac = new AbortController();
    const result = pull(source, transform, { signal: ac.signal });
    ac.abort(); // Pipeline throws AbortError on next iteration
    const { pull } = require('node:stream/iter');
    
    const ac = new AbortController();
    const result = pull(source, transform, { signal: ac.signal });
    ac.abort(); // Pipeline throws AbortError on next iteration
    

    pullSync(source[, ...transforms])#

    • source <Iterable> 同步数据源。
    • ...transforms <Function> | <Object> 零个或多个同步转换。
    • 返回:{Iterable<Uint8Array[]>}

    pull() 的同步版本。所有转换必须是同步的。

    推送流#

    push([...transforms][, options])#

    • ...transforms <Function> | <Object> 应用于可读侧的可选转换。
    • options <Object>
    • highWaterMark <number> 在应用背压之前缓冲槽位的最大数量。必须 >= 1;小于 1 的值被截断为 1。默认值: 4
    • backpressure <string> 背压策略:'strict''block''drop-oldest''drop-newest'默认值: 'strict'
    • signal <AbortSignal> 中止流。
  • 返回:<Object>
  • writer {PushWriter} 写入侧。
  • readable {AsyncIterable<Uint8Array[]>} 可读侧。
  • 创建带有背压的推送流。写入者推入数据;可读侧作为异步可迭代对象消费。

    import { push, text } from 'node:stream/iter';
    
    const { writer, readable } = push();
    
    // Producer and consumer must run concurrently. With strict backpressure
    // (the default), awaited writes block until the consumer reads.
    const producing = (async () => {
      await writer.write('hello');
      await writer.write(' world');
      await writer.end();
    })();
    
    console.log(await text(readable)); // 'hello world'
    await producing;
    const { push, text } = require('node:stream/iter');
    
    async function run() {
      const { writer, readable } = push();
    
      // Producer and consumer must run concurrently. With strict backpressure
      // (the default), awaited writes block until the consumer reads.
      const producing = (async () => {
        await writer.write('hello');
        await writer.write(' world');
        await writer.end();
      })();
    
      console.log(await text(readable)); // 'hello world'
      await producing;
    }
    
    run().catch(console.error);
    

    push() 返回的写入者符合 [写入者接口][]。

    双工通道#

    duplex([options])#

    • options <Object>
    • highWaterMark <number> 两个方向的缓冲区大小。默认值: 4
    • backpressure <string> 两个方向的策略。默认值: 'strict'
    • signal <AbortSignal> 两个通道的取消信号。
    • a <Object> A 到 B 方向特定的选项。覆盖共享选项。
    • b <Object> B 到 A 方向特定的选项。覆盖共享选项。
  • 返回:<Array> 一对 [channelA, channelB] 双工通道。
  • 创建一对连接的双工通道以进行双向通信,类似于 socketpair()。写入一个通道写入者的数据会出现在另一个通道的可读端。

    每个通道具有:

    • writer — 用于向对等方发送数据的 [写入者接口][] 对象。
    • readable — 用于从对等方读取数据的 AsyncIterable<Uint8Array[]>
    • close() — 关闭通道的此端(幂等)。
    • [Symbol.asyncDispose]() — 支持 await using 的异步处置。
    import { duplex, text } from 'node:stream/iter';
    
    const [client, server] = duplex();
    
    // Server echoes back
    const serving = (async () => {
      for await (const chunks of server.readable) {
        await server.writer.writev(chunks);
      }
    })();
    
    await client.writer.write('hello');
    await client.writer.end();
    
    console.log(await text(server.readable)); // handled by echo
    await serving;
    const { duplex, text } = require('node:stream/iter');
    
    async function run() {
      const [client, server] = duplex();
    
      // Server echoes back
      const serving = (async () => {
        for await (const chunks of server.readable) {
          await server.writer.writev(chunks);
        }
      })();
    
      await client.writer.write('hello');
      await client.writer.end();
    
      console.log(await text(server.readable)); // handled by echo
      await serving;
    }
    
    run().catch(console.error);
    

    消费者#

    array(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
    • signal <AbortSignal>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:{Promise<Uint8Array[]>}
  • 将所有数据块收集为 Uint8Array 值的数组(不进行连接)。

    arrayBuffer(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
    • signal <AbortSignal>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:{Promise}
  • 将所有字节收集到 ArrayBuffer 中。

    arrayBufferSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回: <ArrayBuffer>
  • arrayBuffer() 的同步版本。

    arraySync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:<Uint8Array[]>
  • array() 的同步版本。

    bytes(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
    • signal <AbortSignal>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:{Promise}
  • 将流中的所有字节收集到单个 Uint8Array 中。

    import { from, bytes } from 'node:stream/iter';
    
    const data = await bytes(from('hello'));
    console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
    const { from, bytes } = require('node:stream/iter');
    
    async function run() {
      const data = await bytes(from('hello'));
      console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
    }
    
    run().catch(console.error);
    

    bytesSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:<Uint8Array>
  • bytes() 的同步版本。

    text(source[, options])#

    • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
    • options <Object>
    • encoding <string> 文本编码。默认值: 'utf-8'
    • signal <AbortSignal>
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回:{Promise}
  • 收集所有字节并解码为文本。

    import { from, text } from 'node:stream/iter';
    
    console.log(await text(from('hello'))); // 'hello'
    const { from, text } = require('node:stream/iter');
    
    async function run() {
      console.log(await text(from('hello'))); // 'hello'
    }
    
    run().catch(console.error);
    

    textSync(source[, options])#

    • source {Iterable<Uint8Array[]>}
    • options <Object>
    • encoding <string> 默认值: 'utf-8'
    • limit <number> 要消费的最大字节数。如果收集的总字节数超过限制,则抛出 ERR_OUT_OF_RANGE 错误。
  • 返回: <string>
  • text() 的同步版本。

    工具#

    ondrain(drainable)#

    • drainable <Object> 实现 drainable 协议的对象。
    • 返回:{Promise|null}

    等待 drainable 写入者的背压清除。返回一个 Promise,当写入者可以接受更多数据时解析为 true;如果对象没有实现 drainable 协议,则返回 null

    import { push, ondrain, text } from 'node:stream/iter';
    
    const { writer, readable } = push({ highWaterMark: 2 });
    writer.writeSync('a');
    writer.writeSync('b');
    
    // Start consuming so the buffer can actually drain
    const consuming = text(readable);
    
    // Buffer is full -- wait for drain
    const canWrite = await ondrain(writer);
    if (canWrite) {
      await writer.write('c');
    }
    await writer.end();
    await consuming;
    const { push, ondrain, text } = require('node:stream/iter');
    
    async function run() {
      const { writer, readable } = push({ highWaterMark: 2 });
      writer.writeSync('a');
      writer.writeSync('b');
    
      // Start consuming so the buffer can actually drain
      const consuming = text(readable);
    
      // Buffer is full -- wait for drain
      const canWrite = await ondrain(writer);
      if (canWrite) {
        await writer.write('c');
      }
      await writer.end();
      await consuming;
    }
    
    run().catch(console.error);
    

    merge(...sources[, options])#

    • ...sources {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} 两个或更多可迭代对象。
    • options <Object>
    • signal <AbortSignal>
  • 返回:{AsyncIterable<Uint8Array[]>}
  • 通过按时间顺序(哪个源先产生数据就先产生哪个)产生批次来合并多个异步可迭代对象。所有源被并发消费。

    import { from, merge, text } from 'node:stream/iter';
    
    const merged = merge(from('hello '), from('world'));
    console.log(await text(merged)); // Order depends on timing
    const { from, merge, text } = require('node:stream/iter');
    
    async function run() {
      const merged = merge(from('hello '), from('world'));
      console.log(await text(merged)); // Order depends on timing
    }
    
    run().catch(console.error);
    

    tap(callback)#

    • callback <Function> (chunks) => void 对每个批次调用。
    • 返回:<Function> 无状态转换。

    创建一个传递转换,在不修改批次的情况下观察它们。适用于日志记录、指标或调试。

    import { from, pull, text, tap } from 'node:stream/iter';
    
    const result = pull(
      from('hello'),
      tap((chunks) => console.log('Batch size:', chunks.length)),
    );
    console.log(await text(result));
    const { from, pull, text, tap } = require('node:stream/iter');
    
    async function run() {
      const result = pull(
        from('hello'),
        tap((chunks) => console.log('Batch size:', chunks.length)),
      );
      console.log(await text(result));
    }
    
    run().catch(console.error);
    

    tap() 故意不阻止 tapping 回调对数据块进行原地修改;但返回值会被忽略。

    tapSync(callback)#

    tap() 的同步版本。

    多消费者#

    broadcast([options])#

    • options <Object>
    • highWaterMark <number> 槽位中的缓冲区大小。必须 >= 1;小于 1 的值被截断为 1。默认值: 16
    • backpressure <string> 'strict''block''drop-oldest''drop-newest'默认值: 'strict'
    • signal <AbortSignal>
  • 返回:<Object>
  • writer {BroadcastWriter}
  • broadcast {Broadcast}
  • 创建推送模型的多消费者广播通道。单个写入者向多个消费者推送数据。每个消费者都有一个进入共享缓冲区的独立游标。

    import { broadcast, text } from 'node:stream/iter';
    
    const { writer, broadcast: bc } = broadcast();
    
    // Create consumers before writing
    const c1 = bc.push();  // Consumer 1
    const c2 = bc.push();  // Consumer 2
    
    // Producer and consumers must run concurrently. Awaited writes
    // block when the buffer fills until consumers read.
    const producing = (async () => {
      await writer.write('hello');
      await writer.end();
    })();
    
    const [r1, r2] = await Promise.all([text(c1), text(c2)]);
    console.log(r1); // 'hello'
    console.log(r2); // 'hello'
    await producing;
    const { broadcast, text } = require('node:stream/iter');
    
    async function run() {
      const { writer, broadcast: bc } = broadcast();
    
      // Create consumers before writing
      const c1 = bc.push();  // Consumer 1
      const c2 = bc.push();  // Consumer 2
    
      // Producer and consumers must run concurrently. Awaited writes
      // block when the buffer fills until consumers read.
      const producing = (async () => {
        await writer.write('hello');
        await writer.end();
      })();
    
      const [r1, r2] = await Promise.all([text(c1), text(c2)]);
      console.log(r1); // 'hello'
      console.log(r2); // 'hello'
      await producing;
    }
    
    run().catch(console.error);
    
    broadcast.bufferSize#

    当前缓冲的数据块数量。

    broadcast.cancel([reason])#

    取消广播。所有消费者都会收到错误。

    broadcast.consumerCount#

    活跃消费者的数量。

    broadcast.push([...transforms][, options])#
  • 返回:{AsyncIterable<Uint8Array[]>}
  • 创建新消费者。每个消费者从订阅点开始接收广播中写入的所有数据。可选转换应用于此消费者的数据视图。

    broadcast[Symbol.dispose]()#

    broadcast.cancel() 的别名。

    Broadcast.from(input[, options])#

    从现有源创建 {Broadcast}。源被自动消费并推送到所有订阅者。

    share(source[, options])#

    • source <AsyncIterable> 要共享的源。
    • options <Object>
    • highWaterMark <number> 缓冲区大小。必须 >= 1;小于 1 的值被截断为 1。默认值: 16
    • backpressure <string> 'strict''block''drop-oldest''drop-newest'默认值: 'strict'
  • 返回:{Share}
  • 创建拉取模型的多消费者共享流。与 broadcast() 不同,源仅在消费者拉取时读取。多个消费者共享单个缓冲区。

    import { from, share, text } from 'node:stream/iter';
    
    const shared = share(from('hello'));
    
    const c1 = shared.pull();
    const c2 = shared.pull();
    
    // Consume concurrently to avoid deadlock with small buffers.
    const [r1, r2] = await Promise.all([text(c1), text(c2)]);
    console.log(r1); // 'hello'
    console.log(r2); // 'hello'
    const { from, share, text } = require('node:stream/iter');
    
    async function run() {
      const shared = share(from('hello'));
    
      const c1 = shared.pull();
      const c2 = shared.pull();
    
      // Consume concurrently to avoid deadlock with small buffers.
      const [r1, r2] = await Promise.all([text(c1), text(c2)]);
      console.log(r1); // 'hello'
      console.log(r2); // 'hello'
    }
    
    run().catch(console.error);
    
    share.bufferSize#

    当前缓冲的数据块数量。

    share.cancel([reason])#

    取消共享。所有消费者都会收到错误。

    share.consumerCount#

    活跃消费者的数量。

    share.pull([...transforms][, options])#
  • 返回:{AsyncIterable<Uint8Array[]>}
  • 创建共享源的新消费者。

    share[Symbol.dispose]()#

    share.cancel() 的别名。

    Share.from(input[, options])#

    从现有源创建 {Share}。

    shareSync(source[, options])#

    • source <Iterable> 要共享的同步源。
    • options <Object>
    • highWaterMark <number> 必须 >= 1;小于 1 的值被截断为 1。默认值: 16
    • backpressure <string> 默认值: 'strict'
  • 返回:{SyncShare}
  • share() 的同步版本。

    SyncShare.fromSync(input[, options])#

    压缩与解压缩转换#

    用于 pull()pullSync()pipeTo()pipeToSync() 的压缩和解压缩转换可通过 node:zlib/iter 模块获得。详情请参阅 node:zlib/iter 文档

    协议符号#

    这些众所周知的符号允许第三方对象参与流式传输协议,而无需直接从 node:stream/iter 导入。

    Stream.broadcastProtocol#

    • 值:Symbol.for('Stream.broadcastProtocol')

    该值必须是一个函数。当被 Broadcast.from() 调用时,它接收传递给 Broadcast.from() 的选项,并必须返回符合 {Broadcast} 接口的对象。实现完全自定义 —— 它可以以任何方式管理消费者、缓冲和背压。

    import { Broadcast, text } from 'node:stream/iter';
    
    // This example defers to the built-in Broadcast, but a custom
    // implementation could use any mechanism.
    class MessageBus {
      #broadcast;
      #writer;
    
      constructor() {
        const { writer, broadcast } = Broadcast();
        this.#writer = writer;
        this.#broadcast = broadcast;
      }
    
      [Symbol.for('Stream.broadcastProtocol')](options) {
        return this.#broadcast;
      }
    
      send(data) {
        this.#writer.write(new TextEncoder().encode(data));
      }
    
      close() {
        this.#writer.end();
      }
    }
    
    const bus = new MessageBus();
    const { broadcast } = Broadcast.from(bus);
    const consumer = broadcast.push();
    bus.send('hello');
    bus.close();
    console.log(await text(consumer)); // 'hello'
    const { Broadcast, text } = require('node:stream/iter');
    
    // This example defers to the built-in Broadcast, but a custom
    // implementation could use any mechanism.
    class MessageBus {
      #broadcast;
      #writer;
    
      constructor() {
        const { writer, broadcast } = Broadcast();
        this.#writer = writer;
        this.#broadcast = broadcast;
      }
    
      [Symbol.for('Stream.broadcastProtocol')](options) {
        return this.#broadcast;
      }
    
      send(data) {
        this.#writer.write(new TextEncoder().encode(data));
      }
    
      close() {
        this.#writer.end();
      }
    }
    
    const bus = new MessageBus();
    const { broadcast } = Broadcast.from(bus);
    const consumer = broadcast.push();
    bus.send('hello');
    bus.close();
    text(consumer).then(console.log); // 'hello'
    

    Stream.drainableProtocol#

    • 值:Symbol.for('Stream.drainableProtocol')

    实现此以使写入者与 ondrain() 兼容。该方法应返回一个在背压清除时解析的 Promise,如果无背压则返回 null

    import { ondrain } from 'node:stream/iter';
    
    class CustomWriter {
      #queue = [];
      #drain = null;
      #closed = false;
      [Symbol.for('Stream.drainableProtocol')]() {
        if (this.#closed) return null;
        if (this.#queue.length < 3) return Promise.resolve(true);
        this.#drain ??= Promise.withResolvers();
        return this.#drain.promise;
      }
      write(chunk) {
        this.#queue.push(chunk);
      }
      flush() {
        this.#queue.length = 0;
        this.#drain?.resolve(true);
        this.#drain = null;
      }
      close() {
        this.#closed = true;
      }
    }
    const writer = new CustomWriter();
    const ready = ondrain(writer);
    console.log(ready); // Promise { true } -- no backpressure
    const { ondrain } = require('node:stream/iter');
    
    class CustomWriter {
      #queue = [];
      #drain = null;
      #closed = false;
    
      [Symbol.for('Stream.drainableProtocol')]() {
        if (this.#closed) return null;
        if (this.#queue.length < 3) return Promise.resolve(true);
        this.#drain ??= Promise.withResolvers();
        return this.#drain.promise;
      }
    
      write(chunk) {
        this.#queue.push(chunk);
      }
    
      flush() {
        this.#queue.length = 0;
        this.#drain?.resolve(true);
        this.#drain = null;
      }
    
      close() {
        this.#closed = true;
      }
    }
    
    const writer = new CustomWriter();
    const ready = ondrain(writer);
    console.log(ready); // Promise { true } -- no backpressure
    

    Stream.shareProtocol#

    • 值:Symbol.for('Stream.shareProtocol')

    该值必须是一个函数。当被 Share.from() 调用时,它接收传递给 Share.from() 的选项,并必须返回符合 {Share} 接口的对象。实现完全自定义 —— 它可以以任何方式管理共享源、消费者、缓冲和背压。

    import { share, Share, text } from 'node:stream/iter';
    
    // This example defers to the built-in share(), but a custom
    // implementation could use any mechanism.
    class DataPool {
      #share;
    
      constructor(source) {
        this.#share = share(source);
      }
    
      [Symbol.for('Stream.shareProtocol')](options) {
        return this.#share;
      }
    }
    
    const pool = new DataPool(
      (async function* () {
        yield 'hello';
      })(),
    );
    
    const shared = Share.from(pool);
    const consumer = shared.pull();
    console.log(await text(consumer)); // 'hello'
    const { share, Share, text } = require('node:stream/iter');
    
    // This example defers to the built-in share(), but a custom
    // implementation could use any mechanism.
    class DataPool {
      #share;
    
      constructor(source) {
        this.#share = share(source);
      }
    
      [Symbol.for('Stream.shareProtocol')](options) {
        return this.#share;
      }
    }
    
    const pool = new DataPool(
      (async function* () {
        yield 'hello';
      })(),
    );
    
    const shared = Share.from(pool);
    const consumer = shared.pull();
    text(consumer).then(console.log); // 'hello'
    

    Stream.shareSyncProtocol#

    • 值:Symbol.for('Stream.shareSyncProtocol')

    该值必须是一个函数。当被 SyncShare.fromSync() 调用时,它接收传递给 SyncShare.fromSync() 的选项,并必须返回符合 {SyncShare} 接口的对象。实现完全自定义 —— 它可以以任何方式管理共享源、消费者和缓冲。

    import { shareSync, SyncShare, textSync } from 'node:stream/iter';
    
    // This example defers to the built-in shareSync(), but a custom
    // implementation could use any mechanism.
    class SyncDataPool {
      #share;
    
      constructor(source) {
        this.#share = shareSync(source);
      }
    
      [Symbol.for('Stream.shareSyncProtocol')](options) {
        return this.#share;
      }
    }
    
    const encoder = new TextEncoder();
    const pool = new SyncDataPool(
      function* () {
        yield [encoder.encode('hello')];
      }(),
    );
    
    const shared = SyncShare.fromSync(pool);
    const consumer = shared.pull();
    console.log(textSync(consumer)); // 'hello'
    const { shareSync, SyncShare, textSync } = require('node:stream/iter');
    
    // This example defers to the built-in shareSync(), but a custom
    // implementation could use any mechanism.
    class SyncDataPool {
      #share;
    
      constructor(source) {
        this.#share = shareSync(source);
      }
    
      [Symbol.for('Stream.shareSyncProtocol')](options) {
        return this.#share;
      }
    }
    
    const encoder = new TextEncoder();
    const pool = new SyncDataPool(
      function* () {
        yield [encoder.encode('hello')];
      }(),
    );
    
    const shared = SyncShare.fromSync(pool);
    const consumer = shared.pull();
    console.log(textSync(consumer)); // 'hello'
    

    Stream.toAsyncStreamable#

    • 值:Symbol.for('Stream.toAsyncStreamable')

    该值必须是一个将对象转换为可流式值的函数。当在流式管道中的任何位置(作为传递给 from() 的源,或作为转换返回的值)遇到对象时,会调用此方法来产生实际数据。它可以返回(或解析为)任何可流式值:字符串、Uint8ArrayAsyncIterableIterable 或另一个可流式对象。

    import { from, text } from 'node:stream/iter';
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toAsyncStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = from(new Greeting('world'));
    console.log(await text(stream)); // 'hello world'
    const { from, text } = require('node:stream/iter');
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toAsyncStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = from(new Greeting('world'));
    text(stream).then(console.log); // 'hello world'
    

    Stream.toStreamable#

    • 值:Symbol.for('Stream.toStreamable')

    该值必须是一个同步将对象转换为可流式值的函数。当在流式管道中的任何位置(作为传递给 fromSync() 的源,或作为同步转换返回的值)遇到对象时,会调用此方法来产生实际数据。它必须同步返回一个可流式值:字符串、Uint8ArrayIterable

    import { fromSync, textSync } from 'node:stream/iter';
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = fromSync(new Greeting('world'));
    console.log(textSync(stream)); // 'hello world'
    const { fromSync, textSync } = require('node:stream/iter');
    
    class Greeting {
      #name;
    
      constructor(name) {
        this.#name = name;
      }
    
      [Symbol.for('Stream.toStreamable')]() {
        return `hello ${this.#name}`;
      }
    }
    
    const stream = fromSync(new Greeting('world'));
    console.log(textSync(stream)); // 'hello world'