Node.js v24.0.0 文档
- Node.js v24.0.0
-
目录
- Stream
- 本文档的组织
- 流的类型
- 流消费者的 API
- 可写流
- 类:
stream.Writable
- 事件:
'close'
- 事件:
'drain'
- 事件:
'error'
- 事件:
'finish'
- 事件:
'pipe'
- 事件:
'unpipe'
writable.cork()
writable.destroy([error])
writable.closed
writable.destroyed
writable.end([chunk[, encoding]][, callback])
writable.setDefaultEncoding(encoding)
writable.uncork()
writable.writable
writable.writableAborted
writable.writableEnded
writable.writableCorked
writable.errored
writable.writableFinished
writable.writableHighWaterMark
writable.writableLength
writable.writableNeedDrain
writable.writableObjectMode
writable[Symbol.asyncDispose]()
writable.write(chunk[, encoding][, callback])
- 事件:
- 类:
- 可读流
- 两种读取模式
- 三种状态
- 选择一种 API 风格
- 类:
stream.Readable
- 事件:
'close'
- 事件:
'data'
- 事件:
'end'
- 事件:
'error'
- 事件:
'pause'
- 事件:
'readable'
- 事件:
'resume'
readable.destroy([error])
readable.closed
readable.destroyed
readable.isPaused()
readable.pause()
readable.pipe(destination[, options])
readable.read([size])
readable.readable
readable.readableAborted
readable.readableDidRead
readable.readableEncoding
readable.readableEnded
readable.errored
readable.readableFlowing
readable.readableHighWaterMark
readable.readableLength
readable.readableObjectMode
readable.resume()
readable.setEncoding(encoding)
readable.unpipe([destination])
readable.unshift(chunk[, encoding])
readable.wrap(stream)
readable[Symbol.asyncIterator]()
readable[Symbol.asyncDispose]()
readable.compose(stream[, options])
readable.iterator([options])
readable.map(fn[, options])
readable.filter(fn[, options])
readable.forEach(fn[, options])
readable.toArray([options])
readable.some(fn[, options])
readable.find(fn[, options])
readable.every(fn[, options])
readable.flatMap(fn[, options])
readable.drop(limit[, options])
readable.take(limit[, options])
readable.reduce(fn[, initial[, options]])
- 事件:
- Duplex 和 transform 流
stream.finished(stream[, options], callback)
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
stream.compose(...streams)
stream.Readable.from(iterable[, options])
stream.Readable.fromWeb(readableStream[, options])
stream.Readable.isDisturbed(stream)
stream.isErrored(stream)
stream.isReadable(stream)
stream.Readable.toWeb(streamReadable[, options])
stream.Writable.fromWeb(writableStream[, options])
stream.Writable.toWeb(streamWritable)
stream.Duplex.from(src)
stream.Duplex.fromWeb(pair[, options])
stream.Duplex.toWeb(streamDuplex)
stream.addAbortSignal(signal, stream)
stream.getDefaultHighWaterMark(objectMode)
stream.setDefaultHighWaterMark(objectMode, value)
- 可写流
- 流实现者的 API
- 补充说明
- Stream
-
索引
- 断言测试
- 异步上下文跟踪
- 异步钩子
- Buffer
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入器 API
- 子进程
- 集群
- 命令行选项
- Console
- Crypto
- 调试器
- 已弃用的 API
- 诊断通道
- DNS
- Domain
- 错误
- Events
- 文件系统
- 全局对象
- HTTP
- HTTP/2
- HTTPS
- Inspector
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:module
API - 模块:包
- 模块:TypeScript
- Net
- OS
- Path
- 性能钩子
- 权限
- Process
- Punycode
- 查询字符串
- Readline
- REPL
- Report
- 单可执行应用程序
- SQLite
- Stream
- 字符串解码器
- 测试运行器
- 定时器
- TLS/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用工具
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- Worker 线程
- Zlib
- 其他版本
- 选项
Stream[src]#
源码: lib/stream.js
流是 Node.js 中用于处理流式数据的抽象接口。node:stream
模块提供了一个 API 用于实现流接口。
Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求 和 process.stdout
都是流实例。
流可以是可读的、可写的或两者兼而有之。所有流都是 EventEmitter
的实例。
要访问 node:stream
模块
const stream = require('node:stream');
node:stream
模块可用于创建新型的流实例。通常不需要使用 node:stream
模块来消费流。
本文档的组织#
本文档包含两个主要部分和一个包含说明的第三部分。 第一部分介绍了如何在应用程序中使用现有流。 第二部分介绍了如何创建新型的流。
流的类型#
Node.js 中有四种基本流类型
Writable
:可以写入数据的流(例如,fs.createWriteStream()
)。Readable
:可以从中读取数据的流(例如,fs.createReadStream()
)。Duplex
:既是Readable
又是Writable
的流(例如,net.Socket
)。Transform
:Duplex
流,可以在写入和读取数据时修改或转换数据(例如,zlib.createDeflate()
)。
此外,此模块还包括实用工具函数 stream.duplexPair()
、stream.pipeline()
、stream.finished()
、 stream.Readable.from()
和 stream.addAbortSignal()
。
流 Promises API#
stream/promises
API 为流提供了一组替代的异步实用工具函数,这些函数返回 Promise
对象而不是使用回调。 该 API 可以通过 require('node:stream/promises')
或 require('node:stream').promises
访问。
stream.pipeline(source[, ...transforms], destination[, options])
#
stream.pipeline(streams[, options])
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 返回: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 返回: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 返回: <Promise> | <AsyncIterable>
options
<Object> Pipeline 选项signal
<AbortSignal>end
<boolean> 当源流结束时,结束目标流。即使此值为false
,转换流也始终会结束。默认值:true
。
- 返回值:<Promise> 当管道完成时 fulfilled。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
要使用 AbortSignal
,请将其作为最后一个参数传递到选项对象中。 当信号中止时,将使用 AbortError
在底层管道上调用 destroy
。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
pipeline
API 也支持异步生成器。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
请记住处理传递到异步生成器中的 signal
参数。 特别是当异步生成器是管道的源(即第一个参数)时,否则管道将永远不会完成。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
pipeline
API 提供了 callback 版本
stream.finished(stream[, options])
#
stream
<Stream> | <ReadableStream> | <WritableStream> 可读和/或可写流/webstream。options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> 如果为true
,则在 promise fulfilled 之前删除此函数注册的监听器。默认值:false
。
- 返回值:<Promise> 当流不再可读或可写时 fulfilled。
const { finished } = require('node:stream/promises');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
finished
API 也提供了一个 callback 版本。
在返回的 promise 被 resolved 或 rejected 之后,stream.finished()
会留下悬挂的事件监听器(特别是 'error'
、'end'
、'finish'
和 'close'
)。 这样做的原因是,为了防止意外的 'error'
事件(由于不正确的流实现)导致意外崩溃。 如果不需要此行为,则应将 options.cleanup
设置为 true
。
await finished(rs, { cleanup: true });
对象模式#
所有由 Node.js API 创建的流都专门处理字符串、<Buffer>、<TypedArray> 和 <DataView> 对象。
Strings
和Buffers
是流中最常用的类型。TypedArray
和DataView
允许您使用Int32Array
或Uint8Array
等类型来处理二进制数据。 当您将 TypedArray 或 DataView 写入流时,Node.js 会处理原始字节。
但是,流实现可以使用其他类型的 JavaScript 值(除了 null
,它在流中具有特殊用途)。 这样的流被认为以“对象模式”运行。
当创建流时,使用 objectMode
选项将流实例切换到对象模式。 尝试将现有流切换到对象模式是不安全的。
缓冲#
Writable
和 Readable
流都将数据存储在内部缓冲区中。
可能缓冲的数据量取决于传递给流构造函数的 highWaterMark
选项。 对于普通流,highWaterMark
选项指定一个总字节数。 对于以对象模式运行的流,highWaterMark
指定对象的总数。 对于在(但不解码)字符串上运行的流,highWaterMark
指定 UTF-16 代码单元的总数。
当实现调用 stream.push(chunk)
时,数据会缓存在 Readable
流中。 如果 Stream 的使用者不调用 stream.read()
,数据将位于内部队列中,直到被使用。
一旦内部读取缓冲区总大小达到 highWaterMark
指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据可以被使用(也就是说,流将停止调用内部 readable._read()
方法,该方法用于填充读取缓冲区)。
当重复调用 writable.write(chunk)
方法时,数据会缓存在 Writable
流中。 当内部写入缓冲区的总大小低于 highWaterMark
设置的阈值时,对 writable.write()
的调用将返回 true
。 一旦内部缓冲区的大小达到或超过 highWaterMark
,将返回 false
。
stream
API 的一个关键目标,特别是 stream.pipe()
方法,是将数据缓冲限制在可接受的水平,以便速度不同的源和目标不会压垮可用内存。
highWaterMark
选项是一个阈值,而不是一个限制:它决定了流在停止请求更多数据之前缓冲的数据量。 它通常不强制执行严格的内存限制。 特定的流实现可以选择强制执行更严格的限制,但这只是可选的。
因为 Duplex
和 Transform
流都是 Readable
和 Writable
,所以每个流都维护两个独立的内部缓冲区,用于读取和写入,允许每一侧独立于另一侧运行,同时保持适当和高效的数据流。 例如,net.Socket
实例是 Duplex
流,其 Readable
侧允许使用来自套接字的数据,其 Writable
侧允许将数据写入到套接字。 因为写入套接字的数据速度可能比接收数据的速度更快或更慢,所以每一侧都应该独立于另一侧运行(和缓冲)。
内部缓冲的机制是一个内部实现细节,并且可以随时更改。 但是,对于某些高级实现,可以使用 writable.writableBuffer
或 readable.readableBuffer
检索内部缓冲区。 不建议使用这些未文档化的属性。
流消费者的 API#
几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流。 以下是在 Node.js 应用程序中使用流的示例,该应用程序实现了 HTTP 服务器。
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable
流(例如示例中的 res
)公开了诸如 write()
和 end()
之类的方法,这些方法用于将数据写入流。
Readable
流使用 EventEmitter
API 在有数据可从流中读取时通知应用程序代码。 可以通过多种方式从流中读取可用数据。
Writable
和 Readable
流都以各种方式使用 EventEmitter
API 来传达流的当前状态。
Duplex
和 Transform
流既是 Writable
又是 Readable
。
无论是将数据写入流还是从流中使用数据,应用程序都不需要直接实现流接口,并且通常没有理由调用 require('node:stream')
。
希望实现新型流的开发人员应参阅流实现者的 API部分。
可写流#
可写流是写入数据的目标的抽象。
Writable
流的示例包括:
这些示例中的一些实际上是实现 Writable
接口的 Duplex
流。
所有 Writable
流都实现了由 stream.Writable
类定义的接口。
虽然 Writable
流的特定实例可能在各个方面有所不同,但所有 Writable
流都遵循相同的基本使用模式,如下例所示。
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
类:stream.Writable
#
事件:'close'
#
当流及其任何底层资源(例如文件描述符)已关闭时,会发出 'close'
事件。 该事件表示不再发出任何事件,并且不会发生进一步的计算。
如果使用 emitClose
选项创建 Writable
流,则它将始终发出 'close'
事件。
事件:'drain'
#
如果调用 stream.write(chunk)
返回 false
,则在适合恢复向流写入数据时,将发出 'drain'
事件。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
事件:'error'
#
如果在写入或管道传输数据时发生错误,则会发出 'error'
事件。 调用侦听器回调时,会传递一个 Error
参数。
当发出 'error'
事件时,除非在创建流时将 autoDestroy
选项设置为 false
,否则流将被关闭。
在 'error'
之后,应该不再发出其他事件(包括 'error'
事件)。
事件:'finish'
#
在调用 stream.end()
方法之后,并且所有数据都已刷新到底层系统之后,将发出 'finish'
事件。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
事件:'pipe'
#
src
<stream.Readable> 管道传输到此可写流的源流
当在可读流上调用 stream.pipe()
方法,并将此可写流添加到其目标集合时,会触发 'pipe'
事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
事件:'unpipe'
#
src
<stream.Readable> 取消管道连接到此可写流的源流。
当在 Readable
流上调用 stream.unpipe()
方法,并从此可读流的目标集合中删除此 Writable
时,会触发 'unpipe'
事件。
如果此 Writable
流在 Readable
流通过管道连接到它时发出错误,也会触发此事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()
#
writable.cork()
方法强制将所有写入的数据缓冲在内存中。当调用 stream.uncork()
或 stream.end()
方法时,将刷新缓冲的数据。
writable.cork()
的主要目的是为了适应在快速连续地将多个小块写入流的情况。writable.cork()
不会立即将它们转发到底层目标,而是缓冲所有块,直到调用 writable.uncork()
,这将把它们全部传递给 writable._writev()
(如果存在)。这可以防止队首阻塞的情况,即在等待处理第一个小块时对数据进行缓冲。但是,在不实现 writable._writev()
的情况下使用 writable.cork()
可能会对吞吐量产生不利影响。
writable.destroy([error])
#
销毁流。 可选择发出一个 'error'
事件,并发出一个 'close'
事件(除非将 emitClose
设置为 false
)。 在此调用之后,可写流已结束,随后对 write()
或 end()
的调用将导致 ERR_STREAM_DESTROYED
错误。 这是一种破坏性且直接销毁流的方法。 之前对 write()
的调用可能没有耗尽,并可能触发 ERR_STREAM_DESTROYED
错误。 如果在关闭之前应刷新数据,请使用 end()
而不是 destroy,或在销毁流之前等待 'drain'
事件。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
一旦调用了 destroy()
,任何进一步的调用都将是空操作,并且除了来自 _destroy()
的错误之外,不会再发出 'error'
。
实现者不应覆盖此方法,而应实现 writable._destroy()
。
writable.destroyed
#
在调用 writable.destroy()
之后为 true
。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])
#
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可选的要写入的数据。 对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。 对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> 如果chunk
是字符串,则为编码callback
<Function> 流完成时的回调。- 返回:<this>
调用 writable.end()
方法表示不再有数据写入 Writable
。 可选的 chunk
和 encoding
参数允许在关闭流之前立即写入最后一个额外的数据块。
在调用 stream.end()
之后调用 stream.write()
方法会引发错误。
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
writable.uncork()
#
writable.uncork()
方法刷新自调用 stream.cork()
以来缓冲的所有数据。
当使用 writable.cork()
和 writable.uncork()
来管理写入流的缓冲时,请使用 process.nextTick()
来推迟对 writable.uncork()
的调用。 这样做允许对给定 Node.js 事件循环阶段中发生的所有 writable.write()
调用进行批处理。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果在流上多次调用 writable.cork()
方法,则必须调用相同数量的 writable.uncork()
才能刷新缓冲的数据。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
另请参阅:writable.cork()
。
writable.writableEnded
#
在调用 writable.end()
之后为 true
。 此属性不指示数据是否已刷新,为此请使用 writable.writableFinished
。
writable[Symbol.asyncDispose]()
#
使用 AbortError
调用 writable.destroy()
并返回一个 promise,该 promise 在流完成时实现。
writable.write(chunk[, encoding][, callback])
#
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 可选的要写入的数据。 对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。 对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> | <null> 编码,如果chunk
是字符串。默认值:'utf8'
callback
<Function> 当此数据块刷新时调用的回调函数。- 返回值: <boolean> 如果流希望调用代码在继续写入其他数据之前等待
'drain'
事件被触发,则返回false
;否则返回true
。
writable.write()
方法将一些数据写入流,并在数据被完全处理后调用提供的 callback
。 如果发生错误,则将使用该错误作为其第一个参数来调用 callback
。 callback
是异步调用的,并且在 'error'
被触发之前调用。
如果内部缓冲区小于在创建流时配置的 highWaterMark
,并且允许写入 chunk
,则返回值为 true
。 如果返回 false
,则应停止进一步尝试将数据写入流,直到 'drain'
事件被触发。
当流未排空时,调用 write()
将缓冲 chunk
,并返回 false。 一旦所有当前缓冲的块都被排空(被操作系统接受进行传递),'drain'
事件将被触发。 一旦 write()
返回 false,在 'drain'
事件被触发之前,不要写入更多的块。 虽然允许在未排空的流上调用 write()
,但 Node.js 将缓冲所有写入的块,直到达到最大内存使用量,此时它将无条件中止。 即使在中止之前,高内存使用量也会导致糟糕的垃圾收集器性能和高 RSS(即使在不再需要内存之后,通常也不会将其释放回系统)。 由于如果远程对等方不读取数据,TCP 套接字可能永远不会排空,因此写入未排空的套接字可能导致可远程利用的漏洞。
当流未排空时写入数据对于 Transform
来说尤其成问题,因为 Transform
流默认情况下处于暂停状态,直到它们被管道连接或添加了 'data'
或 'readable'
事件处理程序。
如果要写入的数据可以按需生成或获取,建议将逻辑封装到 Readable
中,并使用 stream.pipe()
。 但是,如果首选调用 write()
,则可以使用 'drain'
事件来尊重背压并避免内存问题
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
对象模式下的 Writable
流将始终忽略 encoding
参数。
可读流#
可读流是一个源的抽象,数据就是从这个源消耗的。
Readable
流的例子包括
所有 Readable
流都实现了由 stream.Readable
类定义的接口。
两种读取模式#
Readable
流实际上以两种模式之一运行:流动模式和暂停模式。 这些模式与 对象模式 是分开的。 Readable
流可以处于对象模式或非对象模式,无论它是处于流动模式还是暂停模式。
-
在流动模式下,数据从底层系统自动读取,并使用
EventEmitter
接口通过事件尽可能快地提供给应用程序。 -
在暂停模式下,必须显式调用
stream.read()
方法才能从流中读取数据块。
所有 Readable
流都以暂停模式开始,但可以通过以下方式之一切换到流动模式
- 添加
'data'
事件处理程序。 - 调用
stream.resume()
方法。 - 调用
stream.pipe()
方法将数据发送到Writable
。
Readable
可以使用以下方式之一切换回暂停模式
- 如果没有管道目标,则通过调用
stream.pause()
方法。 - 如果存在管道目标,则通过删除所有管道目标。 可以通过调用
stream.unpipe()
方法来删除多个管道目标。
要记住的重要概念是,Readable
在提供用于使用或忽略该数据的机制之前,不会生成数据。 如果禁用或取消了使用机制,则 Readable
将尝试停止生成数据。
出于向后兼容的原因,删除 'data'
事件处理程序不会自动暂停流。 此外,如果存在管道目标,则调用 stream.pause()
不能保证一旦这些目标排空并要求更多数据,该流将保持暂停状态。
如果 Readable
切换到流动模式,并且没有可用的消费者来处理数据,则该数据将丢失。 例如,当调用 readable.resume()
方法而没有将侦听器附加到 'data'
事件时,或者当从流中删除 'data'
事件处理程序时,可能会发生这种情况。
添加 'readable'
事件处理程序会自动使流停止流动,并且必须通过 readable.read()
来使用数据。 如果删除了 'readable'
事件处理程序,则如果存在 'data'
事件处理程序,则该流将再次开始流动。
三种状态#
Readable
流的“两种模式”操作是对 Readable
流实现中发生的更复杂的内部状态管理的一种简化抽象。
具体来说,在任何给定的时间点,每个 Readable
都处于三种可能的状态之一
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
当 readable.readableFlowing
为 null
时,未提供用于使用流数据的机制。 因此,流将不会生成数据。 在此状态下,附加 'data'
事件的侦听器、调用 readable.pipe()
方法或调用 readable.resume()
方法会将 readable.readableFlowing
切换为 true
,从而导致 Readable
在生成数据时开始主动触发事件。
调用 readable.pause()
、readable.unpipe()
或接收背压将导致 readable.readableFlowing
设置为 false
,从而暂时停止事件的流动,但不会停止数据的生成。 在此状态下,附加 'data'
事件的侦听器不会将 readable.readableFlowing
切换为 true
。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
当 readable.readableFlowing
为 false
时,数据可能会在流的内部缓冲区中累积。
选择一种 API 样式#
Readable
流 API 在多个 Node.js 版本中不断发展,并提供了多种使用流数据的方法。 通常,开发人员应选择一种使用数据的方法,并且绝不应使用多种方法来使用来自单个流的数据。 具体来说,使用 on('data')
、on('readable')
、pipe()
或异步迭代器的组合可能会导致难以理解的行为。
类:stream.Readable
#
事件:'close'
#
当流及其任何底层资源(例如文件描述符)已关闭时,会发出 'close'
事件。 该事件表示不再发出任何事件,并且不会发生进一步的计算。
如果使用 emitClose
选项创建 Readable
流,则它将始终触发 'close'
事件。
事件:'data'
#
chunk
<Buffer> | <string> | <any> 数据块。 对于不在对象模式下运行的流,该块将是字符串或Buffer
。 对于处于对象模式的流,该块可以是除null
之外的任何 JavaScript 值。
每当流将数据块的所有权放弃给消费者时,都会触发 'data'
事件。 通过调用 readable.pipe()
、readable.resume()
或通过将侦听器回调附加到 'data'
事件,将流切换到流动模式时,可能会发生这种情况。 每当调用 readable.read()
方法并且有数据块可供返回时,也会触发 'data'
事件。
将 'data'
事件侦听器附加到未显式暂停的流会将该流切换到流动模式。 然后将尽快传递数据。
如果使用 readable.setEncoding()
方法为流指定了默认编码,则侦听器回调会将数据块作为字符串传递;否则,数据将作为 Buffer
传递。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
事件:'end'
#
当没有更多数据可以从流中使用时,将触发 'end'
事件。
除非完全使用数据,否则不会触发 'end'
事件。 这可以通过将流切换到流动模式,或通过重复调用 stream.read()
直到所有数据都被使用来完成。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
事件:'error'
#
Readable
实现可能随时触发 'error'
事件。 通常,如果底层流由于底层内部故障而无法生成数据,或者流实现尝试推送无效的数据块,则可能会发生这种情况。
侦听器回调将传递一个 Error
对象。
事件:'pause'
#
当调用 stream.pause()
且 readableFlowing
不是 false
时,将触发 'pause'
事件。
事件:'readable'
#
当流中有数据可供读取时,就会触发 'readable'
事件,数据量最大可达配置的高水位线 (state.highWaterMark
)。 实际上,它表明流的缓冲区中有新信息。 如果此缓冲区中有数据,则可以调用 stream.read()
来检索该数据。 此外,当流已到达末尾时,也可能会触发 'readable'
事件。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
如果流已到达末尾,则调用 stream.read()
将返回 null
并触发 'end'
事件。 如果从未有任何数据可读取,情况也是如此。 例如,在以下示例中,foo.txt
是一个空文件。
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
运行此脚本的输出是
$ node test.js
readable: null
end
在某些情况下,附加 'readable'
事件的监听器会导致将一定量的数据读取到内部缓冲区中。
通常,readable.pipe()
和 'data'
事件机制比 'readable'
事件更容易理解。 但是,处理 'readable'
可能会提高吞吐量。
如果同时使用 'readable'
和 'data'
,则 'readable'
在控制流中具有优先权,即只有在调用 stream.read()
时才会触发 'data'
。 readableFlowing
属性将变为 false
。 如果在删除 'readable'
时存在 'data'
监听器,则流将开始流动,即无需调用 .resume()
即可触发 'data'
事件。
事件:'resume'
#
当调用 stream.resume()
且 readableFlowing
不为 true
时,会触发 'resume'
事件。
readable.destroy([error])
#
销毁流。 可选地触发一个 'error'
事件,并触发一个 'close'
事件(除非将 emitClose
设置为 false
)。 调用此方法后,可读流将释放任何内部资源,并且后续对 push()
的调用将被忽略。
一旦调用了 destroy()
,任何进一步的调用都将是空操作,并且除了来自 _destroy()
的错误之外,不会再发出 'error'
。
实现者不应覆盖此方法,而应实现 readable._destroy()
。
readable.isPaused()
#
- 返回:<boolean>
readable.isPaused()
方法返回 Readable
的当前操作状态。 这主要由 readable.pipe()
方法的基础机制使用。 在大多数典型情况下,没有理由直接使用此方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
#
- 返回:<this>
readable.pause()
方法将导致处于流动模式的流停止触发 'data'
事件,从而退出流动模式。 任何变得可用的数据将保留在内部缓冲区中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
如果存在 'readable'
事件监听器,则 readable.pause()
方法无效。
readable.pipe(destination[, options])
#
destination
<stream.Writable> 用于写入数据的目标options
<Object> 管道选项end
<boolean> 在读取器结束时结束写入器。 默认值:true
。
- 返回:<stream.Writable> *destination*,如果它是
Duplex
或Transform
流,则允许管道链
readable.pipe()
方法将 Writable
流附加到 readable
,使其自动切换到流动模式,并将所有数据推送到附加的 Writable
。 将自动管理数据流,以便目标 Writable
流不会被更快的 Readable
流淹没。
以下示例将所有数据从 readable
管道传输到名为 file.txt
的文件中。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
可以将多个 Writable
流附加到单个 Readable
流。
readable.pipe()
方法返回对 *destination* 流的引用,从而可以设置管道流的链。
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当源 Readable
流触发 'end'
时,会在目标 Writable
流上调用 stream.end()
,以便目标不再可写。 要禁用此默认行为,可以将 end
选项作为 false
传递,导致目标流保持打开状态。
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
一个重要的注意事项是,如果在处理过程中 Readable
流触发错误,则 Writable
目标不会自动关闭。 如果发生错误,则必须手动关闭每个流,以防止内存泄漏。
在 Node.js 进程退出之前,永远不会关闭 process.stderr
和 process.stdout
Writable
流,无论指定的选项如何。
readable.read([size])
#
readable.read()
方法从内部缓冲区读取数据并返回。 如果没有数据可读取,则返回 null
。 默认情况下,数据作为 Buffer
对象返回,除非已使用 readable.setEncoding()
方法指定了编码,或者流以对象模式运行。
可选的 size
参数指定要读取的特定字节数。 如果没有 size
个字节可读取,则将返回 null
,除非流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。
如果未指定 size
参数,则将返回内部缓冲区中包含的所有数据。
size
参数必须小于或等于 1 GiB。
readable.read()
方法应仅在以暂停模式运行的 Readable
流上调用。 在流动模式下,会自动调用 readable.read()
,直到内部缓冲区完全耗尽。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
每次调用 readable.read()
都会返回一个数据块或 null
,表示当前没有更多数据可读取。 这些块不会自动连接。 由于单个 read()
调用不会返回所有数据,因此可能需要使用 while 循环来持续读取块,直到检索到所有数据。 读取大文件时,.read()
可能会暂时返回 null
,表明它已消耗所有缓冲内容,但可能还有更多数据要缓冲。 在这种情况下,一旦缓冲区中有更多数据,就会触发新的 'readable'
事件,并且 'end'
事件表示数据传输的结束。
因此,要从 readable
读取文件的全部内容,需要跨多个 'readable'
事件收集块。
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
对象模式下的 Readable
流将始终从对 readable.read(size)
的调用返回单个项目,而不管 size
参数的值如何。
如果 readable.read()
方法返回一个数据块,也会触发一个 'data'
事件。
在触发 'end'
事件后调用 stream.read([size])
将返回 null
。 不会引发运行时错误。
readable.readableEncoding
#
获取给定 Readable
流的 encoding
属性。可以使用 readable.setEncoding()
方法设置 encoding
属性。
readable.resume()
#
- 返回:<this>
readable.resume()
方法使显式暂停的 Readable
流恢复发出 'data'
事件,从而将流切换到流动模式。
readable.resume()
方法可用于完全消耗流中的数据,而无需实际处理任何数据
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
如果存在 'readable'
事件监听器,则 readable.resume()
方法无效。
readable.setEncoding(encoding)
#
readable.setEncoding()
方法设置从 Readable
流读取的数据的字符编码。
默认情况下,不分配任何编码,并且流数据将作为 Buffer
对象返回。 设置编码会导致流数据作为指定编码的字符串返回,而不是作为 Buffer
对象返回。 例如,调用 readable.setEncoding('utf8')
将导致输出数据被解释为 UTF-8 数据,并作为字符串传递。 调用 readable.setEncoding('hex')
将导致数据以十六进制字符串格式编码。
Readable
流将正确处理通过流传递的多字节字符,否则如果只是从流中提取为 Buffer
对象,这些字符将无法正确解码。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])
#
destination
<stream.Writable> 可选的要取消管道的特定流- 返回:<this>
readable.unpipe()
方法分离先前使用 stream.pipe()
方法连接的 Writable
流。
如果未指定 destination
,则分离所有管道。
如果指定了 destination
,但未为其设置任何管道,则该方法不执行任何操作。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])
#
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要取消移入读取队列的数据块。 对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray>、<DataView> 或null
。 对于对象模式流,chunk
可以是任何 JavaScript 值。encoding
<string> 字符串块的编码。 必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。
将 chunk
作为 null
传递表示流的结束 (EOF),其行为与 readable.push(null)
相同,之后无法写入更多数据。 EOF 信号放在缓冲区的末尾,任何缓冲的数据仍将被刷新。
readable.unshift()
方法将数据块推回内部缓冲区。 这在某些情况下很有用,在这些情况下,流正被需要“取消消耗”从源中乐观地提取的一些数据的代码使用,以便可以将数据传递给其他方。
在发出 'end'
事件后无法调用 stream.unshift(chunk)
方法,否则将抛出运行时错误。
使用 stream.unshift()
的开发人员通常应考虑切换到使用 Transform
流。 有关更多信息,请参见 流实现者的 API 部分。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
与 stream.push(chunk)
不同,stream.unshift(chunk)
不会通过重置流的内部读取状态来结束读取过程。 如果在读取期间(即从自定义流上的 stream._read()
实现中)调用 readable.unshift()
,这可能会导致意外的结果。 在调用 readable.unshift()
后立即调用 stream.push('')
将适当地重置读取状态,但最好只是避免在执行读取的过程中调用 readable.unshift()
。
readable.wrap(stream)
#
在 Node.js 0.10 之前,流没有实现当前定义的整个 node:stream
模块 API。(有关更多信息,请参见兼容性。)
当使用发出 'data'
事件并且具有 stream.pause()
方法(仅提供建议)的旧 Node.js 库时,可以使用 readable.wrap()
方法来创建 Readable
流,该流使用旧流作为其数据源。
很少需要使用 readable.wrap()
,但该方法已作为与旧 Node.js 应用程序和库交互的便利方法提供。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()
#
- 返回: <AsyncIterator> 以完全使用流。
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
如果循环以 break
、return
或 throw
终止,则该流将被销毁。 换句话说,迭代一个流将完全消耗该流。 流将以等于 highWaterMark
选项大小的块读取。 在上面的代码示例中,如果文件的数据少于 64 KiB,数据将位于单个块中,因为没有为 fs.createReadStream()
提供 highWaterMark
选项。
readable[Symbol.asyncDispose]()
#
使用 AbortError
调用 readable.destroy()
并返回一个 promise,该 promise 在流完成时完成。
readable.compose(stream[, options])
#
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Duplex> 一个与流
stream
组成的流。
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
有关更多信息,请参见 stream.compose
。
readable.iterator([options])
#
options
<Object>destroyOnReturn
<boolean> 设置为false
时,在异步迭代器上调用return
,或使用break
、return
或throw
退出for await...of
迭代将不会销毁流。 默认:true
。
- 返回: <AsyncIterator> 以使用流。
通过此方法创建的迭代器使用户可以选择取消流的销毁,如果 for await...of
循环通过 return
、break
或 throw
退出,或者如果流在迭代期间发出错误,则迭代器应销毁流。
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])
#
fn
<Function> | <AsyncFunction> 一个函数,用于映射流中的每个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。highWaterMark
<number> 等待用户使用映射项时要缓冲多少项。 默认值:concurrency * 2 - 1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Readable> 使用函数
fn
映射的流。
此方法允许映射流。 对于流中的每个块,都将调用 fn
函数。 如果 fn
函数返回一个 Promise - 该 Promise 将在传递给结果流之前被 await
。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
readable.filter(fn[, options])
#
fn
<Function> | <AsyncFunction> 用于从流中筛选块的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。highWaterMark
<number> 等待用户使用筛选后的项目时要缓冲多少项。 默认值:concurrency * 2 - 1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Readable> 使用谓词
fn
筛选的流。
此方法允许筛选流。 对于流中的每个块,都将调用 fn
函数,如果它返回真值,则该块将被传递到结果流。 如果 fn
函数返回一个 Promise - 该 Promise 将被 await
。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
readable.forEach(fn[, options])
#
fn
<Function> | <AsyncFunction> 要在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Promise> 流完成时的 Promise。
此方法允许迭代流。 对于流中的每个块,都将调用 fn
函数。 如果 fn
函数返回一个 Promise - 该 Promise 将被 await
。
此方法与 for await...of
循环不同,因为它可以选择性地并发处理块。 此外,只能通过传递 signal
选项并在中止相关的 AbortController
时停止 forEach
迭代,而 for await...of
可以使用 break
或 return
停止。 在任何一种情况下,流都将被销毁。
此方法与侦听 'data'
事件不同,因为它在底层机制中使用 readable
事件,并且可以限制并发 fn
调用的数量。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
readable.toArray([options])
#
options
<Object>signal
<AbortSignal> 如果信号被中止,则允许取消 toArray 操作。
- 返回: <Promise> 一个包含流内容的数组的 Promise。
此方法允许轻松获取流的内容。
由于此方法将整个流读入内存,因此它抵消了流的优势。 它旨在用于互操作性和便利性,而不是作为使用流的主要方式。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])
#
fn
<Function> | <AsyncFunction> 要在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Promise> 如果
fn
为至少一个块返回一个真值,则解析为true
的 Promise。
此方法类似于 Array.prototype.some
,并在流中的每个块上调用 fn
,直到等待的返回值是 true
(或任何真值)。 一旦对块的 fn
调用等待的返回值是真值,流将被销毁,并且 Promise 将被 true
履行。 如果对块的 fn
调用都没有返回真值,则 Promise 将被 false
履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.find(fn[, options])
#
fn
<Function> | <AsyncFunction> 要在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Promise> 一个 Promise,解析为
fn
计算结果为真值的第一个块,如果未找到元素,则解析为undefined
。
此方法类似于 Array.prototype.find
,并在流中的每个块上调用 fn
,以查找 fn
的真值的块。 一旦 fn
调用的等待返回值是真值,流将被销毁,并且 Promise 将被 fn
返回真值的 值履行。 如果所有对块的 fn
调用都返回一个假值,则 Promise 将被 undefined
履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.every(fn[, options])
#
fn
<Function> | <AsyncFunction> 要在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Promise> 如果
fn
为所有块返回一个真值,则解析为true
的 Promise。
此方法类似于 Array.prototype.every
,并在流中的每个块上调用 fn
,以检查所有等待的返回值是否是 fn
的真值。 一旦对块的 fn
调用等待的返回值是假值,流将被销毁,并且 Promise 将被 false
履行。 如果所有对块的 fn
调用都返回一个真值,则 Promise 将被 true
履行。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
readable.flatMap(fn[, options])
#
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> 一个函数,用于映射流中的每个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number>fn
在流上同时调用的最大并发数。 默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Readable> 使用函数
fn
平面映射的流。
此方法通过将给定的回调应用于流的每个块,然后展平结果来返回一个新的流。
可以从 fn
返回一个流或另一个可迭代对象或异步可迭代对象,并且结果流将被合并(展平)到返回的流中。
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
readable.drop(limit[, options])
#
limit
<number> 从可读对象中删除的块数。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Readable> 一个删除了
limit
个块的流。
此方法返回一个删除了前 limit
个块的新流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])
#
limit
<number> 从可读对象中获取的块数。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Readable> 一个获取了
limit
个块的流。
此方法返回一个包含前 limit
个块的新流。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])
#
fn
<Function> | <AsyncFunction> 一个 reducer 函数,用于在流中的每个块上调用。previous
<any> 从上次调用fn
获得的值,或者如果指定了initial
值,否则为流的第一个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
initial
<any> 在缩减中使用的初始值。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回: <Promise> 缩减的最终值的 Promise。
此方法按顺序在流的每个块上调用 fn
,并将其传递给先前元素计算的结果。 它返回缩减的最终值的 Promise。
如果没有提供 initial
值,则流的第一个块将用作初始值。 如果流为空,则 Promise 将被 TypeError
拒绝,并带有 ERR_INVALID_ARGS
代码属性。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
reducer 函数逐元素地迭代流,这意味着没有 concurrency
参数或并行性。 要并发执行 reduce
,您可以将异步函数提取到 readable.map
方法。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
双工流和转换流#
类: stream.Transform
#
转换流是 Duplex
流,其中输出在某种程度上与输入相关。 与所有 Duplex
流一样,Transform
流实现 Readable
和 Writable
接口。
Transform
流的例子包括
transform.destroy([error])
#
销毁流,并选择性地发出 'error'
事件。 在此调用之后,转换流将释放任何内部资源。 实现者不应覆盖此方法,而应实现 readable._destroy()
。 Transform
的 _destroy()
的默认实现还会发出 'close'
,除非在 false 中设置了 emitClose
。
一旦调用了 destroy()
,任何进一步的调用都将是空操作,并且不会发出任何进一步的错误,除非来自 _destroy()
可能作为 'error'
发出。
stream.duplexPair([options])
#
实用函数 duplexPair
返回一个包含两个项目的数组,每个项目都是一个连接到另一端的 Duplex
流。
const [ sideA, sideB ] = duplexPair();
写入一个流的任何内容都可以在另一个流上读取。它提供类似于网络连接的行为,客户端写入的数据可以被服务器读取,反之亦然。
Duplex 流是对称的;可以使用其中一个或另一个,行为没有任何差异。
stream.finished(stream[, options], callback)
#
stream
<Stream> | <ReadableStream> | <WritableStream> 可读和/或可写流/webstream。options
<Object>error
<boolean> 如果设置为false
,则对emit('error', err)
的调用不会被视为完成。 默认值:true
。readable
<boolean> 当设置为false
时,即使流可能仍然可读,也会在流结束时调用回调。 默认值:true
。writable
<boolean> 当设置为false
时,即使流可能仍然可写,也会在流结束时调用回调。 默认值:true
。signal
<AbortSignal> 允许中止等待流完成。 如果信号中止,则不会中止底层流。 将使用AbortError
调用回调。 此函数添加的所有已注册侦听器也将被删除。
callback
<Function> 一个回调函数,它接受一个可选的 error 参数。- 返回: <Function> 一个清理函数,用于删除所有已注册的侦听器。
一个函数,用于在流不再可读、可写或遇到错误或过早关闭事件时收到通知。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
在错误处理场景中尤其有用,在这些场景中,流过早地被销毁(例如中止的 HTTP 请求),并且不会发出 'end'
或 'finish'
。
finished
API 提供 promise 版本。
在 callback
被调用后,stream.finished()
会留下悬空的事件侦听器(特别是 'error'
、'end'
、'finish'
和 'close'
)。 这样做的原因是,意外的 'error'
事件(由于不正确的流实现)不会导致意外崩溃。 如果不需要此行为,则需要在回调中调用返回的清理函数。
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)
#
stream.pipeline(streams, callback)
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- 返回: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- 返回: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- 返回: <AsyncIterable> | <Promise>
callback
<Function> 当管道完全完成时调用。err
<Error>val
由destination
返回的Promise
的已解析值。
- 返回: <Stream>
一个模块方法,用于在流和生成器之间进行管道传输,转发错误并正确清理,并在管道完成时提供回调。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
pipeline
API 提供一个 promise 版本。
stream.pipeline()
将在所有流上调用 stream.destroy(err)
,除了
- 已发出
'end'
或'close'
的Readable
流。 - 已发出
'finish'
或'close'
的Writable
流。
在调用 callback
后,stream.pipeline()
会在流上留下悬空的事件侦听器。 在失败后重复使用流的情况下,这可能会导致事件侦听器泄漏和吞咽错误。 如果最后一个流是可读的,则将删除悬空的事件侦听器,以便以后可以使用最后一个流。
stream.pipeline()
在引发错误时关闭所有流。 与 pipeline
一起使用 IncomingRequest
可能会导致意外行为,因为它会销毁套接字而不发送预期的响应。 请参阅下面的示例
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.compose(...streams)
#
stream.compose
处于试验阶段。streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 返回: <stream.Duplex>
将两个或多个流合并到 Duplex
流中,该流写入第一个流并从最后一个流读取。 每个提供的流都使用 stream.pipeline
管道传输到下一个流。 如果任何流发生错误,则所有流都将被销毁,包括外部 Duplex
流。
由于 stream.compose
返回一个新流,该流反过来可以(并且应该)管道传输到其他流,因此它可以实现组合。 相比之下,当将流传递给 stream.pipeline
时,通常第一个流是可读流,最后一个流是可写流,形成一个闭合回路。
如果传递了 Function
,它必须是采用 source
Iterable
的工厂方法。
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose
可用于将异步可迭代对象、生成器和函数转换为流。
AsyncIterable
转换为可读的Duplex
。 不能生成null
。AsyncGeneratorFunction
转换为可读/可写的转换Duplex
。 必须将源AsyncIterable
作为第一个参数。 不能生成null
。AsyncFunction
转换为可写的Duplex
。 必须返回null
或undefined
。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
有关作为运算符的 stream.compose
,请参阅 readable.compose(stream)
。
stream.Readable.from(iterable[, options])
#
iterable
<Iterable> 实现Symbol.asyncIterator
或Symbol.iterator
可迭代协议的对象。 如果传递了 null 值,则发出“error”事件。options
<Object> 提供给new stream.Readable([options])
的选项。 默认情况下,Readable.from()
会将options.objectMode
设置为true
,除非通过将options.objectMode
设置为false
来显式选择退出此设置。- 返回: <stream.Readable>
一个用于从迭代器创建可读流的实用方法。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
调用 Readable.from(string)
或 Readable.from(buffer)
不会迭代字符串或缓冲区以匹配其他流的语义,以提高性能。
如果将包含 promise 的 Iterable
对象作为参数传递,则可能会导致未处理的拒绝。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Readable.fromWeb(readableStream[, options])
#
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回: <stream.Readable>
stream.Readable.isDisturbed(stream)
#
stream
<stream.Readable> | <ReadableStream>- 返回值:
boolean
返回流是否已被读取或取消。
stream.isErrored(stream)
#
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 返回:<boolean>
返回流是否遇到错误。
stream.Readable.toWeb(streamReadable[, options])
#
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 从给定的stream.Readable
读取之前,创建的ReadableStream
的最大内部队列大小,超过此大小会应用反压。 如果未提供值,则将从给定的stream.Readable
中获取。size
<Function> 用于指定给定数据块大小的函数。 如果未提供值,则所有块的大小都将为1
。
- 返回值: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
#
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回值: <stream.Writable>
stream.Writable.toWeb(streamWritable)
#
streamWritable
<stream.Writable>- 返回值: <WritableStream>
stream.Duplex.from(src)
#
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
用于创建双工流的实用方法。
Stream
将可写流转换为可写Duplex
,并将可读流转换为Duplex
。Blob
转换为可读的Duplex
。string
转换为可读的Duplex
。ArrayBuffer
转换为可读的Duplex
。AsyncIterable
转换为可读的Duplex
。 不能生成null
。AsyncGeneratorFunction
转换为可读/可写的转换Duplex
。 必须将源AsyncIterable
作为第一个参数。 不能生成null
。AsyncFunction
转换为可写的Duplex
。 必须返回null
或undefined
。Object ({ writable, readable })
将readable
和writable
转换为Stream
,然后将它们组合成Duplex
,其中Duplex
将写入writable
并从readable
读取。Promise
转换为可读的Duplex
。 值null
会被忽略。ReadableStream
转换为可读的Duplex
。WritableStream
转换为可写的Duplex
。- 返回: <stream.Duplex>
如果将包含 promise 的 Iterable
对象作为参数传递,则可能会导致未处理的拒绝。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.fromWeb(pair[, options])
#
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>- 返回: <stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}
const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)
#
streamDuplex
<stream.Duplex>- 返回值: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)
#
signal
<AbortSignal> 表示可能取消的信号stream
<Stream> | <ReadableStream> | <WritableStream> 要附加信号的流。
将 AbortSignal 附加到可读或可写流。 这允许代码使用 AbortController
控制流的销毁。
在与传递的 AbortSignal
对应的 AbortController
上调用 abort
的行为,与在流上调用 .destroy(new AbortError())
相同,对于 webstreams,则与 controller.error(new AbortError())
相同。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
或者,将 AbortSignal
与可读流用作异步迭代器
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
或者,将 AbortSignal
与 ReadableStream 一起使用
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)
#
返回流使用的默认 highWaterMark。 默认为 65536
(64 KiB),对于 objectMode
则为 16
。
流实现者的 API#
node:stream
模块 API 的设计旨在使其可以使用 JavaScript 的原型继承模型轻松实现流。
首先,流开发者将声明一个新的 JavaScript 类,该类扩展了四个基本流类之一(stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
),并确保他们调用适当的父类构造函数
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
扩展流时,请记住用户可以并且应该提供哪些选项,然后再将这些选项转发到基类构造函数。 例如,如果实现对 autoDestroy
和 emitClose
选项做出假设,则不允许用户覆盖这些选项。 明确说明转发了哪些选项,而不是隐式转发所有选项。
然后,新的流类必须实现一个或多个特定方法,具体取决于正在创建的流的类型,如下表所示
用例 | 类 | 要实现的方法 |
---|---|---|
仅读取 | Readable | _read() |
仅写入 | Writable | _write() 、_writev() 、_final() |
读取和写入 | Duplex | _read() 、_write() 、_writev() 、_final() |
对写入的数据进行操作,然后读取结果 | Transform | _transform() 、_flush() 、_final() |
流的实现代码永远不应该调用流的“公共”方法,这些方法旨在供使用者使用(如流使用者的 API 部分中所述)。 这样做可能会导致应用程序代码使用流时产生不利的副作用。
避免覆盖公共方法,例如 write()
、end()
、cork()
、uncork()
、read()
和 destroy()
,或者通过 .emit()
发出内部事件,例如 'error'
、'data'
、'end'
、'finish'
和 'close'
。 这样做会破坏当前和未来的流不变性,从而导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。
简化的构造#
对于许多简单的情况,可以在不依赖继承的情况下创建流。 这可以通过直接创建 stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
对象的实例,并将适当的方法作为构造函数选项传递来实现。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
实现可写流#
stream.Writable
类被扩展来实现 Writable
流。
自定义 Writable
流必须调用 new stream.Writable([options])
构造函数,并实现 writable._write()
和/或 writable._writev()
方法。
new stream.Writable([options])
#
options
<Object>highWaterMark
<number>stream.write()
开始返回false
时的缓冲区水位线。默认值:65536
(64 KiB),或objectMode
流为16
。decodeStrings
<boolean> 是否将传递给stream.write()
的string
编码为Buffer
(使用stream.write()
调用中指定的编码),然后再将其传递给stream._write()
。 其他类型的数据不会被转换(即Buffer
不会被解码为string
)。 设置为 false 将阻止string
被转换。 默认值:true
。defaultEncoding
<string> 当没有将编码指定为stream.write()
的参数时使用的默认编码。 默认值:'utf8'
。objectMode
<boolean>stream.write(anyObj)
是否为有效操作。 设置后,如果流实现支持,则可以写入字符串、<Buffer>、<TypedArray> 或 <DataView> 以外的 JavaScript 值。 默认值:false
。emitClose
<boolean> 流在销毁后是否应发出'close'
。 默认值:true
。write
<Function>stream._write()
方法的实现。writev
<Function>stream._writev()
方法的实现。destroy
<Function>stream._destroy()
方法的实现。final
<Function>stream._final()
方法的实现。construct
<Function>stream._construct()
方法的实现。autoDestroy
<boolean> 此流是否应在结束后自动调用.destroy()
。 默认值:true
。signal
<AbortSignal> 表示可能取消的信号。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
或者,当使用 pre-ES6 样式的构造函数时
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用简化的构造函数方法
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
对与传递的 AbortSignal
对应的 AbortController
调用 abort
将与对可写流调用 .destroy(new AbortError())
的行为相同。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)
#
callback
<Function> 当流完成初始化后,调用此函数(可以选择带有错误参数)。
_construct()
方法不能直接调用。 它可以由子类实现,如果实现,则只能由内部 Writable
类方法调用。
此可选函数将在流构造函数返回后的一个刻度内调用,从而延迟任何 _write()
、_final()
和 _destroy()
调用,直到调用 callback
。 这对于在可以使用流之前初始化状态或异步初始化资源很有用。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any> 要写入的Buffer
,从传递给stream.write()
的string
转换而来。 如果流的decodeStrings
选项为false
或流以对象模式运行,则 chunk 将不会被转换,并将是传递给stream.write()
的任何内容。encoding
<string> 如果 chunk 是字符串,则encoding
是该字符串的字符编码。 如果 chunk 是Buffer
,或者流以对象模式运行,则可以忽略encoding
。callback
<Function> 当完成提供的 chunk 的处理时,调用此函数(可以选择带有错误参数)。
所有 Writable
流实现都必须提供 writable._write()
和/或 writable._writev()
方法,以将数据发送到基础资源。
Transform
流提供 writable._write()
的自己的实现。
此函数不能由应用程序代码直接调用。 它应该由子类实现,并且只能由内部 Writable
类方法调用。
callback
函数必须在 writable._write()
中同步调用,或者异步(即不同的刻度)调用,以指示写入成功完成或因错误而失败。 传递给 callback
的第一个参数必须是 Error
对象(如果调用失败),或者 null
(如果写入成功)。
在调用 writable._write()
和调用 callback
之间发生的所有 writable.write()
调用都将导致写入的数据被缓冲。 当调用 callback
时,流可能会发出 'drain'
事件。 如果流实现能够一次处理多个数据块,则应实现 writable._writev()
方法。
如果在构造函数选项中将 decodeStrings
属性显式设置为 false
,则 chunk
将保持与传递给 .write()
的对象相同,并且可能是字符串而不是 Buffer
。 这是为了支持对某些字符串数据编码进行优化的处理的实现。 在这种情况下,encoding
参数将指示字符串的字符编码。 否则,可以安全地忽略 encoding
参数。
writable._write()
方法以一个下划线为前缀,因为它在定义它的类内部,并且永远不应被用户程序直接调用。
writable._writev(chunks, callback)
#
chunks
<Object[]> 要写入的数据。 该值是一个 <Object> 数组,每个对象表示要写入的离散数据块。 这些对象的属性是callback
<Function> 处理完提供的块后要调用的回调函数(可以选择带有错误参数)。
此函数不能由应用程序代码直接调用。 它应该由子类实现,并且只能由内部 Writable
类方法调用。
在能够一次处理多个数据块的流实现中,可以附加或替代 writable._write()
来实现 writable._writev()
方法。 如果已实现,并且存在来自先前写入的缓冲数据,则将调用 _writev()
而不是 _write()
。
writable._writev()
方法以一个下划线为前缀,因为它在定义它的类内部,并且永远不应被用户程序直接调用。
writable._destroy(err, callback)
#
err
<Error> 一个可能的错误。callback
<Function> 一个回调函数,它接受一个可选的 error 参数。
_destroy()
方法由 writable.destroy()
调用。 它可以被子类覆盖,但不能直接调用。
writable._final(callback)
#
callback
<Function> 完成写入任何剩余数据后,调用此函数(可以选择带有错误参数)。
不能直接调用 _final()
方法。 它可以由子类实现,如果这样,它将仅由内部 Writable
类方法调用。
这个可选函数将在流关闭之前被调用,延迟 'finish'
事件直到 callback
被调用。这对于在流结束之前关闭资源或写入缓冲数据非常有用。
写入时出错#
在处理 writable._write()
、 writable._writev()
和 writable._final()
方法期间发生的错误必须通过调用回调并将错误作为第一个参数来传播。 从这些方法中抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
如果 Readable
流管道连接到 Writable
流,当 Writable
发出错误时,Readable
流将被取消管道连接。
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
一个可写流示例#
以下说明了一个相当简单(并且有点毫无意义)的自定义 Writable
流实现。 虽然这个特定的 Writable
流实例没有任何实际用途,但该示例说明了自定义 Writable
流实例的每个必需元素
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
在可写流中解码缓冲区#
解码缓冲区是一项常见的任务,例如,当使用输入为字符串的转换器时。 当使用多字节字符编码(例如 UTF-8)时,这不是一个简单的过程。 以下示例显示了如何使用 StringDecoder
和 Writable
解码多字节字符串。
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options?.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
实现可读流#
扩展 stream.Readable
类以实现 Readable
流。
自定义 Readable
流必须调用 new stream.Readable([options])
构造函数并实现 readable._read()
方法。
new stream.Readable([options])
#
options
<Object>highWaterMark
<number> 在停止从底层资源读取之前存储在内部缓冲区中的最大字节数。 默认值:65536
(64 KiB),或16
对于objectMode
流。encoding
<string> 如果指定,则将使用指定的编码将缓冲区解码为字符串。 默认值:null
。objectMode
<boolean> 此流是否应表现为对象流。 这意味着stream.read(n)
返回单个值而不是大小为n
的Buffer
。 默认值:false
。emitClose
<boolean> 流在销毁后是否应发出'close'
。 默认值:true
。read
<Function>stream._read()
方法的实现。destroy
<Function>stream._destroy()
方法的实现。construct
<Function>stream._construct()
方法的实现。autoDestroy
<boolean> 此流是否应在结束后自动调用.destroy()
。 默认值:true
。signal
<AbortSignal> 表示可能取消的信号。
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
}
或者,当使用 pre-ES6 样式的构造函数时
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者,使用简化的构造函数方法
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
对与传递的 AbortSignal
对应的 AbortController
调用 abort
的行为与对创建的可读对象调用 .destroy(new AbortError())
的行为相同。
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
readable._construct(callback)
#
callback
<Function> 当流完成初始化后,调用此函数(可以选择带有错误参数)。
不得直接调用 _construct()
方法。 它可以由子类实现,如果这样,它将仅由内部 Readable
类方法调用。
这个可选函数将由流构造函数在下一个刻度中调度,延迟任何 _read()
和 _destroy()
调用,直到调用 callback
。 这对于初始化状态或异步初始化资源,然后才能使用流非常有用。
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)
#
size
<number> 要异步读取的字节数
应用程序代码不得直接调用此函数。 它应该由子类实现,并且只能由内部 Readable
类方法调用。
所有 Readable
流实现都必须提供 readable._read()
方法的实现,以从底层资源获取数据。
当调用 readable._read()
时,如果资源中有可用数据,则实现应开始使用 this.push(dataChunk)
方法将该数据推送到读取队列中。 在每次调用 this.push(dataChunk)
之后,一旦流准备好接受更多数据,将再次调用 _read()
。 _read()
可以继续从资源读取和推送数据,直到 readable.push()
返回 false
。 只有在停止后再次调用 _read()
时,才应恢复将额外数据推送到队列中。
一旦调用了 readable._read()
方法,除非通过 readable.push()
方法推送更多数据,否则不会再次调用它。 空数据(例如空缓冲区和字符串)不会导致调用 readable._read()
。
size
参数是建议性的。 对于“读取”是返回数据的单个操作的实现,可以使用 size
参数来确定要获取多少数据。 其他实现可能会忽略此参数,并在数据可用时简单地提供数据。 没有必要“等待”直到有 size
字节可用,然后再调用 stream.push(chunk)
。
readable._read()
方法以划线为前缀,因为它对于定义它的类是内部的,并且不应由用户程序直接调用。
readable._destroy(err, callback)
#
err
<Error> 一个可能的错误。callback
<Function> 一个回调函数,它接受一个可选的 error 参数。
_destroy()
方法由 readable.destroy()
调用。 它可以被子类覆盖,但不能直接调用。
readable.push(chunk[, encoding])
#
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 要推送到读取队列中的数据块。 对于不在对象模式下运行的流,chunk
必须是 <string>、<Buffer>、<TypedArray> 或 <DataView>。 对于对象模式流,chunk
可以是任何 JavaScript 值。encoding
<string> 字符串块的编码。 必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。- 返回: <boolean> 如果可以继续推送其他数据块,则为
true
; 否则为false
。
当 chunk
是 <Buffer>、<TypedArray>、<DataView> 或 <string> 时,数据 chunk
将添加到内部队列中,供流的用户使用。 将 chunk
作为 null
传递表示流的结束 (EOF),此后无法再写入任何数据。
当 Readable
在暂停模式下运行时,可以使用在发出 'readable'
事件时调用 readable.read()
方法来读出使用 readable.push()
添加的数据。
当 Readable
在流动模式下运行时,将通过发出 'data'
事件来传递使用 readable.push()
添加的数据。
readable.push()
方法旨在尽可能灵活。 例如,当包装提供某种形式的暂停/恢复机制和数据回调的较低级别源时,可以通过自定义 Readable
实例包装低级别源
// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
readable.push()
方法用于将内容推送到内部缓冲区中。 它可以由 readable._read()
方法驱动。
对于不在对象模式下运行的流,如果 readable.push()
的 chunk
参数是 undefined
,它将被视为空字符串或缓冲区。 有关更多信息,请参见 readable.push('')
。
读取时出错#
在处理 readable._read()
期间发生的错误必须通过 readable.destroy(err)
方法传播。 从 readable._read()
中抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
计数流示例#
以下是一个 Readable
流的基本示例,它按升序发出从 1 到 1,000,000 的数字,然后结束。
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
实现双工流#
Duplex
流是一种同时实现 Readable
和 Writable
的流,例如 TCP 套接字连接。
由于 JavaScript 不支持多重继承,因此扩展了 stream.Duplex
类以实现 Duplex
流(而不是扩展 stream.Readable
和 stream.Writable
类)。
stream.Duplex
类在原型上继承自 stream.Readable
,并寄生式地继承自 stream.Writable
,但由于覆盖了 stream.Writable
上的 Symbol.hasInstance
,instanceof
对于两个基类都将正常工作。
自定义 Duplex
流必须调用 new stream.Duplex([options])
构造函数,并实现 readable._read()
和 writable._write()
方法。
new stream.Duplex(options)
#
options
<Object> 传递给Writable
和Readable
构造函数。还具有以下字段allowHalfOpen
<boolean> 如果设置为false
,则当可读端结束时,流将自动结束可写端。默认值:true
。readable
<boolean> 设置Duplex
是否应该是可读的。默认值:true
。writable
<boolean> 设置Duplex
是否应该是可写的。默认值:true
。readableObjectMode
<boolean> 为流的可读端设置objectMode
。如果objectMode
为true
,则无效。默认值:false
。writableObjectMode
<boolean> 为流的可写端设置objectMode
。如果objectMode
为true
,则无效。默认值:false
。readableHighWaterMark
<number> 为流的可读端设置highWaterMark
。如果提供了highWaterMark
,则无效。writableHighWaterMark
<number> 为流的可写端设置highWaterMark
。如果提供了highWaterMark
,则无效。
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者,当使用 pre-ES6 样式的构造函数时
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
或者,使用简化的构造函数方法
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
当使用 pipeline 时
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
一个双工流的例子#
以下说明了一个 Duplex
流的简单示例,该流包装了一个假设的底层源对象,可以向其中写入数据,也可以从中读取数据,尽管使用与 Node.js 流不兼容的 API。以下说明了一个 Duplex
流的简单示例,该流通过 Writable
接口缓冲传入的写入数据,然后通过 Readable
接口将其读回。
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex
流最重要的方面是 Readable
和 Writable
端彼此独立运行,尽管它们共存于单个对象实例中。
对象模式双工流#
对于 Duplex
流,可以使用 readableObjectMode
和 writableObjectMode
选项分别专门为 Readable
或 Writable
端设置 objectMode
。
例如,在以下示例中,创建一个新的 Transform
流(它是 Duplex
流的一种类型),它具有一个对象模式 Writable
端,该端接受 JavaScript 数字,这些数字在 Readable
端转换为十六进制字符串。
const { Transform } = require('node:stream');
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
实现转换流#
Transform
流是一种 Duplex
流,其中输出以某种方式从输入计算得出。示例包括压缩、加密或解密数据的 zlib 流或 crypto 流。
不要求输出与输入的大小相同、块数相同或同时到达。例如,Hash
流将永远只有一个输出块,该输出块在输入结束时提供。zlib
流将产生比其输入小得多或大得多的输出。
扩展了 stream.Transform
类以实现 Transform
流。
stream.Transform
类在原型上继承自 stream.Duplex
,并实现其自己的 writable._write()
和 readable._read()
方法。自定义 Transform
实现必须实现 transform._transform()
方法,并且可以实现 transform._flush()
方法。
使用 Transform
流时必须小心,因为写入流的数据可能会导致流的 Writable
端暂停,如果未消耗 Readable
端的输出。
new stream.Transform([options])
#
options
<Object> 传递给Writable
和Readable
构造函数。还具有以下字段transform
<Function>stream._transform()
方法的实现。flush
<Function>stream._flush()
方法的实现。
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
或者,当使用 pre-ES6 样式的构造函数时
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
或者,使用简化的构造函数方法
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
事件:'end'
#
'end'
事件来自 stream.Readable
类。在输出所有数据后,会发出 'end'
事件,该事件发生在调用 transform._flush()
中的回调之后。如果发生错误,则不应发出 'end'
。
事件:'finish'
#
'finish'
事件来自 stream.Writable
类。在调用 stream.end()
并且 stream._transform()
处理完所有块后,会发出 'finish'
事件。如果发生错误,则不应发出 'finish'
。
transform._flush(callback)
#
callback
<Function> 清除剩余数据后要调用的回调函数(可选,带有错误参数和数据)。
应用程序代码不得直接调用此函数。 它应该由子类实现,并且只能由内部 Readable
类方法调用。
在某些情况下,转换操作可能需要在流的末尾发出额外的少量数据。例如,zlib
压缩流将存储一定量的内部状态,该状态用于最佳地压缩输出。但是,当流结束时,需要清除该额外数据,以便压缩数据将是完整的。
自定义 Transform
实现可以实现 transform._flush()
方法。当没有更多要消耗的写入数据时,将在发出 'end'
事件(表示 Readable
流的结束)之前调用此方法。
在 transform._flush()
实现中,可以根据需要零次或多次调用 transform.push()
方法。完成刷新操作后,必须调用 callback
函数。
transform._flush()
方法以一个下划线为前缀,因为它对于定义它的类是内部的,并且永远不应由用户程序直接调用。
transform._transform(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any> 要转换的Buffer
,从传递给stream.write()
的string
转换而来。如果流的decodeStrings
选项为false
或流以对象模式运行,则不会转换块,并且块将是传递给stream.write()
的任何内容。encoding
<string> 如果块是字符串,则这是编码类型。如果块是缓冲区,则这是特殊值'buffer'
。在这种情况下忽略它。callback
<Function> 处理完提供的chunk
后要调用的回调函数(可选,带有错误参数和数据)。
应用程序代码不得直接调用此函数。 它应该由子类实现,并且只能由内部 Readable
类方法调用。
所有 Transform
流实现都必须提供一个 _transform()
方法来接受输入并产生输出。transform._transform()
实现处理正在写入的字节,计算输出,然后使用 transform.push()
方法将该输出传递给可读部分。
可以根据要输出的块的结果,零次或多次调用 transform.push()
方法以从单个输入块生成输出。
有可能从任何给定的输入数据块都不生成任何输出。
仅当完全消耗当前块时,才必须调用 callback
函数。如果在处理输入时发生错误,则传递给 callback
的第一个参数必须是 Error
对象,否则为 null
。如果将第二个参数传递给 callback
,则将其转发到 transform.push()
方法,但前提是第一个参数是假值。换句话说,以下是等效的
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform()
方法以一个下划线为前缀,因为它对于定义它的类是内部的,并且永远不应由用户程序直接调用。
transform._transform()
永远不会并行调用;流实现了队列机制,要接收下一个块,必须调用 callback
,可以是同步或异步的。
其他说明#
流与异步生成器和异步迭代器的兼容性#
随着 JavaScript 中对异步生成器和迭代器的支持,异步生成器实际上是现阶段的一流语言级别的流结构。
下面提供了一些使用 Node.js 流与异步生成器和异步迭代器的常见互操作案例。
使用异步迭代器消费可读流#
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
异步迭代器在流上注册一个永久的错误处理程序,以防止任何未处理的销毁后错误。
使用异步生成器创建可读流#
可以使用 Readable.from()
实用方法从异步生成器创建 Node.js 可读流。
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
从异步迭代器管道传输到可写流#
从异步迭代器写入可写流时,请确保正确处理背压和错误。 stream.pipeline()
抽象出对背压和背压相关错误的处理。
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
与旧版本 Node.js 的兼容性#
在 Node.js 0.10 之前,Readable
流接口更简单,但功能更弱,用处也更小。
- 与其等待调用
stream.read()
方法,'data'
事件会立即开始发出。 需要执行一定量的工作来决定如何处理数据的应用程序需要将读取的数据存储到缓冲区中,这样数据就不会丢失。 stream.pause()
方法是建议性的,而不是保证性的。 这意味着即使流处于暂停状态,仍然需要准备好接收'data'
事件。
在 Node.js 0.10 中,添加了 Readable
类。 为了与旧的 Node.js 程序向后兼容,当添加了 'data'
事件处理程序,或者调用了 stream.resume()
方法时,Readable
流会切换到“流动模式”。 效果是,即使不使用新的 stream.read()
方法和 'readable'
事件,也不再需要担心丢失 'data'
块。
虽然大多数应用程序将继续正常运行,但这会在以下条件下引入一种极端情况:
- 未添加
'data'
事件监听器。 - 永远不会调用
stream.resume()
方法。 - 该流未管道传输到任何可写目的地。
例如,考虑以下代码:
// WARNING! BROKEN!
net.createServer((socket) => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
在 Node.js 0.10 之前,传入的消息数据将被简单地丢弃。 但是,在 Node.js 0.10 及更高版本中,套接字将永远保持暂停状态。
这种情况下的解决方法是调用 stream.resume()
方法来开始数据流:
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// Start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了切换到流动模式的新 Readable
流之外,还可以使用 readable.wrap()
方法将 0.10 之前的样式流包装在 Readable
类中。
readable.read(0)
#
在某些情况下,有必要刷新底层可读流机制,而无需实际消耗任何数据。 在这种情况下,可以调用 readable.read(0)
,它将始终返回 null
。
如果内部读取缓冲区低于 highWaterMark
,并且流当前未读取,则调用 stream.read(0)
将触发低级 stream._read()
调用。
虽然大多数应用程序几乎永远不需要这样做,但在 Node.js 中存在这种情况,尤其是在 Readable
流类内部。
readable.push('')
#
不建议使用 readable.push('')
。
将零字节 <string>、 <Buffer>、 <TypedArray> 或 <DataView> 推送到非对象模式下的流具有有趣的副作用。 因为它*是*对 readable.push()
的调用,所以该调用将结束读取过程。 但是,由于该参数是空字符串,因此不会将任何数据添加到可读缓冲区,因此用户无法消费。
调用 readable.setEncoding()
后 highWaterMark
不一致#
使用 readable.setEncoding()
将更改 highWaterMark
在非对象模式下的运行方式。
通常,当前缓冲区的大小是根据 字节 中的 highWaterMark
来衡量的。 但是,在调用 setEncoding()
之后,比较函数将开始以 字符 来衡量缓冲区的大小。
这在 latin1
或 ascii
的常见情况下不是问题。 但建议在使用可能包含多字节字符的字符串时注意此行为。