Node.js v21.7.2 文档
- Node.js v21.7.2
-
► 目录
- 流
- 本文档的组织结构
- 流的类型
- 流使用者 API
- 可写流
- 类:
stream.Writable
- 事件:
'close'
- 事件:
'drain'
- 事件:
'error'
- 事件:
'finish'
- 事件:
'pipe'
- 事件:
'unpipe'
writable.cork()
writable.destroy([error])
writable.closed
writable.destroyed
writable.end([chunk[, encoding]][, callback])
writable.setDefaultEncoding(encoding)
writable.uncork()
writable.writable
writable.writableAborted
writable.writableEnded
writable.writableCorked
writable.errored
writable.writableFinished
writable.writableHighWaterMark
可写流的 writableLength
可写流的 writableNeedDrain
可写流的 writableObjectMode
可写流的 write(chunk[, encoding][, callback])
- 事件:
- 类:
- 可读流
- 两种读取模式
- 三种状态
- 选择一种 API 样式
- 类:
stream.Readable
- 事件:
'close'
- 事件:
'data'
- 事件:
'end'
- 事件:
'error'
- 事件:
'pause'
- 事件:
'readable'
- 事件:
'resume'
可读流的 destroy([error])
可读流的 closed
可读流的 destroyed
可读流的 isPaused()
可读流的 pause()
可读流的 pipe(destination[, options])
可读流的 read([size])
可读流的 readable
可读流的 readableAborted
可读流的 readableDidRead
可读流的 readableEncoding
可读流的 readableEnded
可读流的 errored
可读流的 readableFlowing
可读流的 readableHighWaterMark
可读流的 readableLength
可读流的 readableObjectMode
可读流的 resume()
可读流的 setEncoding(encoding)
可读流的 unpipe([destination])
可读流的 unshift(chunk[, encoding])
可读流的 wrap(stream)
可读流的 [Symbol.asyncIterator]()
可读流的 [Symbol.asyncDispose]()
可读流的 compose(stream[, options])
可读流的 iterator([options])
可读流的 map(fn[, options])
可读流的 filter(fn[, options])
可读流的 forEach(fn[, options])
可读流的 toArray([options])
可读流的 some(fn[, options])
可读流的 find(fn[, options])
可读流的 every(fn[, options])
可读流的 flatMap(fn[, options])
可读流的 drop(limit[, options])
可读流的 take(limit[, options])
可读流的 reduce(fn[, initial[, options]])
- 事件:
- 双工流和转换流
流的 finished(stream[, options], callback)
流的 pipeline(source[, ...transforms], destination, callback)
流的 pipeline(streams, callback)
流的 compose(...streams)
流的 Readable.from(iterable[, options])
流的 Readable.fromWeb(readableStream[, options])
流的 Readable.isDisturbed(stream)
流的 isErrored(stream)
流的 isReadable(stream)
流的 Readable.toWeb(streamReadable[, options])
流的 Writable.fromWeb(writableStream[, options])
流的 Writable.toWeb(streamWritable)
流的 Duplex.from(src)
流的 Duplex.fromWeb(pair[, options])
流的 Duplex.toWeb(streamDuplex)
流的 addAbortSignal(signal, stream)
流的 getDefaultHighWaterMark(objectMode)
stream.setDefaultHighWaterMark(objectMode, value)
- 可写流
- 流实现者的 API
- 其他说明
- 流
-
► 索引
- 断言测试
- 异步上下文跟踪
- 异步钩子
- 缓冲区
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入器 API
- 子进程
- 集群
- 命令行选项
- 控制台
- Corepack
- 加密
- 调试器
- 已弃用的 API
- 诊断通道
- DNS
- 域
- 错误
- 事件
- 文件系统
- 全局变量
- HTTP
- HTTP/2
- HTTPS
- 检查器
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:module
API - 模块:包
- 网络
- 操作系统
- 路径
- 性能钩子
- 权限
- 进程
- Punycode
- 查询字符串
- 读取行
- REPL
- 报告
- 单一可执行应用程序
- 流
- 字符串解码器
- 测试运行器
- 计时器
- TLS/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用工具
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- 工作线程
- Zlib
- ► 其他版本
- ► 选项
Stream[源代码]#
源代码: lib/stream.js
流是用于处理 Node.js 中流数据的一种抽象接口。node:stream
模块提供了一个用于实现流接口的 API。
Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求和 process.stdout
都是流实例。
流可以是可读的、可写的或两者兼有。所有流都是 EventEmitter
的实例。
访问 node:stream
模块
const stream = require('node:stream');
node:stream
模块对于创建新类型的流实例很有用。通常无需使用 node:stream
模块来使用流。
本文档的组织#
本文档包含两个主要部分和一个用于说明的第三部分。第一部分解释如何在应用程序中使用现有流。第二部分解释如何创建新类型的流。
流类型#
Node.js 中有四种基本流类型
Writable
:可以向其中写入数据的流(例如,fs.createWriteStream()
)。Readable
:可以从中读取数据的流(例如,fs.createReadStream()
)。Duplex
:同时是Readable
和Writable
的流(例如,net.Socket
)。Transform
:在写入和读取时可以修改或转换数据的Duplex
流(例如,zlib.createDeflate()
)。
此外,此模块还包括实用程序函数 stream.pipeline()
、stream.finished()
、stream.Readable.from()
和 stream.addAbortSignal()
。
流 Promises API#
stream/promises
API 为流提供了一组替代的异步实用程序函数,这些函数返回 Promise
对象,而不是使用回调。可以通过 require('node:stream/promises')
或 require('node:stream').promises
访问该 API。
stream.pipeline(source[, ...transforms], destination[, options])
#
stream.pipeline(streams[, options])
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 返回:<Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 返回:<Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 返回:<Promise> | <AsyncIterable>
options
<Object>signal
<AbortSignal>end
<boolean>
- 返回:<Promise> 管道完成后兑现。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
要使用 AbortSignal
,请将其作为最后一个参数传递到 options 对象中。当信号中止时,将使用 AbortError
对底层管道调用 destroy
。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
pipeline
API 还支持异步生成器
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
请记住处理传递到异步生成器的 signal
参数。尤其是在异步生成器是管道源(即第一个参数)或管道永远不会完成的情况下。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
pipeline
API 提供 回调版本
stream.finished(stream[, options])
#
stream
<Stream>options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
: <AbortSignal> | <undefined>
- 返回:<Promise> 当流不再可读或可写时兑现。
const { finished } = require('node:stream/promises');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
finished
API 还提供了一个 回调版本。
对象模式#
Node.js API 创建的所有流仅在字符串和 Buffer
(或 Uint8Array
)对象上运行。但是,流实现有可能处理其他类型的 JavaScript 值(除了 null
,它在流中具有特殊用途)。此类流被认为以“对象模式”运行。
在创建流时,使用 objectMode
选项将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的。
缓冲#
Writable
和 Readable
流都将在内部缓冲区中存储数据。
可能缓冲的数据量取决于传递给流构造函数的 highWaterMark
选项。对于普通流,highWaterMark
选项指定 字节总数。对于以对象模式运行的流,highWaterMark
指定对象总数。
当实现调用 stream.push(chunk)
时,数据会缓冲在 Readable
流中。如果流的使用者不调用 stream.read()
,数据将保存在内部队列中,直到被使用。
一旦内部读取缓冲区的总大小达到 highWaterMark
指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据可以被使用(即,流将停止调用用于填充读取缓冲区的内部 readable._read()
方法)。
当反复调用 writable.write(chunk)
方法时,数据会缓冲在 Writable
流中。当内部写入缓冲区的总大小低于 highWaterMark
设置的阈值时,对 writable.write()
的调用将返回 true
。一旦内部缓冲区的大小达到或超过 highWaterMark
,将返回 false
。
stream
API 的一个主要目标,特别是 stream.pipe()
方法,是将数据的缓冲限制在可接受的水平,以便速度不同的源和目标不会占用可用内存。
highWaterMark
选项是一个阈值,而不是一个限制:它决定了在流停止请求更多数据之前缓冲的数据量。它通常不会强制执行严格的内存限制。特定的流实现可以选择执行更严格的限制,但这样做是可选的。
由于 Duplex
和 Transform
流都是 Readable
和 Writable
,因此每个流都维护两个用于读写操作的单独内部缓冲区,允许每一方独立于另一方操作,同时保持适当且高效的数据流。例如,net.Socket
实例是 Duplex
流,其 Readable
一侧允许消耗从套接字接收的数据,而其 Writable
一侧允许将数据写入到套接字。由于写入套接字的数据速度可能比接收数据的速度快或慢,因此每一方都应该独立于另一方操作(和缓冲)。
内部缓冲区的机制是内部实现细节,并且可以随时更改。但是,对于某些高级实现,可以使用 writable.writableBuffer
或 readable.readableBuffer
检索内部缓冲区。不建议使用这些未记录的属性。
流使用者 API#
几乎所有 Node.js 应用程序,无论多么简单,都会以某种方式使用流。以下是 Node.js 应用程序中使用流的一个示例,该应用程序实现了 HTTP 服务器
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a readable stream.
// `res` is an http.ServerResponse, which is a writable stream.
let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Writable
流(例如示例中的 res
)公开诸如 write()
和 end()
等方法,用于将数据写入流。
Readable
流使用 EventEmitter
API 在有数据可以从流中读取时通知应用程序代码。可以通过多种方式从流中读取可用数据。
Writable
和 Readable
流都以各种方式使用 EventEmitter
API 来传达流的当前状态。
Duplex
和 Transform
流既是 Writable
又是 Readable
。
向流写入数据或从流中使用数据的应用程序不需要直接实现流接口,并且通常没有理由调用 require('node:stream')
。
希望实现新类型的流的开发人员应参考 流实现者的 API 部分。
可写流#
可写流是对向其写入数据的目标的抽象。
Writable
流的示例包括
其中一些示例实际上是实现 Writable
接口的 Duplex
流。
所有 Writable
流都实现由 stream.Writable
类定义的接口。
虽然 Writable
流的特定实例可能以各种方式有所不同,但所有 Writable
流都遵循与以下示例中说明的相同基本使用模式
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
类:stream.Writable
#
事件:'close'
#
当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close'
事件。该事件表示不会再发出更多事件,也不会再进行进一步的计算。
如果使用 emitClose
选项创建 Writable
流,则该流将始终发出 'close'
事件。
事件:'drain'
#
如果对 stream.write(chunk)
的调用返回 false
,则当恢复向流写入数据时,将发出 'drain'
事件。
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
}
事件:'error'
#
如果在写入或管道传输数据时发生错误,则会发出 'error'
事件。调用时,侦听器回调会传递一个 Error
参数。
在发出 'error'
事件时,流将关闭,除非在创建流时将 autoDestroy
选项设置为 false
。
在 'error'
之后,不应 再发出 'close'
之外的任何其他事件(包括 'error'
事件)。
事件:'finish'
#
在调用 stream.end()
方法,并且所有数据已刷新到底层系统后,会发出 'finish'
事件。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
事件:'pipe'
#
src
<stream.Readable> 流向此可写流的源流
当对可读流调用 stream.pipe()
方法,将此可写流添加到其目标集合时,会发出 'pipe'
事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
事件:'unpipe'
#
src
<stream.Readable> 取消管道此可写流的源流
当对 Readable
流调用 stream.unpipe()
方法时,将触发 'unpipe'
事件,从其目标集中移除此 Writable
。
当 Readable
流管道到此 Writable
流时,如果此 Writable
流触发错误,也会触发此事件。
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Something has stopped piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()
#
writable.cork()
方法强制将所有写入数据缓存在内存中。当调用 stream.uncork()
或 stream.end()
方法时,将刷新缓存数据。
writable.cork()
的主要目的是解决快速连续写入多个小块到流的情况。writable.cork()
不会立即将这些块转发到底层目标,而是会将所有块缓冲起来,直到调用 writable.uncork()
,如果存在,则会将所有块传递给 writable._writev()
。这可以防止首行阻塞的情况,即在等待处理第一个小块时对数据进行缓冲。但是,如果未实现 writable._writev()
就使用 writable.cork()
,可能会对吞吐量产生不利影响。
另请参阅:writable.uncork()
、writable._writev()
。
writable.destroy([error])
#
销毁流。可以选择触发 'error'
事件,并触发 'close'
事件(除非将 emitClose
设置为 false
)。调用此方法后,可写流已结束,随后对 write()
或 end()
的调用将导致 ERR_STREAM_DESTROYED
错误。这是一种破坏性的立即销毁流的方法。对 write()
的先前调用可能尚未完成,并可能触发 ERR_STREAM_DESTROYED
错误。如果数据应在关闭前刷新,请使用 end()
而不是 destroy()
,或在销毁流之前等待 'drain'
事件。
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
一旦调用 destroy()
,任何进一步的调用都将是 no-op,并且除了来自 _destroy()
的错误外,不会再触发任何错误作为 'error'
。
实现者不应覆盖此方法,而应实现 writable._destroy()
。
writable.closed
#
在发出“关闭”后为 true
。
writable.destroyed
#
在调用 writable.destroy()
后为 true
。
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])
#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 要写入的可选数据。对于不在对象模式下运行的流,chunk
必须是字符串、Buffer
或Uint8Array
。对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> 如果chunk
是字符串,则为编码callback
<Function> 当流完成时的回调。- 返回:<this>
调用 writable.end()
方法表示不会再向 Writable
写入更多数据。可选的 chunk
和 encoding
参数允许在关闭流之前立即写入最后一部分附加数据。
在调用 stream.end()
后调用 stream.write()
方法将引发错误。
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
writable.setDefaultEncoding(encoding)
#
writable.setDefaultEncoding()
方法为 Writable
流设置默认 encoding
。
writable.uncork()
#
writable.uncork()
方法刷新自调用 stream.cork()
以来缓冲的所有数据。
当使用 writable.cork()
和 writable.uncork()
管理流的写入缓冲时,使用 process.nextTick()
推迟对 writable.uncork()
的调用。这样做允许对给定 Node.js 事件循环阶段内发生的全部 writable.write()
调用进行批处理。
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
如果对流多次调用 writable.cork()
方法,则必须对 writable.uncork()
调用相同次数才能刷新缓冲数据。
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// The data will not be flushed until uncork() is called a second time.
stream.uncork();
});
另请参阅:writable.cork()
。
writable.writable
#
如果可以安全调用 writable.write()
,则为 true
,这意味着流未被销毁、出错或结束。
writable.writableAborted
#
返回在发出 'finish'
之前流是否被销毁或出错。
writable.writableEnded
#
在调用 writable.end()
之后为 true
。此属性不表示数据是否已刷新,为此请改用 writable.writableFinished
。
writable.writableCorked
#
完全取消流封存所需调用 writable.uncork()
的次数。
writable.errored
#
如果流已因错误而销毁,则返回错误。
writable.writableFinished
#
在发出 'finish'
事件之前立即设置为 true
。
writable.writableHighWaterMark
#
返回在创建此 Writable
时传递的 highWaterMark
的值。
writable.writableLength
#
此属性包含准备写入的队列中的字节(或对象)数。该值提供有关 highWaterMark
状态的内省数据。
writable.writableNeedDrain
#
如果流的缓冲区已满且流将发出 'drain'
,则为 true
。
writable.writableObjectMode
#
给定 Writable
流的 objectMode
属性的 getter。
writable.write(chunk[, encoding][, callback])
#
chunk
<string> | <Buffer> | <Uint8Array> | <any> 要写入的可选数据。对于不在对象模式下运行的流,chunk
必须是字符串、Buffer
或Uint8Array
。对于对象模式流,chunk
可以是除null
之外的任何 JavaScript 值。encoding
<string> | <null> 编码,如果chunk
是一个字符串。默认值:'utf8'
callback
<Function> 当这一块数据被刷新时调用的回调。- 返回:<boolean>
false
如果流希望调用代码等待'drain'
事件被发出,然后再继续写入额外数据;否则为true
。
writable.write()
方法将一些数据写入流,并在数据被完全处理后调用提供的 callback
。如果发生错误,则 callback
将被调用,错误作为其第一个参数。callback
是异步调用的,并且在 'error'
发出之前。
如果在接纳 chunk
后,内部缓冲区小于创建流时配置的 highWaterMark
,则返回值为 true
。如果返回 false
,则应停止进一步尝试向流中写入数据,直到 'drain'
事件发出。
当流未被清空时,对 write()
的调用将缓冲 chunk
,并返回 false。一旦所有当前缓冲的块被清空(被操作系统接受传递),'drain'
事件将被发出。一旦 write()
返回 false,请勿再写入更多块,直到 'drain'
事件发出。虽然允许在未清空的流上调用 write()
,但 Node.js 将缓冲所有写入的块,直到达到最大内存使用量,此时它将无条件中止。即使在中止之前,高内存使用量也会导致垃圾回收器性能不佳和较高的 RSS(通常不会在不再需要内存后释放回系统)。由于如果远程对等方不读取数据,则 TCP 套接字可能永远不会被清空,因此写入未被清空的套接字可能会导致远程可利用的漏洞。
当流未被清空时写入数据对于 Transform
来说尤其成问题,因为 Transform
流在默认情况下被暂停,直到它们被管道化或添加了 'data'
或 'readable'
事件处理程序。
如果要写入的数据可以按需生成或获取,建议将逻辑封装到 Readable
中,并使用 stream.pipe()
。但是,如果更喜欢调用 write()
,则可以使用 'drain'
事件来尊重反压并避免内存问题
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Wait for cb to be called before doing any other write.
write('hello', () => {
console.log('Write completed, do more writes now.');
});
对象模式中的Writable
流始终会忽略encoding
参数。
可读流#
可读流是从其中使用数据的一个源的抽象。
Readable
流的示例包括
所有 Readable
流都实现由 stream.Readable
类定义的接口。
两种读取模式#
Readable
流实际上以两种模式之一运行:流动和暂停。这些模式与 对象模式 是分开的。Readable
流可以处于对象模式或不处于对象模式,无论它处于流动模式还是暂停模式。
-
在流动模式中,数据会自动从底层系统读取,并通过
EventEmitter
接口使用事件尽可能快地提供给应用程序。 -
在暂停模式中,必须显式调用
stream.read()
方法才能从流中读取数据块。
所有 Readable
流都以暂停模式开始,但可以通过以下方式之一切换到流动模式
- 添加
'data'
事件处理程序。 - 调用
stream.resume()
方法。 - 调用
stream.pipe()
方法将数据发送到Writable
。
Readable
可以使用以下方式之一切换回暂停模式
- 如果没有管道目的地,则通过调用
stream.pause()
方法。 - 如果有管道目的地,则通过删除所有管道目的地。可以通过调用
stream.unpipe()
方法删除多个管道目的地。
需要记住的重要概念是,Readable
不会生成数据,直到提供使用或忽略该数据的机制。如果使用机制被禁用或取消,Readable
将尝试停止生成数据。
出于向后兼容性的原因,删除 'data'
事件处理程序不会自动暂停流。此外,如果有管道目的地,则调用 stream.pause()
不能保证流在这些目的地耗尽并要求更多数据后保持暂停。
如果将 Readable
切换到流动模式,并且没有可用于处理数据的使用者,那么该数据将丢失。例如,当在未附加到 'data'
事件的侦听器的情况下调用 readable.resume()
方法,或从流中移除 'data'
事件处理程序时,可能会发生这种情况。
添加 'readable'
事件处理程序会自动使流停止流动,并且必须通过 readable.read()
使用数据。如果移除 'readable'
事件处理程序,则如果存在 'data'
事件处理程序,流将再次开始流动。
三种状态#
Readable
流的“两种模式”操作是对 Readable
流实现中发生的更复杂的内部状态管理的简化抽象。
具体来说,在任何给定的时间点,每个 Readable
都处于三种可能状态之一
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
当 readable.readableFlowing
为 null
时,不提供使用流数据的机制。因此,流不会生成数据。在此状态下,附加 'data'
事件的侦听器、调用 readable.pipe()
方法或调用 readable.resume()
方法将把 readable.readableFlowing
切换为 true
,从而导致 Readable
在生成数据时开始主动发出事件。
调用 readable.pause()
、readable.unpipe()
或接收到反压将导致 readable.readableFlowing
设置为 false
,暂时停止事件流动,但不会停止生成数据。在此状态下,附加 'data'
事件的侦听器不会将 readable.readableFlowing
切换为 true
。
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
当 readable.readableFlowing
为 false
时,数据可能会在流的内部缓冲区中累积。
选择一种 API 样式#
Readable
流 API 在多个 Node.js 版本中不断发展,并提供了多种使用流数据的方法。通常,开发人员应选择一种使用数据的方法,并且绝不应使用多种方法从单个流中使用数据。具体来说,组合使用 on('data')
、on('readable')
、pipe()
或异步迭代器可能会导致非直观的行为。
类:stream.Readable
#
事件:'close'
#
当流及其任何底层资源(例如文件描述符)关闭时,将发出 'close'
事件。该事件表示不会再发出更多事件,也不会再进行进一步的计算。
如果使用 emitClose
选项创建 Readable
流,它将始终发出 'close'
事件。
事件:'data'
#
chunk
<Buffer> | <string> | <any> 数据块。对于不以对象模式运行的流,块将是字符串或Buffer
。对于以对象模式运行的流,块可以是除null
之外的任何 JavaScript 值。
每当流将数据块的所有权放弃给使用者时,都会发出 'data'
事件。这可能发生在通过调用 readable.pipe()
、readable.resume()
或为 'data'
事件附加侦听器回调函数,将流切换到流动模式时。每当调用 readable.read()
方法并且有数据块可供返回时,也会发出 'data'
事件。
为尚未明确暂停的流附加 'data'
事件侦听器将流切换到流动模式。然后,数据将在可用时立即传递。
如果已使用 readable.setEncoding()
方法为流指定默认编码,则侦听器回调函数将接收字符串形式的数据块;否则,数据将作为 Buffer
传递。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
事件:'end'
#
当流中没有更多数据可供使用时,将发出 'end'
事件。
除非完全使用数据,否则 不会发出 'end'
事件。这可以通过将流切换到流动模式或重复调用 stream.read()
直到使用所有数据来实现。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
事件:'error'
#
Readable
实现可以在任何时候发出 'error'
事件。通常,如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效的数据块时,可能会发生这种情况。
侦听器回调函数将接收一个 Error
对象。
事件:'pause'
#
当调用 stream.pause()
且 readableFlowing
不为 false
时,会触发 'pause'
事件。
事件:'readable'
#
当流中存在可读数据或已到达流的末尾时,会触发 'readable'
事件。实际上,'readable'
事件表示流中有新信息。如果存在数据,stream.read()
将返回该数据。
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
如果已到达流的末尾,调用 stream.read()
将返回 null
并触发 'end'
事件。如果从未有过任何可读数据,也会触发此事件。例如,在以下示例中,foo.txt
是一个空文件
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
运行此脚本的输出为
$ node test.js
readable: null
end
在某些情况下,为 'readable'
事件附加监听器会导致将一定量的数据读入内部缓冲区。
通常,readable.pipe()
和 'data'
事件机制比 'readable'
事件更容易理解。但是,处理 'readable'
可能会提高吞吐量。
如果同时使用 'readable'
和 'data'
,'readable'
在控制流方面具有优先权,即只有在调用 stream.read()
时才会触发 'data'
。readableFlowing
属性将变为 false
。如果在移除 'readable'
时存在 'data'
监听器,则流将开始流动,即无需调用 .resume()
即可触发 'data'
事件。
事件:'resume'
#
当调用 stream.resume()
且 readableFlowing
不为 true
时,会触发 'resume'
事件。
readable.destroy([error])
#
销毁流。可以选择发出一个'error'
事件,并发出一个'close'
事件(除非emitClose
设置为false
)。在此调用后,可读流将释放所有内部资源,并且对push()
的后续调用将被忽略。
一旦调用 destroy()
,任何进一步的调用都将是 no-op,并且除了来自 _destroy()
的错误外,不会再触发任何错误作为 'error'
。
实现者不应覆盖此方法,而应实现readable._destroy()
。
readable.closed
#
在发出“关闭”后为 true
。
readable.destroyed
#
在调用readable.destroy()
后为true
。
readable.isPaused()
#
- 返回:<boolean>
readable.isPaused()
方法返回Readable
的当前操作状态。这主要由readable.pipe()
方法底层的机制使用。在大多数典型情况下,没有理由直接使用此方法。
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
#
- 返回:<this>
readable.pause()
方法将导致处于流动模式的流停止发出'data'
事件,退出流动模式。任何可用的数据都将保留在内部缓冲区中。
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
如果存在'readable'
事件侦听器,则readable.pause()
方法不起作用。
readable.pipe(destination[, options])
#
destination
<stream.Writable> 写入数据的目标options
<Object> 管道选项end
<boolean> 在读取器结束时结束写入器。默认:true
。
- 返回:<stream.Writable>目标,如果它是
Duplex
或Transform
流,则允许管道链
readable.pipe()
方法将Writable
流附加到readable
,导致它自动切换到流动模式并将所有数据推送到附加的Writable
。数据的流将自动管理,以便目标Writable
流不会被更快的Readable
流淹没。
以下示例将所有数据从 readable
传输到名为 file.txt
的文件中
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
可以将多个 Writable
流附加到单个 Readable
流。
readable.pipe()
方法返回对目标流的引用,从而可以设置管道流链
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
默认情况下,当源 Readable
流发出 'end'
时,stream.end()
会在目标 Writable
流上调用,以便目标不再可写。要禁用此默认行为,可以将 end
选项传递为 false
,导致目标流保持打开状态
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
一个重要的注意事项是,如果 Readable
流在处理期间发出错误,则 不会自动关闭 Writable
目标。如果发生错误,则需要手动关闭每个流,以防止内存泄漏。
无论指定了哪些选项,process.stderr
和 process.stdout
Writable
流在 Node.js 进程退出之前都不会关闭。
readable.read([size])
#
readable.read()
方法从内部缓冲区中读取数据并返回它。如果没有可读取的数据,则返回 null
。默认情况下,数据作为 Buffer
对象返回,除非使用 readable.setEncoding()
方法指定了编码或流在对象模式下运行。
可选的 size
参数指定要读取的特定字节数。如果无法读取 size
字节,则将返回 null
,除非流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。
如果未指定 size
参数,则将返回内部缓冲区中包含的所有数据。
size
参数必须小于或等于 1 GiB。
readable.read()
方法只应在暂停模式下运行的 Readable
流上调用。在流动模式下,readable.read()
会自动调用,直到内部缓冲区完全耗尽。
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
每次调用 readable.read()
都会返回一个数据块,或 null
。这些数据块不会连接起来。需要一个 while
循环来使用缓冲区中当前的所有数据。读取大文件时,.read()
可能会返回 null
,因为它已经使用完了到目前为止所有缓冲的内容,但仍有更多数据尚未缓冲。在这种情况下,当缓冲区中有更多数据时,将发出一个新的 'readable'
事件。最后,当没有更多数据时,将发出 'end'
事件。
因此,要从 readable
读取文件的所有内容,需要跨多个 'readable'
事件收集数据块
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
对象模式中的 Readable
流将始终从对 readable.read(size)
的调用返回单个项,而不管 size
参数的值如何。
如果 readable.read()
方法返回一个数据块,还将发出一个 'data'
事件。
在发出 'end'
事件后调用 stream.read([size])
将返回 null
。不会引发运行时错误。
readable.readable
#
如果调用 readable.read()
是安全的,则为 true
,这意味着流尚未被销毁或发出 'error'
或 'end'
。
readable.readableAborted
#
返回在发出 'end'
之前流是否被销毁或出错。
readable.readableDidRead
#
返回是否已发出 'data'
。
readable.readableEncoding
#
给定 Readable
流的属性 encoding
的 Getter。可以使用 readable.setEncoding()
方法设置 encoding
属性。
readable.readableEnded
#
当 'end'
事件发出时,变为 true
。
readable.errored
#
如果流已因错误而销毁,则返回错误。
readable.readableFlowing
#
此属性反映了 三个状态 部分中描述的 Readable
流的当前状态。
readable.readableHighWaterMark
#
返回创建此 Readable
时传递的 highWaterMark
的值。
readable.readableLength
#
此属性包含准备就绪的队列中的字节(或对象)数。此值提供有关 highWaterMark
状态的内省数据。
readable.readableObjectMode
#
给定 Readable
流的属性 objectMode
的 Getter。
readable.resume()
#
- 返回:<this>
readable.resume()
方法使显式暂停的 Readable
流继续发出 'data'
事件,将流切换到流动模式。
readable.resume()
方法可用于完全使用流中的数据,而无需实际处理任何数据
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
如果存在 'readable'
事件侦听器,则 readable.resume()
方法不起作用。
readable.setEncoding(encoding)
#
readable.setEncoding()
方法设置从 Readable
流中读取数据的字符编码。
默认情况下,不分配任何编码,流数据将作为 Buffer
对象返回。设置编码会导致流数据作为指定编码的字符串返回,而不是作为 Buffer
对象返回。例如,调用 readable.setEncoding('utf8')
将导致输出数据被解释为 UTF-8 数据,并作为字符串传递。调用 readable.setEncoding('hex')
将导致数据以十六进制字符串格式编码。
Readable
流将正确处理通过流传递的多字节字符,否则这些字符如果只是作为 Buffer
对象从流中提取出来,就会被不正确地解码。
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
readable.unpipe([destination])
#
destination
<stream.Writable> 可选要取消管道的特定流- 返回:<this>
readable.unpipe()
方法分离之前使用 stream.pipe()
方法附加的 Writable
流。
如果未指定 destination
,则分离所有管道。
如果指定了 destination
,但未为其设置管道,则该方法不执行任何操作。
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])
#
chunk
<Buffer> | <Uint8Array> | <string> | <null> | <any> 要移入读取队列的数据块。对于不在对象模式下运行的流,chunk
必须是字符串、Buffer
、Uint8Array
或null
。对于对象模式流,chunk
可以是任何 JavaScript 值。编码
<string> 字符串块的编码。必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。
将块
作为null
传递表示流的结尾(EOF),其行为与readable.push(null)
相同,之后无法再写入更多数据。EOF 信号放在缓冲区的末尾,任何缓冲数据仍会被刷新。
readable.unshift()
方法将数据块推回到内部缓冲区。这在某些情况下很有用,例如当流被需要“取消使用”已从源中乐观提取的某些数据量的代码使用时,以便可以将数据传递给其他方。
在发出'end'
事件或抛出运行时错误后,不能调用stream.unshift(chunk)
方法。
使用stream.unshift()
的开发人员通常应该考虑改用Transform
流。有关更多信息,请参阅流实现者的 API部分。
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
与stream.push(chunk)
不同,stream.unshift(chunk)
不会通过重置流的内部读取状态来结束读取过程。如果在读取期间(即从自定义流上的stream._read()
实现中)调用readable.unshift()
,这可能会导致意外结果。在调用readable.unshift()
之后立即stream.push('')
将适当地重置读取状态,但是最好在执行读取过程中避免调用readable.unshift()
。
readable.wrap(stream)
#
在 Node.js 0.10 之前,流并未实现整个node:stream
模块 API,如其当前定义。(有关更多信息,请参阅兼容性。)
在使用发出'data'
事件且具有仅建议性的stream.pause()
方法的较旧 Node.js 库时,可以使用readable.wrap()
方法创建一个Readable
流,该流使用旧流作为其数据源。
很少需要使用readable.wrap()
,但已提供该方法以方便与较旧的 Node.js 应用程序和库进行交互。
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()
#
- 返回:<AsyncIterator> 以完全消耗流。
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
如果循环以 break
、return
或 throw
终止,则流将被销毁。换句话说,遍历流将完全消耗流。流将以等于 highWaterMark
选项的大小分块读取。在上面的代码示例中,如果文件的数据少于 64 KiB,则数据将位于单个块中,因为没有向 fs.createReadStream()
提供 highWaterMark
选项。
readable[Symbol.asyncDispose]()
#
使用 AbortError
调用 readable.destroy()
,并在流完成时返回一个承诺。
readable.compose(stream[, options])
#
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Duplex> 与流
stream
复合的流。
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
有关更多信息,请参见 stream.compose
。
readable.iterator([options])
#
options
<Object>destroyOnReturn
<boolean> 设置为false
时,对异步迭代器调用return
,或使用break
、return
或throw
退出for await...of
迭代不会销毁流。默认值:true
。
- 返回:<AsyncIterator> 用于使用流。
此方法创建的迭代器使用户可以选择在 for await...of
循环通过 return
、break
或 throw
退出时取消销毁流,或者如果流在迭代期间发出错误,则迭代器应销毁流。
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])
#
fn
<Function> | <AsyncFunction> 一个函数,用于映射流中的每个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。highWaterMark
<number> 在等待用户使用映射项时要缓冲多少项。默认值:concurrency * 2 - 1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Readable> 使用函数
fn
映射的流。
此方法允许映射流。fn
函数将对流中的每个块调用。如果 fn
函数返回一个 Promise,则在将该 Promise 传递给结果流之前,将 await
该 Promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
readable.filter(fn[, options])
#
fn
<Function> | <AsyncFunction> 一个函数,用于从流中筛选块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。highWaterMark
<number> 在等待用户使用经过筛选的项目时要缓冲多少个项目。默认值:concurrency * 2 - 1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Readable> 使用谓词
fn
筛选的流。
此方法允许筛选流。对于流中的每个块,将调用 fn
函数,如果它返回真值,则该块将传递到结果流。如果 fn
函数返回一个 Promise,则将 await
该 Promise。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
readable.forEach(fn[, options])
#
fn
<Function> | <AsyncFunction> 在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Promise> 流完成时的 Promise。
此方法允许迭代流。对于流中的每个块,将调用 fn
函数。如果 fn
函数返回一个 Promise,则将 await
该 Promise。
此方法与 for await...of
循环不同,因为它可以选择同时处理块。此外,只有通过传递 signal
选项并在 for await...of
可以使用 break
或 return
停止的情况下中止相关的 AbortController
,才能停止 forEach
迭代。在任何一种情况下,流都将被销毁。
此方法与侦听 'data'
事件不同,因为它使用底层机制中的 readable
事件,并且可以限制同时进行的 fn
调用的数量。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Logs result, similar to `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Stream has finished
readable.toArray([options])
#
options
<Object>signal
<AbortSignal> 允许在中止信号时取消 toArray 操作。
- 返回:<Promise> 包含流内容的数组的 Promise。
此方法允许轻松获取流的内容。
由于此方法将整个流读入内存,因此它否定了流的优点。它旨在用于互操作性和便利性,而不是作为使用流的主要方式。
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])
#
fn
<Function> | <AsyncFunction> 在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Promise> 一个 promise,如果
fn
为至少一个块返回了一个真值,则求值为true
。
此方法类似于 Array.prototype.some
,并且在流中的每个块上调用 fn
,直到等待的返回值为 true
(或任何真值)。一旦块上的 fn
调用的等待返回值为真值,流就会被销毁,并且 promise 会使用 true
兑现。如果块上的任何 fn
调用都没有返回真值,则 promise 会使用 false
兑现。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.find(fn[, options])
#
fn
<Function> | <AsyncFunction> 在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Promise> 一个 promise,求值为
fn
求值为真值的第一个块,如果没有找到元素,则求值为undefined
。
此方法类似于 Array.prototype.find
,并且在流中的每个块上调用 fn
以查找 fn
为真值的块。一旦 fn
调用的等待返回值为真值,流就会被销毁,并且 promise 会使用 fn
返回真值的值兑现。如果块上的所有 fn
调用都返回假值,则 promise 会使用 undefined
兑现。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
readable.every(fn[, options])
#
fn
<Function> | <AsyncFunction> 在流的每个块上调用的函数。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Promise> 一个 promise,如果
fn
为所有块返回了一个真值,则求值为true
。
此方法类似于 Array.prototype.every
,它对流中的每个块调用 fn
,以检查 fn
的所有等待返回的值是否为真值。一旦块上的 fn
调用等待的返回值为假值,流就会被销毁,并且 promise 会使用 false
兑现。如果块上的所有 fn
调用都返回真值,则 promise 会使用 true
兑现。
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
readable.flatMap(fn[, options])
#
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> 一个函数,用于映射流中的每个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
options
<Object>concurrency
<number> 在流上同时调用fn
的最大并发数。默认值:1
。signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Readable> 一个使用函数
fn
进行扁平映射的流。
此方法通过对流的每个块应用给定的回调,然后扁平化结果来返回一个新流。
可以从 fn
返回一个流或另一个可迭代对象或异步可迭代对象,并且结果流将合并(扁平化)到返回的流中。
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
readable.drop(limit[, options])
#
limit
<number> 要从可读流中丢弃的块数。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Readable> 一个丢弃了
limit
个块的流。
此方法返回一个新的流,其中丢弃了前 limit
个块。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])
#
limit
<number> 要从可读流中获取的块数。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Readable> 一个获取了
limit
个块的流。
此方法返回一个新的流,其中包含前 limit
个块。
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])
#
fn
<Function> | <AsyncFunction> 一个还原函数,用于调用流中的每个块。previous
<any> 从对fn
的上次调用获得的值,或者如果指定了initial
值,则为initial
值,否则为流的第一个块。data
<any> 来自流的数据块。options
<Object>signal
<AbortSignal> 如果流被销毁,则中止,允许提前中止fn
调用。
initial
<any> 要在还原中使用的初始值。options
<Object>signal
<AbortSignal> 允许在信号中止时销毁流。
- 返回:<Promise> 一个 promise,用于还原的最终值。
此方法按顺序对流中的每个块调用 fn
,并向其传递对前一个元素计算的结果。它返回一个 promise,用于还原的最终值。
如果没有提供initial
值,则流的第一块将用作初始值。如果流为空,则会用具有ERR_INVALID_ARGS
代码属性的TypeError
拒绝该 Promise。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
reducer 函数逐个元素迭代流,这意味着没有concurrency
参数或并行性。要并发执行reduce
,可以将异步函数提取到readable.map
方法中。
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
双工和转换流#
类:stream.Duplex
#
双工流是同时实现Readable
和Writable
接口的流。
Duplex
流的示例包括
duplex.allowHalfOpen
#
如果为false
,则当可读端结束时,流将自动结束可写端。最初由allowHalfOpen
构造函数选项设置,默认为true
。
这可以手动更改以更改现有Duplex
流实例的半开行为,但必须在发出'end'
事件之前进行更改。
类:stream.Transform
#
转换流是输出在某种程度上与输入相关的Duplex
流。与所有Duplex
流一样,Transform
流同时实现Readable
和Writable
接口。
Transform
流的示例包括
transform.destroy([error])
#
销毁流,并可以选择发出'error'
事件。在此调用之后,转换流将释放任何内部资源。实现者不应覆盖此方法,而应实现readable._destroy()
。Transform
的_destroy()
的默认实现还会发出'close'
,除非emitClose
设置为 false。
调用destroy()
后,任何进一步的调用都将是 no-op,并且不会将_destroy()
之外的任何进一步错误作为'error'
发出。
stream.finished(stream[, options], callback)
#
stream
<Stream> | <ReadableStream> | <WritableStream> 可读和/或可写流/Web 流。options
<Object>error
<boolean> 如果设置为false
,则不会将对emit('error', err)
的调用视为已完成。默认值:true
。readable
<boolean> 当设置为false
时,即使流可能仍然可读,也会在流结束时调用回调。默认值:true
。writable
<boolean> 当设置为false
时,即使流可能仍然可写,也会在流结束时调用回调。默认值:true
。signal
<AbortSignal> 允许中止对流完成的等待。如果中止信号,则不会中止基础流。回调将使用AbortError
调用。此函数添加的所有已注册侦听器也将被移除。cleanup
<boolean> 移除所有已注册的流侦听器。默认值:false
。
callback
<Function> 一个回调函数,它采用一个可选的错误参数。- 返回:<Function> 一个清除函数,它移除所有已注册的侦听器。
一个函数,用于在流不再可读、可写或遇到错误或过早关闭事件时得到通知。
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Drain the stream.
在流过早销毁(如中止的 HTTP 请求)并且不会发出 'end'
或 'finish'
的错误处理场景中特别有用。
finished
API 提供了 Promise 版本。
在调用 callback
之后,stream.finished()
会留下悬空的事件侦听器(特别是 'error'
、'end'
、'finish'
和 'close'
)。这样做是为了防止意外的 'error'
事件(由于流实现不正确)导致意外崩溃。如果这是不需要的行为,那么需要在回调中调用返回的清除函数
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)
#
stream.pipeline(streams, callback)
#
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- 返回:<AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- 返回:<AsyncIterable> | <Promise>
callback
<Function> 在管道完全完成时调用。err
<Error>val
由destination
返回的Promise
的已解析值。
- 返回:<Stream>
一个模块方法,用于在流和生成器之间进行管道传输,转发错误并正确清理,并在管道完成时提供一个回调。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
},
);
pipeline
API 提供了一个promise 版本。
stream.pipeline()
将在除以下情况之外的所有流上调用 stream.destroy(err)
- 已发出
'end'
或'close'
的Readable
流。 - 已发出
'finish'
或'close'
的Writable
流。
在调用 callback
之后,stream.pipeline()
会在流上保留悬空的事件侦听器。如果在失败后重用流,则会导致事件侦听器泄漏和错误被吞没。如果最后一个流是可读的,则会删除悬空的事件侦听器,以便稍后可以消耗最后一个流。
当引发错误时,stream.pipeline()
会关闭所有流。使用 pipeline
的 IncomingRequest
用法可能会导致意外的行为,因为它会在不发送预期响应的情况下销毁套接字。请参见以下示例
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});
stream.compose(...streams)
#
stream.compose
是实验性的。streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 返回:<stream.Duplex>
将两个或多个流合并为一个 Duplex
流,该流写入第一个流并从最后一个流读取。使用 stream.pipeline
将每个提供的流管道传输到下一个流。如果任何流出错,则所有流都会被销毁,包括外部 Duplex
流。
由于 stream.compose
返回一个新流,该流反过来可以(并且应该)管道传输到其他流,因此它支持组合。相比之下,当将流传递给 stream.pipeline
时,通常第一个流是可读流,最后一个流是可写流,形成一个闭合回路。
如果传递 Function
,则它必须是采用 source
Iterable
的工厂方法。
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose
可用于将异步可迭代对象、生成器和函数转换为流。
AsyncIterable
转换为可读Duplex
。不能产生null
。AsyncGeneratorFunction
转换为可读/可写转换Duplex
。必须将源AsyncIterable
作为第一个参数。不能产生null
。AsyncFunction
转换为可写的Duplex
。必须返回null
或undefined
。
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
有关 stream.compose
作为运算符的信息,请参见 readable.compose(stream)
。
stream.Readable.from(iterable[, options])
#
iterable
<Iterable> 实现Symbol.asyncIterator
或Symbol.iterator
可迭代协议的对象。如果传递空值,则会发出“错误”事件。options
<Object> 提供给new stream.Readable([options])
的选项。默认情况下,Readable.from()
会将options.objectMode
设置为true
,除非明确选择退出,方法是将options.objectMode
设置为false
。- 返回:<stream.Readable>
用于从迭代器创建可读流的实用方法。
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
出于性能原因,调用 Readable.from(string)
或 Readable.from(buffer)
不会迭代字符串或缓冲区以匹配其他流语义。
如果将包含 Promise 的 Iterable
对象作为参数传递,则可能导致未处理的拒绝。
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Readable.fromWeb(readableStream[, options])
#
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回:<stream.Readable>
stream.Readable.isDisturbed(stream)
#
stream
<stream.Readable> | <ReadableStream>- 返回:
boolean
返回流是否已读取或取消。
stream.isErrored(stream)
#
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 返回:<boolean>
返回流是否遇到错误。
stream.isReadable(stream)
#
stream
<Readable> | <Duplex> | <ReadableStream>- 返回:<boolean>
返回流是否可读。
stream.Readable.toWeb(streamReadable[, options])
#
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 在从给定的stream.Readable
中读取时应用反压之前,(创建的ReadableStream
)的最大内部队列大小。如果未提供值,则会从给定的stream.Readable
中获取该值。size
<Function> 给定数据块大小的函数。如果未提供值,则所有块的大小都为1
。
- 返回:<ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
#
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
- 返回:<stream.Writable>
stream.Writable.toWeb(streamWritable)
#
streamWritable
<stream.Writable>- 返回:<WritableStream>
stream.Duplex.from(src)
#
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
用于创建双工流的实用方法。
Stream
将可写流转换为可写的Duplex
,将可读流转换为Duplex
。Blob
转换为可读的Duplex
。string
转换为可读的Duplex
。ArrayBuffer
转换为可读的Duplex
。AsyncIterable
转换为可读Duplex
。不能产生null
。AsyncGeneratorFunction
转换为可读/可写转换Duplex
。必须将源AsyncIterable
作为第一个参数。不能产生null
。AsyncFunction
转换为可写的Duplex
。必须返回null
或undefined
Object ({ writable, readable })
将readable
和writable
转换为Stream
,然后将它们组合到Duplex
中,其中Duplex
将写入writable
并从readable
中读取。Promise
转换为可读的Duplex
。值null
被忽略。ReadableStream
转换为可读的Duplex
。WritableStream
转换为可写的Duplex
。- 返回:<stream.Duplex>
如果将包含 Promise 的 Iterable
对象作为参数传递,则可能导致未处理的拒绝。
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]);
stream.Duplex.fromWeb(pair[, options])
#
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>- 返回:<stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}
const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)
#
streamDuplex
<stream.Duplex>- 返回:<Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)
#
signal
<AbortSignal> 表示可能取消的信号stream
<Stream> | <ReadableStream> | <WritableStream> 要附加信号的流。
将 AbortSignal 附加到可读或可写流。这允许代码使用 AbortController
控制流销毁。
对传递的 AbortSignal
对应的 AbortController
调用 abort
的行为与对流调用 .destroy(new AbortError())
的行为相同,而对于 Web 流,则为 controller.error(new AbortError())
。
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort();
或将 AbortSignal
与可读流一起用作异步可迭代对象
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
或将 AbortSignal
与 ReadableStream 一起使用
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// The operation was cancelled
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)
#
返回流使用的默认 highWaterMark。默认为 16384
(16 KiB),或 objectMode
的 16
。
stream.setDefaultHighWaterMark(objectMode, value)
#
设置流使用的默认 highWaterMark。
面向流实现者的 API#
node:stream
模块 API 的设计目的是使用 JavaScript 的原型继承模型轻松实现流。
首先,流开发者将声明一个新的 JavaScript 类,该类扩展四个基本流类之一(stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
),确保他们调用适当的父类构造函数
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
在扩展流时,请记住在将这些选项转发到基本构造函数之前,用户可以且应该提供哪些选项。例如,如果实现对 autoDestroy
和 emitClose
选项做出假设,则不允许用户覆盖这些选项。明确指出转发哪些选项,而不是隐式地转发所有选项。
然后,新的流类必须实现一个或多个特定方法,具体取决于正在创建的流类型,如下表所示
用例 | 类 | 要实现的方法 |
---|---|---|
只读 | Readable | _read() |
只写 | Writable | _write() 、_writev() 、_final() |
读写 | Duplex | _read() 、_write() 、_writev() 、_final() |
对写入的数据进行操作,然后读取结果 | Transform | _transform() 、_flush() 、_final() |
流的实现代码永远不应调用流的“公共”方法,这些方法供使用者使用(如流使用者 API部分所述)。这样做可能会对使用流的应用程序代码产生不利影响。
避免覆盖公共方法,如 write()
、end()
、cork()
、uncork()
、read()
和 destroy()
,或通过 .emit()
发出内部事件,如 'error'
、'data'
、'end'
、'finish'
和 'close'
。这样做会破坏当前和未来的流不变量,从而导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。
简化构造#
对于许多简单的情况,可以创建流而不依赖于继承。这可以通过直接创建 stream.Writable
、stream.Readable
、stream.Duplex
或 stream.Transform
对象的实例,并将适当的方法作为构造函数选项来完成。
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
},
});
实现可写流#
stream.Writable
类被扩展为实现Writable
流。
自定义 Writable
流必须调用 new stream.Writable([options])
构造函数并实现 writable._write()
和/或 writable._writev()
方法。
new stream.Writable([options])
#
options
<Object>highWaterMark
<number> 当stream.write()
开始返回false
时的缓冲区级别。默认值:16384
(16 KiB),或objectMode
流的16
。decodeStrings
<boolean> 是否将传递给stream.write()
的string
编码为Buffer
(使用stream.write()
调用中指定的编码),然后再将它们传递给stream._write()
。其他类型的数据不会被转换(即Buffer
不会解码为string
)。设置为 false 将阻止转换string
。默认值:true
。defaultEncoding
<string> 当没有将编码作为参数指定给stream.write()
时使用的默认编码。默认值:'utf8'
。objectMode
<boolean>stream.write(anyObj)
是否是有效操作。如果设置,如果流实现支持,则可以写入除字符串、Buffer
或Uint8Array
之外的 JavaScript 值。默认值:false
。emitClose
<boolean> 流在被销毁后是否应该发出'close'
。默认值:true
。write
<Function>stream._write()
方法的实现。writev
<Function>stream._writev()
方法的实现。destroy
<Function>stream._destroy()
方法的实现。final
<Function>stream._final()
方法的实现。construct
<Function>stream._construct()
方法的实现。autoDestroy
<boolean> 此流在结束之后是否应自动调用其自身的.destroy()
。默认值:true
。signal
<AbortSignal> 表示可能取消的信号。
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor.
super(options);
// ...
}
}
或者,在使用前 ES6 样式的构造函数时
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
或者,使用简化的构造函数方法
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
对传递的 AbortSignal
对应的 AbortController
调用 abort
的行为与对可写流调用 .destroy(new AbortError())
的行为相同。
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
writable._construct(callback)
#
callback
<Function> 当流完成初始化时,使用错误参数(可选)调用此函数。
_construct()
方法不得直接调用。它可以由子类实现,如果实现,则只能由内部 Writable
类方法调用。
此可选函数将在流构造函数返回后立即调用,延迟任何 _write()
、_final()
和 _destroy()
调用,直到调用 callback
。这对于在使用流之前初始化状态或异步初始化资源非常有用。
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any> 要写入的Buffer
,从传递给stream.write()
的string
转换而来。如果流的decodeStrings
选项为false
或流在对象模式下运行,则不会转换该块,并且将为传递给stream.write()
的任何内容。encoding
<string> 如果块是一个字符串,则encoding
是该字符串的字符编码。如果块是一个Buffer
,或者如果流在对象模式下操作,则可以忽略encoding
。callback
<Function> 在为提供的块完成处理时调用此函数(可选择使用错误参数)。
所有 Writable
流实现都必须提供一个 writable._write()
和/或 writable._writev()
方法,以将数据发送到底层资源。
Transform
流提供自己的 writable._write()
实现。
此函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Writable
类方法调用。
callback
函数必须在 writable._write()
中同步调用,或者异步调用(即,不同的标记)来表示写入已成功完成或因错误而失败。传递给 callback
的第一个参数必须是 Error
对象(如果调用失败)或 null
(如果写入成功)。
在调用 writable._write()
和调用 callback
之间发生的 writable.write()
的所有调用都将导致写入的数据被缓冲。当调用 callback
时,流可能会发出 'drain'
事件。如果流实现能够一次处理多个数据块,则应实现 writable._writev()
方法。
如果在构造函数选项中将 decodeStrings
属性显式设置为 false
,则 chunk
将保持传递给 .write()
的相同对象,并且可能是字符串而不是 Buffer
。这是为了支持对某些字符串数据编码具有优化处理的实现。在这种情况下,encoding
参数将指示字符串的字符编码。否则,可以安全地忽略 encoding
参数。
writable._write()
方法之前带有下划线,因为它属于定义它的类内部,并且用户程序永远不应直接调用它。
writable._writev(chunks, callback)
#
chunks
<Object[]> 要写入的数据。该值是一个 <Object> 数组,每个数组都表示要写入的离散数据块。这些对象的属性是callback
<Function> 当为提供的块完成处理时调用的回调函数(可选,带错误参数)。
此函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部 Writable
类方法调用。
writable._writev()
方法可以作为 writable._write()
的补充或替代,在能够一次处理多个数据块的流实现中实现。如果已实现并且有来自先前写入的缓冲数据,则将调用 _writev()
而不是 _write()
。
writable._writev()
方法以一个下划线开头,因为它属于定义它的类内部,并且永远不应由用户程序直接调用。
writable._destroy(err, callback)
#
err
<Error> 可能的错误。callback
<Function> 一个回调函数,它采用一个可选的错误参数。
_destroy()
方法由 writable.destroy()
调用。它可以被子类覆盖,但不得直接调用。此外,一旦在解析承诺时执行,callback
不应与 async/await 混合使用。
writable._final(callback)
#
callback
<Function> 在完成写入任何剩余数据时调用此函数(可选,带错误参数)。
不得直接调用 _final()
方法。它可以由子类实现,如果实现,则仅由内部 Writable
类方法调用。
此可选函数将在流关闭之前调用,延迟 'finish'
事件,直到调用 callback
。这对于在流结束之前关闭资源或写入缓冲数据非常有用。
写入时出错#
处理 writable._write()
、writable._writev()
和 writable._final()
方法时发生的错误必须通过调用回调并传递错误作为第一个参数来传播。从这些方法中抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
如果 Readable
流管道到 Writable
流中,当 Writable
发出错误时,Readable
流将取消管道。
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
可写流示例#
以下示例说明了一个相当简单的(有点无意义的)自定义 Writable
流实现。虽然这个特定的 Writable
流实例没有任何实际的特定用途,但该示例说明了自定义 Writable
流实例的每个必需元素
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
在可写流中解码缓冲区#
解码缓冲区是一项常见任务,例如,当使用输入为字符串的转换器时。当使用多字节字符编码(例如 UTF-8)时,这不是一个简单的过程。以下示例展示了如何使用 StringDecoder
和 Writable
解码多字节字符串。
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options && options.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
实现可读流#
扩展 stream.Readable
类以实现 Readable
流。
自定义 Readable
流必须调用 new stream.Readable([options])
构造函数并实现 readable._read()
方法。
new stream.Readable([options])
#
options
<Object>highWaterMark
<数字> 在停止从底层资源读取之前,存储在内部缓冲区中的最大 字节数。默认值:16384
(16 KiB),或16
(对于objectMode
流)。encoding
<字符串> 如果指定,则缓冲区将使用指定的编码解码为字符串。默认值:null
。objectMode
<布尔值> 此流是否应表现为对象流。这意味着stream.read(n)
返回单个值,而不是大小为n
的Buffer
。默认值:false
。emitClose
<boolean> 流在被销毁后是否应该发出'close'
。默认值:true
。read
<函数>stream._read()
方法的实现。destroy
<函数>stream._destroy()
方法的实现。construct
<函数>stream._construct()
方法的实现。autoDestroy
<boolean> 此流在结束之后是否应自动调用其自身的.destroy()
。默认值:true
。signal
<AbortSignal> 表示可能取消的信号。
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
}
或者,在使用前 ES6 样式的构造函数时
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
或者,使用简化的构造函数方法
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
对传递的 AbortSignal
对应的 AbortController
调用 abort
的行为与对创建的可读流调用 .destroy(new AbortError())
的行为相同。
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
readable._construct(callback)
#
callback
<Function> 当流完成初始化时,使用错误参数(可选)调用此函数。
不得直接调用 _construct()
方法。它可以由子类实现,如果实现,则只能由内部 Readable
类方法调用。
此可选函数将在下一个滴答中由流构造函数安排,延迟任何 _read()
和 _destroy()
调用,直到调用 callback
。这对于在使用流之前初始化状态或异步初始化资源非常有用。
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)
#
size
<number> 异步读取的字节数
此函数不得由应用程序代码直接调用。它应由子类实现,并且仅由内部 Readable
类方法调用。
所有 Readable
流实现都必须提供 readable._read()
方法的实现,以从底层资源获取数据。
当调用 readable._read()
时,如果资源中有可用数据,实现应开始使用 this.push(dataChunk)
方法将该数据推入读取队列。在每次调用 this.push(dataChunk)
后,当流准备好接受更多数据时,将再次调用 _read()
。_read()
可以继续从资源中读取并推送数据,直到 readable.push()
返回 false
。仅当在停止后再次调用 _read()
时,它才应恢复将附加数据推入队列。
一旦调用 readable._read()
方法,它将不会再次被调用,直到通过 readable.push()
方法推送更多数据。空数据(例如空缓冲区和字符串)不会导致调用 readable._read()
。
size
参数是建议性的。对于“读取”是返回数据的单个操作的实现,可以使用 size
参数来确定要获取多少数据。其他实现可能会忽略此参数,并且仅在数据可用时提供数据。在调用 stream.push(chunk)
之前,无需“等待”直到有 size
字节可用。
readable._read()
方法以一个下划线开头,因为它属于定义它的类内部,并且永远不应由用户程序直接调用。
readable._destroy(err, callback)
#
err
<Error> 可能的错误。callback
<Function> 一个回调函数,它采用一个可选的错误参数。
_destroy()
方法由 readable.destroy()
调用。它可以被子类覆盖,但不得直接调用。
readable.push(chunk[, encoding])
#
chunk
<Buffer> | <Uint8Array> | <string> | <null> | <any> 要推入读取队列的数据块。对于不在对象模式下运行的流,chunk
必须是字符串、Buffer
或Uint8Array
。对于对象模式流,chunk
可以是任何 JavaScript 值。编码
<string> 字符串块的编码。必须是有效的Buffer
编码,例如'utf8'
或'ascii'
。- 返回:<boolean>
true
,如果可能继续推送其他数据块;否则为false
。
当 chunk
是 Buffer
、Uint8Array
或 string
时,数据 chunk
将添加到内部队列中,供流的用户使用。将 chunk
传递为 null
表示流的结尾 (EOF),之后不能再写入更多数据。
当 Readable
以暂停模式运行时,可以使用 readable.push()
添加的数据通过在发出 'readable'
事件时调用 readable.read()
方法来读取。
当 Readable
以流动模式运行时,使用 readable.push()
添加的数据将通过发出 'data'
事件来传递。
readable.push()
方法旨在尽可能灵活。例如,在包装提供某种暂停/恢复机制和数据回调函数的较低级别源时,低级别源可以由自定义 Readable
实例包装
// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
readable.push()
方法用于将内容推送到内部缓冲区。它可以由 readable._read()
方法驱动。
对于不以对象模式运行的流,如果 readable.push()
的 chunk
参数为 undefined
,它将被视为空字符串或缓冲区。有关更多信息,请参见 readable.push('')
。
读取时出错#
处理 readable._read()
期间发生的错误必须通过 readable.destroy(err)
方法传播。从 readable._read()
中抛出 Error
或手动发出 'error'
事件会导致未定义的行为。
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
计数流示例#
以下是 Readable
流的基本示例,它按升序发出 1 到 1,000,000 的数字,然后结束。
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
实现双工流#
Duplex
流是同时实现 Readable
和 Writable
的流,例如 TCP 套接字连接。
由于 JavaScript 不支持多重继承,因此 stream.Duplex
类被扩展为实现 Duplex
流(而不是扩展 stream.Readable
和 stream.Writable
类)。
stream.Duplex
类通常从 stream.Readable
继承,并从 stream.Writable
寄生,但 instanceof
将对这两个基类都正常工作,因为在 stream.Writable
上覆盖了 Symbol.hasInstance
。
自定义 Duplex
流必须调用 new stream.Duplex([options])
构造函数并实现 readable._read()
和 writable._write()
方法。
new stream.Duplex(options)
#
options
<Object> 传递给Writable
和Readable
构造函数。还具有以下字段allowHalfOpen
<boolean> 如果设置为false
,则当可读端结束时,流将自动结束可写端。默认值:true
。readable
<boolean> 设置Duplex
是否可读。默认值:true
。writable
<boolean> 设置Duplex
是否可写。默认值:true
。readableObjectMode
<boolean> 为流的可读端设置objectMode
。如果objectMode
为true
,则不起作用。默认值:false
。writableObjectMode
<boolean> 为流的可写端设置objectMode
。如果objectMode
为true
,则不起作用。默认值:false
。readableHighWaterMark
<number> 为流的可读端设置highWaterMark
。如果提供了highWaterMark
,则不起作用。writableHighWaterMark
<number> 为流的可写端设置highWaterMark
。如果提供了highWaterMark
,则不起作用。
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
或者,在使用前 ES6 样式的构造函数时
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
或者,使用简化的构造函数方法
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
使用管道时
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
一个示例双工流#
以下展示了一个简单的 Duplex
流示例,它封装了一个假设的底层源对象,可以向其写入数据,并且可以从中读取数据,尽管使用的是与 Node.js 流不兼容的 API。以下展示了一个简单的 Duplex
流示例,它通过 Writable
接口缓冲传入的写入数据,然后通过 Readable
接口读回数据。
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
Duplex
流最重要的方面是,Readable
和 Writable
端彼此独立运行,尽管它们共存于一个对象实例中。
对象模式双工流#
对于 Duplex
流,可以使用 readableObjectMode
和 writableObjectMode
选项分别为 Readable
或 Writable
端独占设置 objectMode
。
例如,在以下示例中,创建了一个新的 Transform
流(它是一种 Duplex
流),它有一个对象模式 Writable
端,接受 JavaScript 数字,这些数字在 Readable
端转换为十六进制字符串。
const { Transform } = require('node:stream');
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
实现转换流#
Transform
流是一种 Duplex
流,其中输出以某种方式从输入计算得出。示例包括压缩、加密或解密数据的 zlib 流或 crypto 流。
没有要求输出与输入大小相同、块数相同或同时到达。例如,Hash
流将仅有一个输出块,该块在输入结束时提供。zlib
流将生成比其输入小得多或大得多的输出。
stream.Transform
类被扩展为实现 Transform
流。
stream.Transform
类从 stream.Duplex
原型继承,并实现其自己的 writable._write()
和 readable._read()
方法版本。自定义 Transform
实现必须实现 transform._transform()
方法,并且可以实现 transform._flush()
方法。
使用 Transform
流时,必须小心,因为写入流的数据可能会导致流的 Writable
端暂停,如果 Readable
端的输出未被使用。
new stream.Transform([options])
#
options
<Object> 传递给Writable
和Readable
构造函数。还具有以下字段transform
<Function>stream._transform()
方法的实现。flush
<Function>stream._flush()
方法的实现。
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
或者,在使用前 ES6 样式的构造函数时
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
或者,使用简化的构造函数方法
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
事件:'end'
#
'end'
事件来自 stream.Readable
类。'end'
事件在所有数据输出后触发,即在 transform._flush()
中的回调被调用后。如果发生错误,则不应触发 'end'
。
事件:'finish'
#
'finish'
事件来自 stream.Writable
类。'finish'
事件在调用 stream.end()
且所有块已由 stream._transform()
处理后触发。如果发生错误,则不应触发 'finish'
。
transform._flush(callback)
#
callback
<Function> 当剩余数据被刷新时要调用的回调函数(可选,带有错误参数和数据)。
此函数不得由应用程序代码直接调用。它应由子类实现,并且仅由内部 Readable
类方法调用。
在某些情况下,转换操作可能需要在流的末尾发出额外的位数据。例如,zlib
压缩流将存储一定量的内部状态,用于优化输出的压缩。但是,当流结束时,需要刷新该附加数据,以便压缩数据完整。
自定义 Transform
实现可能实现 transform._flush()
方法。当没有更多要使用的已写入数据时,但在 'end'
事件发出 Readable
流结束信号之前,将调用此方法。
在 transform._flush()
实现中,transform.push()
方法可以根据需要调用零次或多次。当刷新操作完成时,必须调用 callback
函数。
transform._flush()
方法以一个下划线为前缀,因为它属于定义它的类内部,并且用户程序永远不应直接调用它。
transform._transform(chunk, encoding, callback)
#
chunk
<Buffer> | <string> | <any> 要转换的Buffer
,从传递给stream.write()
的string
转换而来。如果流的decodeStrings
选项为false
或流在对象模式下运行,则不会转换块,并且将传递给stream.write()
的任何内容。encoding
<string> 如果块是一个字符串,那么这就是编码类型。如果块是一个缓冲区,那么这就是特殊值'buffer'
。在这种情况下,忽略它。callback
<Function> 在提供chunk
已处理后要调用的回调函数(可选,带有错误参数和数据)。
此函数不得由应用程序代码直接调用。它应由子类实现,并且仅由内部 Readable
类方法调用。
所有 Transform
流实现都必须提供一个 _transform()
方法来接受输入并生成输出。transform._transform()
实现处理正在写入的字节,计算输出,然后使用 transform.push()
方法将该输出传递给可读部分。
transform.push()
方法可以调用零次或多次以从单个输入块生成输出,具体取决于块的结果要输出多少。
有可能不会从任何给定的输入数据块生成输出。
只有在当前块完全消耗时才必须调用 callback
函数。传递给 callback
的第一个参数必须是 Error
对象(如果在处理输入时发生错误)或 null
(否则)。如果将第二个参数传递给 callback
,则它将被转发到 transform.push()
方法,但仅当第一个参数为假时。换句话说,以下内容是等效的
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
transform._transform()
方法以一个下划线为前缀,因为它属于定义它的类内部,并且用户程序永远不应直接调用它。
transform._transform()
从不并行调用;流实现了一个队列机制,并且为了接收下一个块,必须调用 callback
,无论是同步还是异步。
类:stream.PassThrough
#
stream.PassThrough
类是 Transform
流的一个简单实现,它只是将输入字节传递到输出。它的目的是主要用于示例和测试,但有一些用例,其中 stream.PassThrough
作为新颖流类型的构建块很有用。
其他说明#
流与异步生成器和异步迭代器的兼容性#
随着 JavaScript 中异步生成器和迭代器的支持,异步生成器实际上是此阶段的一级语言级流构造。
下面提供了使用 Node.js 流与异步生成器和异步迭代器的一些常见互操作案例。
使用异步迭代器使用可读流#
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
异步迭代器在流上注册一个永久错误处理程序,以防止任何未处理的后销毁错误。
使用异步生成器创建可读流#
可以使用 Readable.from()
实用方法从异步生成器创建 Node.js 可读流
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
从异步迭代器传输到可写流#
从异步迭代器写入可写流时,请确保正确处理背压和错误。 stream.pipeline()
抽象了背压和背压相关错误的处理
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
与较旧的 Node.js 版本兼容#
在 Node.js 0.10 之前,Readable
流接口更简单,但功能更少,实用性更低。
'data'
事件将立即开始发出,而不是等待对stream.read()
方法的调用。需要执行一定工作量来决定如何处理数据的应用程序需要将读取的数据存储到缓冲区中,以便不会丢失数据。stream.pause()
方法是建议性的,而不是有保证的。这意味着即使流处于暂停状态,也仍然需要准备好接收'data'
事件。
在 Node.js 0.10 中,添加了 Readable
类。为了向后兼容旧版 Node.js 程序,当添加 'data'
事件处理程序或调用 stream.resume()
方法时,Readable
流将切换到“流动模式”。其效果是,即使不使用新的 stream.read()
方法和 'readable'
事件,也不必再担心丢失 'data'
块。
虽然大多数应用程序将继续正常运行,但在以下情况下会引入边缘情况
- 未添加
'data'
事件侦听器。 - 从未调用
stream.resume()
方法。 - 流未管道到任何可写目标。
例如,考虑以下代码
// WARNING! BROKEN!
net.createServer((socket) => {
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
});
}).listen(1337);
在 Node.js 0.10 之前,传入的消息数据将被简单地丢弃。然而,在 Node.js 0.10 及更高版本中,套接字将永远保持暂停状态。
这种情况下的解决方法是调用 stream.resume()
方法以开始数据流
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
});
// Start the flow of data, discarding it.
socket.resume();
}).listen(1337);
除了新的 Readable
流切换到流动模式外,还可以使用 readable.wrap()
方法将 0.10 之前的样式流包装在 Readable
类中。
readable.read(0)
#
在某些情况下,有必要触发对底层可读流机制的刷新,而无需实际使用任何数据。在这种情况下,可以调用 readable.read(0)
,它将始终返回 null
。
如果内部读取缓冲区低于 highWaterMark
,并且流当前未读取,则调用 stream.read(0)
将触发低级 stream._read()
调用。
虽然大多数应用程序几乎不需要这样做,但在 Node.js 中有一些情况需要这样做,特别是在 Readable
流类内部。
readable.push('')
#
不建议使用 readable.push('')
。
将零字节字符串、Buffer
或 Uint8Array
推送到非对象模式的流会产生一个有趣的副作用。因为这是对 readable.push()
的调用,所以该调用会结束读取进程。但是,因为参数是一个空字符串,所以不会向可读缓冲区添加任何数据,因此用户无法使用任何内容。
调用 readable.setEncoding()
后 highWaterMark
差异#
使用 readable.setEncoding()
会改变 highWaterMark
在非对象模式下运行的方式。
通常,当前缓冲区的大小是根据 字节 与 highWaterMark
进行比较的。但是,在调用 setEncoding()
之后,比较函数将开始根据 字符 来测量缓冲区的大小。
对于 latin1
或 ascii
的常见情况,这不是问题。但是,在处理可能包含多字节字符的字符串时,建议注意此行为。