流中的反压
在数据处理过程中,存在一个普遍问题,称为 反压
,它描述了数据传输过程中缓冲区后面的数据积压。当传输的接收端具有复杂的操作,或者由于任何原因速度较慢时,来自传入源的数据往往会积累,就像堵塞一样。
为了解决这个问题,必须建立一个委托系统,以确保数据从一个源到另一个源的平稳流动。不同的社区已经为他们的程序独特地解决了这个问题,Unix 管道和 TCP 套接字就是很好的例子,通常被称为流控制。在 Node.js 中,流已经被采纳为解决方案。
本指南的目的是进一步详细说明什么是反压,以及流如何在 Node.js 的源代码中解决这个问题。本指南的第二部分将介绍建议的最佳实践,以确保您的应用程序的代码在实现流时是安全和优化的。
我们假设您对 Node.js 中 反压
、Buffer
和 EventEmitters
的一般定义有一定了解,并且对 Stream
有一些经验。如果您还没有阅读过这些文档,那么最好先查看一下 API 文档,因为这将有助于您在阅读本指南时扩展您的理解。
数据处理的问题
在计算机系统中,数据通过管道、套接字和信号从一个进程传输到另一个进程。在 Node.js 中,我们发现了一种类似的机制,称为 Stream
。流非常棒!它们为 Node.js 做了很多事情,几乎每个内部代码库都使用该模块。作为一名开发人员,我们非常鼓励您也使用它们!
const readline = require('node:readline');
// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.question('Why should you use streams? ', answer => {
console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);
rl.close();
});
通过比较 Node.js 的 Stream
实现中的内部系统工具,可以很好地说明通过流实现的反压机制为什么是一个很好的优化。
在一种情况下,我们将采用一个大文件(大约 ~9 GB),并使用熟悉的 zip(1)
工具对其进行压缩。
zip The.Matrix.1080p.mkv
虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,该脚本使用 Node.js 的 zlib
模块,该模块包装了另一个压缩工具 gzip(1)
。
const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');
const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');
inp.pipe(gzip).pipe(out);
要测试结果,请尝试打开每个压缩文件。使用 zip(1)
工具压缩的文件将通知您该文件已损坏,而由 Stream
完成的压缩将解压缩且没有错误。
在此示例中,我们使用
.pipe()
将数据源从一端传输到另一端。但是,请注意没有附加适当的错误处理程序。如果一块数据未能被正确接收,则Readable
源或gzip
流将不会被销毁。pump
是一个实用工具,如果管道中的其中一个流失败或关闭,它将正确销毁管道中的所有流,在这种情况下,它是必不可少的!
pump
仅对于 Node.js 8.x 或更早版本是必需的,因为对于 Node.js 10.x 或更高版本,引入了 pipeline
来替换 pump
。这是一个模块方法,用于在流之间进行管道传输,转发错误并正确清理,并在管道完成时提供回调。
这是一个使用 pipeline 的示例
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
您还可以使用 stream/promises
模块将 pipeline 与 async
/ await
一起使用
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
);
console.log('Pipeline succeeded');
} catch (err) {
console.error('Pipeline failed', err);
}
}
数据过多,速度过快
在某些情况下,Readable
流可能会将数据传递给 Writable
流太快了——远远超过了消费者可以处理的量!
当这种情况发生时,消费者将开始排队所有数据块以供以后消费。写入队列将变得越来越长,因此,必须将更多数据保存在内存中,直到整个过程完成。
写入磁盘比从磁盘读取慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,由于写入磁盘将无法跟上读取速度,因此会发生反压。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);
这就是反压机制很重要的原因。如果没有反压系统,该过程将耗尽您的系统内存,从而有效地降低其他进程的速度,并垄断您系统的大部分资源直到完成。
这会导致以下几件事
- 降低所有其他当前进程的速度
- 一个过度劳累的垃圾回收器
- 内存耗尽
在以下示例中,我们将删除 返回值 .write()
函数,并将其更改为 true
,这有效地禁用了 Node.js 核心中的反压支持。在任何提及“修改”二进制文件的地方,我们都在谈论运行没有 return ret;
行的 node
二进制文件,而是替换为 return true;
。
垃圾回收的过度负担
让我们快速看一下基准测试。使用与上面相同的示例,我们运行了几次计时试验,以获得两个二进制文件的中值时间。
trial (#) | `node` binary (ms) | modified `node` binary (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
average time: | 55299 | 55975
两者都需要大约一分钟才能运行,因此没有太大的区别,但让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具 dtrace
来评估 V8 垃圾回收器发生了什么。
GC(垃圾回收器)测量的时间表示垃圾回收器完成单次扫描的完整周期的间隔
approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
虽然这两个进程的启动方式相同,并且似乎以相同的速率运行 GC,但很明显,在几秒钟后,通过适当工作的反压系统,它将 GC 负载分散在 4-8 毫秒的持续间隔内,直到数据传输结束。
但是,当没有反压系统时,V8 垃圾回收开始拖延。正常的二进制文件在一分钟内大约调用 GC 75 次,而修改后的二进制文件仅调用 36 次。
这是由于不断增长的内存使用而累积的缓慢而渐进的债务。随着数据的传输,在没有反压系统的情况下,每次块传输都会使用更多的内存。
分配的内存越多,GC 在一次扫描中需要处理的就越多。扫描越大,GC 需要决定可以释放什么的空间就越多,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。
内存耗尽
为了确定每个二进制文件的内存消耗,我们分别使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
记录了每个进程的时间。
这是正常二进制文件的输出
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
虚拟内存占用的最大字节大小约为 87.81 mb。
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
虚拟内存占用的最大字节大小约为 1.52 gb。
如果没有流来委托反压,则分配的内存空间的数量级更大 - 同一进程之间的巨大差异!
此实验表明 Node.js 的反压机制对于您的计算系统是多么优化和具有成本效益。现在,让我们分解一下它的工作原理!
反压如何解决这些问题?
有不同的函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的函数,称为 .pipe()
。您也可以使用 其他软件包!但最终,在这个过程的基本层面,我们有两个独立的组件:数据的来源和消费者。
当从源调用 .pipe()
时,它会向消费者发出信号,表明有数据要传输。pipe 函数有助于为事件触发器设置适当的反压闭包。
在 Node.js 中,源是一个 Readable
流,而消费者是一个 Writable
流(这两者都可以与 Duplex
或 Transform
流互换,但这超出了本指南的范围)。
可以精确地将反压触发的时刻缩小到 Writable
的 .write()
函数的返回值。当然,这个返回值由一些条件决定。
在任何数据缓冲区超过 highWaterMark
或写入队列当前繁忙的情况下,.write()
将返回 false
。
当返回 false
值时,反压系统会启动。它会暂停传入的 Readable
流发送任何数据,并等待消费者再次准备就绪。一旦数据缓冲区被清空,就会发出一个 'drain'
事件,并恢复传入的数据流。
一旦队列完成,反压将允许再次发送数据。正在使用的内存空间将被释放,并为下一批数据做好准备。
这有效地允许在任何给定时间为 .pipe()
函数使用固定数量的内存。不会有内存泄漏,也不会有无限缓冲,并且垃圾回收器只需要处理内存中的一个区域!
那么,如果反压如此重要,为什么你(可能)没有听说过它呢?嗯,答案很简单:Node.js 会自动为你完成所有这些。
这太棒了!但是当我们试图理解如何实现我们的自定义流时,这也不是那么好。
在大多数机器中,有一个字节大小决定了缓冲区何时已满(这会因机器而异)。Node.js 允许你设置自定义的
highWaterMark
,但通常,默认设置为 16kb(16384,或者对于 objectMode 流为 16)。在您可能想要提高该值的情况下,请放心去做,但要谨慎!
.pipe()
的生命周期
为了更好地理解反压,这里有一个流程图,说明了 Readable
流被 管道到 Writable
流的生命周期
+===================+
x--> Piping functions +--> src.pipe(dest) |
x are set up during |===================|
x the .pipe method. | Event callbacks |
+===============+ x |-------------------|
| Your Data | x They exist outside | .on('close', cb) |
+=======+=======+ x the data flow, but | .on('data', cb) |
| x importantly attach | .on('drain', cb) |
| x events, and their | .on('unpipe', cb) |
+---------v---------+ x respective callbacks. | .on('error', cb) |
| Readable Stream +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable Stream +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | Is this chunk too big? |
^ | | emit .end(); | Is the queue busy? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Yes |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^------------^-----------------------< Buffering | |
| |============| |
+> emit .drain(); | ^Buffer^ | |
+> emit .resume(); +------------+ |
| ^Buffer^ | |
+------------+ add chunk to queue |
| <---^---------------------<
+============+
如果您正在设置一个管道来将几个流链接在一起以操作您的数据,那么您很可能正在实现
Transform
流。
在这种情况下,您的 Readable
流的输出将进入 Transform
并通过管道传输到 Writable
。
Readable.pipe(Transformable).pipe(Writable);
反压将自动应用,但请注意,Transform
流的传入和传出 highWaterMark
都可以被操作,并且会影响反压系统。
反压指南
自从 Node.js v0.10 以来,Stream
类提供了修改 .read()
或 .write()
行为的能力,方法是使用这些相应函数的下划线版本(._read()
和 ._write()
)。
有关于实现 Readable 流和实现 Writable 流的指南。 我们将假设您已经阅读了这些,下一节将更深入地探讨一些内容。
实现自定义流时要遵守的规则
流的黄金法则是始终尊重反压。构成最佳实践的是非矛盾实践。只要您小心避免与内部反压支持冲突的行为,您就可以确保您遵循了良好的实践。
一般来说,
- 如果未被要求,永远不要
.push()
。 - 在
.write()
返回 false 之后,永远不要调用它,而是等待 'drain' 事件。 - 流在不同的 Node.js 版本和你使用的库之间会发生变化。小心并测试事物。
关于第 3 点,一个非常有用的构建浏览器流的包是
readable-stream
。 Rodd Vagg 撰写了一篇 精彩的博客文章,描述了这个库的实用性。简而言之,它为Readable
流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。
特定于 Readable 流的规则
到目前为止,我们已经了解了 .write()
如何影响反压,并且主要关注 Writable
流。 由于 Node.js 的功能,数据在技术上是从 Readable
向下流动到 Writable
。 然而,正如我们可以在任何数据、物质或能量的传输中观察到的那样,源与目标同样重要,并且 Readable
流对于如何处理反压至关重要。
这两个过程都依赖于彼此才能有效地进行通信,如果 Readable
忽略了 Writable
流要求停止发送数据的时间,这可能与 .write()
的返回值不正确一样有问题。
因此,除了尊重 .write()
返回值之外,我们还必须尊重在 ._read()
方法中使用的 .push()
的返回值。 如果 .push()
返回一个 false
值,则流将停止从源读取。 否则,它将继续而不暂停。
以下是使用 .push()
的错误实践示例
// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
_read(size) {
let chunk;
while (null !== (chunk = getNextChunk())) {
this.push(chunk);
}
}
}
此外,从自定义流外部,存在忽略反压的陷阱。 在这个良好实践的反例中,应用程序的代码在数据可用时(由 'data'
事件发出信号)强制数据通过
// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));
以下是使用 .push()
与 Readable 流的示例。
const { Readable } = require('node:stream');
// Create a custom Readable stream
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// Push some data onto the stream
this.push({ message: 'Hello, world!' });
this.push(null); // Mark the end of the stream
},
});
// Consume the stream
myReadableStream.on('data', chunk => {
console.log(chunk);
});
// Output:
// { message: 'Hello, world!' }
在此示例中,我们创建一个自定义 Readable 流,该流使用 .push()
将单个对象推送到流上。 当流准备好使用数据时,将调用 ._read()
方法,在这种情况下,我们立即将一些数据推送到流上,并通过推送 null 来标记流的结束。
然后,我们通过侦听 'data' 事件并记录推送到流上的每个数据块来使用该流。 在这种情况下,我们只将单个数据块推送到流上,因此我们只看到一条日志消息。
特定于 Writable 流的规则
回想一下,.write()
可能会根据某些条件返回 true 或 false。 对我们来说幸运的是,当构建我们自己的 Writable
流时,流状态机
将处理我们的回调,并确定何时处理反压并为我们优化数据流。
但是,当我们想直接使用 Writable
时,我们必须尊重 .write()
的返回值,并密切关注这些条件
- 如果写入队列繁忙,
.write()
将返回 false。 - 如果数据块太大,
.write()
将返回 false(限制由变量highWaterMark
指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback();
else if (chunk.toString().indexOf('b') >= 0) callback();
callback();
}
}
// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();
在实现 ._writev()
时,还需要注意一些事项。 该函数与 .cork()
耦合,但在编写时存在一个常见的错误
// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();
ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();
// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);
ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);
// As a global function.
function doUncork(stream) {
stream.uncork();
}
我们可以根据需要多次调用 .cork()
,我们只需要小心调用 .uncork()
相同的次数才能使其再次流动。
结论
流是 Node.js 中经常使用的模块。 它们对于内部结构以及开发人员扩展和连接到 Node.js 模块生态系统非常重要。
希望现在您能够对自己的 Writable
和 Readable
流进行故障排除和安全编码,同时考虑到反压,并与同事和朋友分享您的知识。
务必阅读有关 Stream
的更多信息,了解其他 API 函数,以帮助您在使用 Node.js 构建应用程序时改进和释放流式传输功能。