流中的背压
在数据处理过程中,会发生一个普遍的问题,称为 背压
,它描述了数据传输过程中缓冲区后面数据堆积的情况。当传输的接收端进行复杂操作,或者由于某种原因速度较慢时,来自传入源的数据往往会累积,就像堵塞一样。
为了解决这个问题,必须有一个委托系统来确保数据从一个源到另一个源的平滑流动。不同的社区已经为其程序独特地解决了这个问题,Unix管道和TCP套接字就是很好的例子,通常被称为流控。在Node.js中,流是采用的解决方案。
本指南的目的是进一步详细说明什么是背压,以及流如何在Node.js的源代码中解决这个问题。指南的第二部分将介绍建议的最佳实践,以确保您的应用程序代码在实现流时安全且优化。
我们假设您对背压
、Buffer
和EventEmitters
在Node.js中的通用定义以及对Stream
的一些经验有所了解。如果您还没有阅读这些文档,最好先看一下API文档,因为它将有助于您在阅读本指南时扩展理解。
数据处理问题
在计算机系统中,数据通过管道、套接字和信号从一个进程传输到另一个进程。在Node.js中,我们发现了一种类似的机制,称为Stream
。流很棒!它们为Node.js做了很多事情,几乎所有内部代码库都利用了该模块。作为开发人员,您更鼓励使用它们!
const readline = require('node:readline');
// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.question('Why should you use streams? ', answer => {
console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);
rl.close();
});
通过流实现的背压机制为什么是一个很好的优化,可以通过比较Node.js的Stream
实现中的内部系统工具来证明。
在一个场景中,我们将使用熟悉的zip(1)
工具压缩一个大文件(大约~9 GB)。
zip The.Matrix.1080p.mkv
虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,该脚本使用 Node.js 的模块 zlib
,它封装了另一个压缩工具 gzip(1)
。
const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');
const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');
inp.pipe(gzip).pipe(out);
要测试结果,请尝试打开每个压缩文件。由 zip(1)
工具压缩的文件会通知您文件已损坏,而由 Stream
完成的压缩将无错误地解压缩。
在这个例子中,我们使用
.pipe()
将数据源从一端传递到另一端。但是,请注意,没有附加适当的错误处理程序。如果数据块无法正确接收,Readable
源或gzip
流将不会被销毁。pump
是一个实用工具,如果管道中的一个流失败或关闭,它会适当地销毁所有流,在这种情况下是必不可少的!
pump
仅适用于 Node.js 8.x 或更早版本,对于 Node.js 10.x 或更高版本,引入了 pipeline
来代替 pump
。这是一个模块方法,用于在流之间进行管道传输,转发错误并适当地清理,并在管道完成后提供回调。
以下是用 pipeline 的例子
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
您也可以在 pipeline 上调用 promisify
,以便与 async
/ await
一起使用
const stream = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
const util = require('node:util');
const pipeline = util.promisify(stream.pipeline);
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
);
console.log('Pipeline succeeded');
} catch (err) {
console.error('Pipeline failed', err);
}
}
数据太多,太快
在某些情况下,Readable
流可能会将数据传递给 Writable
太快,远远超过消费者的处理能力!
发生这种情况时,消费者将开始将所有数据块排队以供以后使用。写入队列将越来越长,因此必须将更多数据保存在内存中,直到整个过程完成。
写入磁盘比从磁盘读取要慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,由于写入磁盘无法跟上读取速度,因此会发生背压。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);
这就是背压机制很重要的原因。如果没有背压系统,该过程将耗尽系统的内存,有效地减慢其他进程的速度,并独占系统的大部分资源,直到完成。
这会导致一些问题
- 减慢所有其他当前进程
- 垃圾收集器过度工作
- 内存耗尽
在以下示例中,我们将从 返回值 中删除 .write()
函数并将其更改为 true
,这实际上禁用了 Node.js 内核中的背压支持。在任何对“修改”二进制文件的引用中,我们指的是运行 node
二进制文件而不使用 return ret;
行,而是使用替换的 return true;
。
垃圾收集的过度拖累
让我们看一下快速基准测试。使用上面相同的示例,我们运行了几次时间测试以获得两个二进制文件的中间时间。
trial (#) | `node` binary (ms) | modified `node` binary (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
average time: | 55299 | 55975
两者都大约需要一分钟才能运行,因此几乎没有区别,但让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具 dtrace
来评估 V8 垃圾收集器发生了什么。
GC(垃圾收集器)测量的时长表示垃圾收集器完成一次完整循环的间隔
approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
虽然这两个进程开始时相同,并且似乎以相同的速率工作 GC,但很明显,在具有正常工作的背压系统的情况下运行了几秒钟后,它将 GC 负载分散到 4-8 毫秒的持续间隔中,直到数据传输结束。
但是,当没有背压系统时,V8 垃圾收集开始拖延。正常二进制文件在 1 分钟内调用 GC 大约 75 次,而修改后的二进制文件仅调用 36 次。
这是由于不断增长的内存使用而逐渐积累的缓慢债务。随着数据的传输,如果没有背压系统,每个块传输将使用更多的内存。
分配的内存越多,GC 在一次扫描中需要处理的就越多。扫描越大,GC 就越需要决定哪些可以释放,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。
内存耗尽
为了确定每个二进制文件的内存消耗,我们使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
分别记录了每个进程。
这是正常二进制文件的输出
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
虚拟内存占用的最大字节数约为 87.81 兆字节。
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
虚拟内存占用的最大字节数约为 1.52 gb。
如果没有流来处理背压,内存空间的分配量将大一个数量级——同一个进程之间存在巨大的差异!
这个实验展示了 Node.js 背压机制在您的计算系统中是如何优化和节省成本的。现在,让我们来分解一下它的工作原理!
背压如何解决这些问题?
有不同的函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的内部函数叫做 .pipe()
。还有 其他包 可以使用!但最终,在这个过程的基本层面上,我们有两个独立的组件:数据源和消费者。
当从源代码调用 .pipe()
时,它会向消费者发出信号,表明有数据要传输。管道函数有助于为事件触发器设置适当的背压闭包。
在 Node.js 中,源是 Readable
流,消费者是 Writable
流(这两个都可以与 Duplex
或 Transform
流互换,但这超出了本指南的范围)。
背压触发的时刻可以精确地缩小到 Writable
的 .write()
函数的返回值。当然,这个返回值是由一些条件决定的。
在任何数据缓冲区超过 highWaterMark
或写入队列当前繁忙的情况下,.write()
将返回 false
。
当返回 false
值时,背压系统就会启动。它将暂停传入的 Readable
流发送任何数据,并等待消费者再次准备好。一旦数据缓冲区清空,就会发出 'drain'
事件,并恢复传入的数据流。
队列完成后,背压将允许再次发送数据。正在使用的内存空间将释放并为下一批数据做好准备。
这有效地允许在任何给定时间为 .pipe()
函数使用固定数量的内存。不会出现内存泄漏,也不会出现无限缓冲,垃圾收集器只需要处理内存中的一个区域!
所以,如果背压如此重要,为什么你(可能)没有听说过它?答案很简单:Node.js 会自动为你完成所有这些操作。
太棒了!但当我们试图理解如何实现自定义流时,它也不那么棒了。
在大多数机器中,有一个字节大小决定了缓冲区何时已满(这在不同的机器上会有所不同)。Node.js 允许你设置自定义
highWaterMark
,但通常情况下,默认值为 16kb(16384,或对于 objectMode 流为 16)。在可能需要提高该值的情况下,可以这样做,但要谨慎!
.pipe()
的生命周期
为了更好地理解背压,这里有一个关于 Readable
流被 管道化 到 Writable
流的生命周期的流程图。
+===================+
x--> Piping functions +--> src.pipe(dest) |
x are set up during |===================|
x the .pipe method. | Event callbacks |
+===============+ x |-------------------|
| Your Data | x They exist outside | .on('close', cb) |
+=======+=======+ x the data flow, but | .on('data', cb) |
| x importantly attach | .on('drain', cb) |
| x events, and their | .on('unpipe', cb) |
+---------v---------+ x respective callbacks. | .on('error', cb) |
| Readable Stream +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable Stream +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | Is this chunk too big? |
^ | | emit .end(); | Is the queue busy? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Yes |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^------------^-----------------------< Buffering | |
| |============| |
+> emit .drain(); | ^Buffer^ | |
+> emit .resume(); +------------+ |
| ^Buffer^ | |
+------------+ add chunk to queue |
| <---^---------------------<
+============+
如果你正在设置一个管道来将几个流链接在一起以操作你的数据,你很可能正在实现
Transform
流。
在这种情况下,来自 Readable
流的输出将进入 Transform
并将管道化到 Writable
中。
Readable.pipe(Transformable).pipe(Writable);
背压将自动应用,但请注意,Transform
流的传入和传出 highWaterMark
都可以被操作,并将影响背压系统。
背压指南
从 Node.js v0.10 开始,Stream
类提供了修改 .read()
或 .write()
行为的能力,方法是使用这些函数的带下划线的版本(._read()
和 ._write()
)。
有关于 实现可读流 和 实现可写流 的指南。我们假设你已经阅读了这些指南,下一节将更深入地探讨。
实现自定义流时需要遵守的规则
流的黄金法则就是始终尊重背压。最佳实践的定义是不矛盾的实践。只要你小心避免与内部背压支持冲突的行为,你就可以确定你遵循了良好的实践。
一般来说,
- 如果你没有被要求,就不要
.push()
。 - 如果
.write()
返回false,就不要调用它,而是等待'drain'。 - 流在不同的 Node.js 版本和使用的库之间会有变化。要小心并进行测试。
关于第 3 点,一个非常有用的用于构建浏览器流的包是
readable-stream
。Rodd Vagg 写了一篇 很棒的博客文章 描述了这个库的实用性。简而言之,它为Readable
流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。
特定于可读流的规则
到目前为止,我们已经了解了 .write()
如何影响背压,并且主要关注了 Writable
流。由于 Node.js 的功能,数据在技术上是从 Readable
流向 Writable
流下游传输的。但是,正如我们在任何数据、物质或能量传输中观察到的那样,源头与目的地一样重要,而 Readable
流对于如何处理背压至关重要。
这两个过程都依赖于彼此进行有效地通信,如果 Readable
忽略了 Writable
流要求它停止发送数据的请求,它可能与 .write()
的返回值不正确一样有问题。
因此,除了尊重 .write()
的返回值之外,我们还必须尊重 .push()
在 ._read()
方法中使用的返回值。如果 .push()
返回一个 false
值,流将停止从源读取。否则,它将继续不间断地读取。
以下是一个使用 .push()
的不良实践示例。
// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
_read(size) {
let chunk;
while (null !== (chunk = getNextChunk())) {
this.push(chunk);
}
}
}
此外,在自定义流外部,忽略背压也会带来陷阱。在这个良好的实践反例中,应用程序代码强制数据在可用时(由 'data'
事件 信号)通过。
// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));
以下是如何使用 .push()
与可读流的示例。
const { Readable } = require('node:stream');
// Create a custom Readable stream
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// Push some data onto the stream
this.push({ message: 'Hello, world!' });
this.push(null); // Mark the end of the stream
},
});
// Consume the stream
myReadableStream.on('data', chunk => {
console.log(chunk);
});
// Output:
// { message: 'Hello, world!' }
在这个示例中,我们创建了一个自定义可读流,使用 .push()
将单个对象推送到流中。当流准备好消费数据时,会调用 ._read()
方法,在这种情况下,我们立即将一些数据推送到流中,并通过推送 null 来标记流的结束。
然后,我们通过监听 'data' 事件来消费流,并记录推送到流中的每个数据块。在这种情况下,我们只将单个数据块推送到流中,因此我们只看到一条日志消息。
可写流的特定规则
回想一下,.write()
可能会根据某些条件返回 true 或 false。幸运的是,当我们构建自己的 Writable
流时,流状态机
将处理我们的回调并确定何时处理背压并为我们优化数据流。
但是,当我们想要直接使用 Writable
时,我们必须尊重 .write()
的返回值,并密切注意这些条件。
- 如果写入队列繁忙,
.write()
将返回 false。 - 如果数据块太大,
.write()
将返回 false(限制由变量highWaterMark
指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback();
else if (chunk.toString().indexOf('b') >= 0) callback();
callback();
}
}
// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();
在实现 ._writev()
时,还有一些需要注意的地方。该函数与 .cork()
耦合,但在编写时有一个常见的错误。
// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();
ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();
// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);
ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);
// As a global function.
function doUncork(stream) {
stream.uncork();
}
.cork()
可以被调用任意多次,我们只需要确保调用 .uncork()
相同次数,使其再次流动。
结论
流是 Node.js 中经常使用的模块。它们对内部结构很重要,对于开发人员来说,它们可以扩展和连接 Node.js 模块生态系统。
希望你现在能够在考虑背压的情况下,安全地调试和编写自己的 Writable
和 Readable
流,并与同事和朋友分享你的知识。
请务必阅读更多关于 Stream
的内容,了解其他 API 函数,以帮助你改进和释放你的流式处理能力,从而在使用 Node.js 构建应用程序时。