如何使用流
在 Node.js 应用程序中处理大量数据可能是一把双刃剑。处理海量数据的能力非常方便,但也可能导致性能瓶颈和内存耗尽。传统上,开发人员通过一次性将整个数据集读入内存来应对这一挑战。这种方法虽然对于较小的数据集很直观,但对于大数据(例如文件、网络请求等)则变得效率低下且资源密集。
这就是 Node.js 流发挥作用的地方。流提供了一种根本不同的方法,允许您增量处理数据并优化内存使用。通过将数据处理成可管理的块,流使您能够构建可扩展的应用程序,从而高效地处理即使是最艰巨的数据集。正如一句流行语所说,“流是随时间变化的数组。”
在本指南中,我们概述了流的概念、历史和 API,并就如何使用和操作它们提供了一些建议。
什么是 Node.js 流?
Node.js 流为管理应用程序中的数据流提供了强大的抽象。它们擅长处理大型数据集,例如从文件和网络请求中读取或写入数据,而不会影响性能。
这种方法不同于一次性将整个数据集加载到内存中。流以块的形式处理数据,从而显著减少内存使用。Node.js 中的所有流都继承自 EventEmitter
类,允许它们在数据处理的各个阶段发出事件。这些流可以是可读的、可写的,或两者兼备,为不同的数据处理场景提供了灵活性。
事件驱动架构
Node.js 以事件驱动架构为核心,使其成为实时 I/O 的理想选择。这意味着一旦有输入可用就立即消费,一旦应用程序生成输出就立即发送。流与这种方法无缝集成,实现了持续的数据处理。
它们通过在关键阶段发出事件来实现这一点。这些事件包括接收到数据的信号(data
事件)和流完成的信号(end
事件)。开发人员可以监听这些事件并相应地执行自定义逻辑。这种事件驱动的特性使得流在处理来自外部源的数据时非常高效。
为什么要使用流?
与其他数据处理方法相比,流提供了三个关键优势
- 内存效率:流以增量方式处理数据,以块的形式消费和处理数据,而不是将整个数据集加载到内存中。在处理大型数据集时,这是一个主要优势,因为它显著减少了内存使用并防止了与内存相关的性能问题。
- 改善响应时间:流允许立即处理数据。当一块数据到达时,可以立即处理,而无需等待整个有效载荷或数据集接收完毕。这减少了延迟并提高了应用程序的整体响应能力。
- 实时处理的可扩展性:通过分块处理数据,Node.js 流可以用有限的资源高效地处理大量数据。这种可扩展性使流成为实时处理大量数据的应用程序的理想选择。
这些优势使流成为构建高性能、可扩展的 Node.js 应用程序的强大工具,尤其是在处理大型数据集或实时数据时。
关于性能的说明
如果您的应用程序中已经有所有可用的数据,使用流可能会增加不必要的开销、复杂性,并减慢您的应用程序。
流的历史
本节参考了 Node.js 中流的历史。除非您正在处理为 Node.js 0.11.5(2013 年)之前的版本编写的代码库,否则您很少会遇到旧版本的流 API,但这些术语可能仍在使用中。
流 0
第一版流与 Node.js 同时发布。虽然当时还没有 Stream 类,但不同的模块使用了这个概念并实现了 read
/write
函数。util.pump()
函数可用于控制流之间的数据流。
流 1(经典)
随着 2011 年 Node v0.4.0 的发布,引入了 Stream 类以及 pipe()
方法。
流 2
2012 年,随着 Node v0.10.0 的发布,流 2 问世。此更新带来了新的流子类,包括 Readable、Writable、Duplex 和 Transform。此外,还添加了 readable
事件。为了保持向后兼容性,可以通过添加 data
事件监听器或调用 pause()
或 resume()
方法将流切换到旧模式。
流 3
2013 年,随着 Node v0.11.5 的发布,流 3 问世,以解决流同时具有 data
和 readable
事件处理程序的问题。这消除了在“当前”和“旧”模式之间进行选择的需要。流 3 是 Node.js 中流的当前版本。
流类型
可读流
Readable
是我们用来顺序读取数据源的类。Node.js API 中 Readable
流的典型示例包括读取文件时的 fs.ReadStream
、读取 HTTP 请求时的 http.IncomingMessage
以及从标准输入读取时的 process.stdin
。
关键方法和事件
可读流通过几个核心方法和事件进行操作,这些方法和事件允许对数据处理进行精细控制。
on('data')
:每当流中有数据可用时,就会触发此事件。它非常快,因为流会尽可能快地推送数据,使其适用于高吞吐量场景。on('end')
:当流中没有更多数据可读时发出。它表示数据传递的完成。此事件仅在流中的所有数据都已被消费后才会触发。on('readable')
:当流中有数据可读或已到达流末尾时,会触发此事件。它允许在需要时更可控地读取数据。on('close')
:当流及其底层资源已关闭时会发出此事件,并表示不会再发出任何事件。on('error')
:此事件可在任何时候发出,表示处理过程中出现错误。此事件的处理程序可用于避免未捕获的异常。
以下各节将演示这些事件的使用。
基本可读流
以下是一个动态生成数据的简单可读流实现示例:
class extends Readable {
#count = 0;
() {
this.push(':-)');
if (++this.#count === 5) {
this.push(null);
}
}
}
const = new ();
.on('data', => {
.(.toString());
});
在此代码中,MyStream
类扩展了 Readable 并重写了 _read()
方法,以将字符串“:-)”推送到内部缓冲区。在推送该字符串五次后,它通过推送 null
来表示流的结束。on('data')
事件处理程序会在接收到每个数据块时将其记录到控制台。
使用 readable 事件进行高级控制
为了更精细地控制数据流,可以使用 readable 事件。此事件更复杂,但通过允许明确控制何时从流中读取数据,为某些应用程序提供了更好的性能:
const = new MyStream({
: 1,
});
.on('readable', () => {
.('>> readable event');
let ;
while (( = .read()) !== null) {
.(.toString()); // Process the chunk
}
});
.on('end', () => .('>> end event'));
在这里,readable 事件用于根据需要手动从流中拉取数据。readable 事件处理程序内的循环会继续从流缓冲区中读取数据,直到返回 null
,这表示缓冲区暂时为空或流已结束。将 highWaterMark
设置为 1 会保持较小的缓冲区大小,从而更频繁地触发 readable 事件,并允许对数据流进行更精细的控制。
使用前面的代码,您将得到如下输出:
>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event
我们来分析一下。当我们附加 on('readable')
事件时,它会首次调用 read()
,因为这可能会触发 readable
事件的发射。在该事件发射之后,我们在 while
循环的第一次迭代中调用 read
。这就是为什么我们在同一行中得到前两个笑脸。之后,我们继续调用 read
,直到推送了 null
。每次调用 read
都会安排发射一个新的 readable
事件,但由于我们处于“流动”模式(即使用 readable
事件),发射被安排在 nextTick
。这就是为什么我们在循环的同步代码结束后,最后一次性得到它们的原因。
注意:您可以尝试使用 NODE_DEBUG=stream
运行代码,以查看在每次 push
后都会触发 emitReadable
。
如果我们想在每个笑脸之前看到 readable 事件被调用,我们可以将 push
包装在 setImmediate
或 process.nextTick
中,如下所示:
class extends Readable {
#count = 0;
() {
(() => {
this.push(':-)');
if (++this.#count === 5) {
return this.push(null);
}
});
}
}
然后我们会得到:
>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event
可写流
Writable
流对于创建文件、上传数据或任何涉及顺序输出数据的任务都很有用。虽然可读流提供了数据源,但 Node.js 中的可写流则充当数据的目的地。Node.js API 中可写流的典型示例是 fs.WriteStream
、process.stdout
和 process.stderr
。
可写流中的关键方法和事件
.write()
:此方法用于向流中写入一块数据。它通过缓冲数据直到达到定义的限制 (highWaterMark) 来处理数据,并返回一个布尔值,指示是否可以立即写入更多数据。.end()
:此方法表示数据写入过程的结束。它向流发出信号,以完成写入操作,并可能执行任何必要的清理工作。
创建可写流
以下是创建一个可写流的示例,该流在将所有传入数据写入标准输出之前将其转换为大写:
const { } = ('node:events');
const { } = ('node:stream');
class extends {
constructor() {
super({ : 10 /* 10 bytes */ });
}
(, , ) {
..(.toString().toUpperCase() + '\n', );
}
}
async function () {
const = new ();
for (let = 0; < 10; ++) {
const = !.('hello');
if () {
.('>> wait drain');
await (, 'drain');
}
}
.('world');
}
// Call the async function
().(.);
在这段代码中,MyStream
是一个自定义的 Writable
流,其缓冲区容量(highWaterMark
)为 10 字节。它重写了 _write
方法,在写出数据之前将其转换为大写。
循环尝试向流中写入十次 hello。如果缓冲区已满(waitDrain
变为 true
),它会等待一个 drain
事件,然后再继续,以确保我们不会使流的缓冲区过载。
输出将是:
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD
双工流
Duplex
流同时实现了可读和可写接口。
双工流中的关键方法和事件
双工流实现了可读流和可写流中描述的所有方法和事件。
一个很好的双工流示例是 net
模块中的 Socket
类:
const = ('node:net');
// Create a TCP server
const = .( => {
.('Hello from server!\n');
.('data', => {
.(`Client says: ${.()}`);
});
// Handle client disconnection
.('end', () => {
.('Client disconnected');
});
});
// Start the server on port 8080
.(8080, () => {
.('Server listening on port 8080');
});
之前的代码将在 8080 端口上打开一个 TCP 套接字,向任何连接的客户端发送 Hello from server!
,并记录任何接收到的数据。
const = ('node:net');
// Connect to the server at localhost:8080
const = .({ : 8080 }, () => {
.('Hello from client!\n');
});
.('data', => {
.(`Server says: ${.()}`);
});
// Handle the server closing the connection
.('end', () => {
.('Disconnected from server');
});
前面的代码将连接到 TCP 套接字,发送一条 Hello from client
消息,并记录任何接收到的数据。
转换流
Transform
流是双工流,其输出是根据输入计算得出的。顾名思义,它们通常用于在可读流和可写流之间转换数据。
转换流中的关键方法和事件
除了双工流中的所有方法和事件之外,还有:
_transform
:此函数在内部调用,用于处理可读和可写部分之间的数据流。应用程序代码不得调用此函数。
创建转换流
要创建一个新的转换流,我们可以向 Transform
构造函数传递一个 options
对象,其中包含一个 transform
函数,该函数使用 push
方法处理如何从输入数据计算输出数据。
const { } = ('node:stream');
const = new ({
(, , ) {
this.(.toString().toUpperCase());
();
},
});
此流将接收任何输入并以大写形式输出。
如何操作流
在使用流时,我们通常希望从一个源读取并写入一个目标,期间可能需要对数据进行一些转换。以下各节将介绍实现此目的的不同方法。
.pipe()
.pipe()
方法将一个可读流连接到一个可写(或转换)流。虽然这看起来是实现我们目标的简单方法,但它将所有错误处理都委托给了程序员,这使得正确实现变得困难。
下面的示例展示了一个 pipe 试图将当前文件以大写形式输出到控制台。
const = ('node:fs');
const { } = ('node:stream');
let = 0;
const = new ({
(, , ) {
if ( === 10) {
return (new ('BOOM!'));
}
++;
this.(.toString().toUpperCase());
();
},
});
const = .(, { : 1 });
const = .;
.().();
.('close', () => {
.('Readable stream closed');
});
.('close', () => {
.('Transform stream closed');
});
.('error', => {
.('\nError in transform stream:', .);
});
.('close', () => {
.('Writable stream closed');
});
写入 10 个字符后,upper
将在回调中返回一个错误,这将导致流关闭。但是,其他流不会收到通知,从而导致内存泄漏。输出将是:
CONST FS =
Error in transform stream: BOOM!
Transform stream closed
pipeline()
为了避免 .pipe()
方法的陷阱和低级复杂性,在大多数情况下,建议使用 pipeline()
方法。此方法是一种更安全、更健壮的管道流方式,可自动处理错误和清理工作。
下面的示例演示了如何使用 pipeline()
来避免前面示例的陷阱:
const = ('node:fs');
const { , } = ('node:stream');
let = 0;
const = new ({
(, , ) {
if ( === 10) {
return (new ('BOOM!'));
}
++;
this.(.toString().toUpperCase());
();
},
});
const = .(, { : 1 });
const = .;
.('close', () => {
.('Readable stream closed');
});
.('close', () => {
.('\nTransform stream closed');
});
.('close', () => {
.('Writable stream closed');
});
(, , , => {
if () {
return .('Pipeline error:', .);
}
.('Pipeline succeeded');
});
在这种情况下,所有流都将被关闭,并显示以下输出:
CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed
pipeline()
方法还有一个 async pipeline()
版本,它不接受回调,而是返回一个在管道失败时被拒绝的 promise。
异步迭代器
异步迭代器被推荐为与流 API 交互的标准方式。与 Web 和 Node.js 中的所有流原语相比,异步迭代器更易于理解和使用,有助于减少错误并提高代码的可维护性。在 Node.js 的最新版本中,异步迭代器已成为一种更优雅、更易读的与流交互的方式。基于事件的基础,异步迭代器提供了一个更高级别的抽象,简化了流的消费。
在 Node.js 中,所有可读流都是异步可迭代的。这意味着您可以使用 for await...of
语法来循环遍历流的数据,并在数据可用时,以异步代码的效率和简单性处理每一块数据。
将异步迭代器与流一起使用的好处
将异步迭代器与流一起使用,可以从以下几个方面简化异步数据流的处理:
- 增强可读性:代码结构更清晰、更易读,尤其是在处理多个异步数据源时。
- 错误处理:异步迭代器允许使用 try/catch 块进行直接的错误处理,类似于常规的异步函数。
- 流量控制:它们通过消费者等待下一块数据来固有地管理背压,从而允许更高效的内存使用和处理。
异步迭代器提供了一种更现代且通常更易读的方式来处理可读流,尤其是在处理异步数据源或当您更喜欢采用更顺序的、基于循环的数据处理方法时。
下面是一个演示如何将异步迭代器与可读流一起使用的示例:
const = ('node:fs');
const { } = ('node:stream/promises');
async function () {
await (
.(),
async function* () {
for await (let of ) {
yield .toString().toUpperCase();
}
},
.
);
}
().(.);
此代码实现了与前面示例相同的结果,但无需定义新的转换流。为了简洁起见,已删除了前面示例中的错误。已使用管道的异步版本,应将其包装在 try...catch
块中以处理可能的错误。
对象模式
默认情况下,流可以处理字符串、Buffer
、TypedArray
或 DataView
。如果将不同于这些类型的任意值(例如对象)推送到流中,则会抛出 TypeError
。但是,可以通过将 objectMode
选项设置为 true
来处理对象。这允许流处理任何 JavaScript 值,除了 null
,它用于表示流的结束。这意味着您可以在可读流中 push
和 read
任何值,并在可写流中 write
任何值。
const { } = ('node:stream');
const = ({
: true,
() {
this.push({ : 'world' });
this.push(null);
},
});
在对象模式下工作时,重要的是要记住 highWaterMark
选项指的是对象的数量,而不是字节数。
背压
使用流时,确保生产者不会压垮消费者非常重要。为此,背压机制在 Node.js API 的所有流中都得到了使用,并且实现者有责任维护这种行为。
在任何数据缓冲区超过 highWaterMark
或写入队列当前繁忙的情况下,.write()
将返回 false
。
当返回 false
值时,背压系统就会启动。它将暂停传入的 Readable
流发送任何数据,并等待消费者再次准备就绪。一旦数据缓冲区清空,将发出一个 'drain'
事件以恢复传入的数据流。
要更深入地了解背压,请查看 背压指南
。
流 vs Web 流
流的概念并非 Node.js 独有。实际上,Node.js 对流概念有不同的实现,称为 Web Streams
,它实现了 WHATWG Streams Standard
。虽然它们背后的概念相似,但重要的是要意识到它们有不同的 API 并且不直接兼容。
Web Streams
实现了 ReadableStream
、WritableStream
和 TransformStream
类,它们与 Node.js 的 Readable
、Writable
和 Transform
流同源。
流和 Web 流的互操作性
Node.js 提供了在 Web Streams 和 Node.js streams 之间进行转换的实用函数。这些函数在每个流类中作为 toWeb
和 fromWeb
方法实现。
Duplex
类中的以下示例演示了如何处理转换为 Web 流的可读和可写流:
const { } = ('node:stream');
const = ({
() {
this.push('world');
this.push(null);
},
(, , ) {
.('writable', );
();
},
});
const { , } = .();
.().('hello');
.()
.()
.( => {
.('readable', .);
});
如果您需要从 Node.js 模块返回一个 Web 流,反之亦然,那么这些辅助函数会很有用。对于常规的流消费,异步迭代器可以实现与 Node.js 和 Web 流的无缝交互。
const { } = ('node:stream/promises');
async function () {
const { } = await ('https://node.org.cn/api/stream.html');
await (
,
new (),
async function* () {
for await (const of ) {
yield .toString().toUpperCase();
}
},
.
);
}
().(.);
请注意,fetch 主体是 ReadableStream<Uint8Array>
,因此需要一个 TextDecoderStream
来将数据块作为字符串处理。
这项工作源自 Matteo Collina 在 Platformatic 博客上发布的内容。