流中的反压

在数据处理过程中,存在一个普遍问题,称为 反压,它描述了数据传输过程中缓冲区后面的数据积压。当传输的接收端具有复杂的操作,或者由于任何原因速度较慢时,来自传入源的数据往往会积累,就像堵塞一样。

为了解决这个问题,必须建立一个委托系统,以确保数据从一个源到另一个源的平稳流动。不同的社区已经为他们的程序独特地解决了这个问题,Unix 管道和 TCP 套接字就是很好的例子,通常被称为流控制。在 Node.js 中,流已经被采纳为解决方案。

本指南的目的是进一步详细说明什么是反压,以及流如何在 Node.js 的源代码中解决这个问题。本指南的第二部分将介绍建议的最佳实践,以确保您的应用程序的代码在实现流时是安全和优化的。

我们假设您对 Node.js 中 反压BufferEventEmitters 的一般定义有一定了解,并且对 Stream 有一些经验。如果您还没有阅读过这些文档,那么最好先查看一下 API 文档,因为这将有助于您在阅读本指南时扩展您的理解。

数据处理的问题

在计算机系统中,数据通过管道、套接字和信号从一个进程传输到另一个进程。在 Node.js 中,我们发现了一种类似的机制,称为 Stream。流非常棒!它们为 Node.js 做了很多事情,几乎每个内部代码库都使用该模块。作为一名开发人员,我们非常鼓励您也使用它们!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

通过比较 Node.js 的 Stream 实现中的内部系统工具,可以很好地说明通过流实现的反压机制为什么是一个很好的优化。

在一种情况下,我们将采用一个大文件(大约 ~9 GB),并使用熟悉的 zip(1) 工具对其进行压缩。

zip The.Matrix.1080p.mkv

虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,该脚本使用 Node.js 的 zlib 模块,该模块包装了另一个压缩工具 gzip(1)

const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

要测试结果,请尝试打开每个压缩文件。使用 zip(1) 工具压缩的文件将通知您该文件已损坏,而由 Stream 完成的压缩将解压缩且没有错误。

在此示例中,我们使用 .pipe() 将数据源从一端传输到另一端。但是,请注意没有附加适当的错误处理程序。如果一块数据未能被正确接收,则 Readable 源或 gzip 流将不会被销毁。pump 是一个实用工具,如果管道中的其中一个流失败或关闭,它将正确销毁管道中的所有流,在这种情况下,它是必不可少的!

pump 仅对于 Node.js 8.x 或更早版本是必需的,因为对于 Node.js 10.x 或更高版本,引入了 pipeline 来替换 pump。这是一个模块方法,用于在流之间进行管道传输,转发错误并正确清理,并在管道完成时提供回调。

这是一个使用 pipeline 的示例

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

您还可以使用 stream/promises 模块将 pipeline 与 async / await 一起使用

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

数据过多,速度过快

在某些情况下,Readable 流可能会将数据传递给 Writable 流太快了——远远超过了消费者可以处理的量!

当这种情况发生时,消费者将开始排队所有数据块以供以后消费。写入队列将变得越来越长,因此,必须将更多数据保存在内存中,直到整个过程完成。

写入磁盘比从磁盘读取慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,由于写入磁盘将无法跟上读取速度,因此会发生反压。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

这就是反压机制很重要的原因。如果没有反压系统,该过程将耗尽您的系统内存,从而有效地降低其他进程的速度,并垄断您系统的大部分资源直到完成。

这会导致以下几件事

  • 降低所有其他当前进程的速度
  • 一个过度劳累的垃圾回收器
  • 内存耗尽

在以下示例中,我们将删除 返回值 .write() 函数,并将其更改为 true,这有效地禁用了 Node.js 核心中的反压支持。在任何提及“修改”二进制文件的地方,我们都在谈论运行没有 return ret; 行的 node 二进制文件,而是替换为 return true;

垃圾回收的过度负担

让我们快速看一下基准测试。使用与上面相同的示例,我们运行了几次计时试验,以获得两个二进制文件的中值时间。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

两者都需要大约一分钟才能运行,因此没有太大的区别,但让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具 dtrace 来评估 V8 垃圾回收器发生了什么。

GC(垃圾回收器)测量的时间表示垃圾回收器完成单次扫描的完整周期的间隔

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

虽然这两个进程的启动方式相同,并且似乎以相同的速率运行 GC,但很明显,在几秒钟后,通过适当工作的反压系统,它将 GC 负载分散在 4-8 毫秒的持续间隔内,直到数据传输结束。

但是,当没有反压系统时,V8 垃圾回收开始拖延。正常的二进制文件在一分钟内大约调用 GC 75 次,而修改后的二进制文件仅调用 36 次。

这是由于不断增长的内存使用而累积的缓慢而渐进的债务。随着数据的传输,在没有反压系统的情况下,每次块传输都会使用更多的内存。

分配的内存越多,GC 在一次扫描中需要处理的就越多。扫描越大,GC 需要决定可以释放什么的空间就越多,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。

内存耗尽

为了确定每个二进制文件的内存消耗,我们分别使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js 记录了每个进程的时间。

这是正常二进制文件的输出

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虚拟内存占用的最大字节大小约为 87.81 mb。

现在更改 返回值.write() 函数,我们得到

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虚拟内存占用的最大字节大小约为 1.52 gb。

如果没有流来委托反压,则分配的内存空间的数量级更大 - 同一进程之间的巨大差异!

此实验表明 Node.js 的反压机制对于您的计算系统是多么优化和具有成本效益。现在,让我们分解一下它的工作原理!

反压如何解决这些问题?

有不同的函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的函数,称为 .pipe()。您也可以使用 其他软件包!但最终,在这个过程的基本层面,我们有两个独立的组件:数据的来源消费者

当从源调用 .pipe() 时,它会向消费者发出信号,表明有数据要传输。pipe 函数有助于为事件触发器设置适当的反压闭包。

在 Node.js 中,源是一个 Readable 流,而消费者是一个 Writable 流(这两者都可以与 DuplexTransform 流互换,但这超出了本指南的范围)。

可以精确地将反压触发的时刻缩小到 Writable.write() 函数的返回值。当然,这个返回值由一些条件决定。

在任何数据缓冲区超过 highWaterMark 或写入队列当前繁忙的情况下,.write() 将返回 false

当返回 false 值时,反压系统会启动。它会暂停传入的 Readable 流发送任何数据,并等待消费者再次准备就绪。一旦数据缓冲区被清空,就会发出一个 'drain' 事件,并恢复传入的数据流。

一旦队列完成,反压将允许再次发送数据。正在使用的内存空间将被释放,并为下一批数据做好准备。

这有效地允许在任何给定时间为 .pipe() 函数使用固定数量的内存。不会有内存泄漏,也不会有无限缓冲,并且垃圾回收器只需要处理内存中的一个区域!

那么,如果反压如此重要,为什么你(可能)没有听说过它呢?嗯,答案很简单:Node.js 会自动为你完成所有这些。

这太棒了!但是当我们试图理解如何实现我们的自定义流时,这也不是那么好。

在大多数机器中,有一个字节大小决定了缓冲区何时已满(这会因机器而异)。Node.js 允许你设置自定义的 highWaterMark,但通常,默认设置为 16kb(16384,或者对于 objectMode 流为 16)。在您可能想要提高该值的情况下,请放心去做,但要谨慎!

.pipe() 的生命周期

为了更好地理解反压,这里有一个流程图,说明了 Readable 流被 管道Writable 流的生命周期

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

如果您正在设置一个管道来将几个流链接在一起以操作您的数据,那么您很可能正在实现 Transform 流。

在这种情况下,您的 Readable 流的输出将进入 Transform 并通过管道传输到 Writable

Readable.pipe(Transformable).pipe(Writable);

反压将自动应用,但请注意,Transform 流的传入和传出 highWaterMark 都可以被操作,并且会影响反压系统。

反压指南

自从 Node.js v0.10 以来,Stream 类提供了修改 .read().write() 行为的能力,方法是使用这些相应函数的下划线版本(._read()._write())。

有关于实现 Readable 流实现 Writable 流的指南。 我们将假设您已经阅读了这些,下一节将更深入地探讨一些内容。

实现自定义流时要遵守的规则

流的黄金法则是始终尊重反压。构成最佳实践的是非矛盾实践。只要您小心避免与内部反压支持冲突的行为,您就可以确保您遵循了良好的实践。

一般来说,

  1. 如果未被要求,永远不要 .push()
  2. .write() 返回 false 之后,永远不要调用它,而是等待 'drain' 事件。
  3. 流在不同的 Node.js 版本和你使用的库之间会发生变化。小心并测试事物。

关于第 3 点,一个非常有用的构建浏览器流的包是 readable-stream。 Rodd Vagg 撰写了一篇 精彩的博客文章,描述了这个库的实用性。简而言之,它为 Readable 流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。

特定于 Readable 流的规则

到目前为止,我们已经了解了 .write() 如何影响反压,并且主要关注 Writable 流。 由于 Node.js 的功能,数据在技术上是从 Readable 向下流动到 Writable。 然而,正如我们可以在任何数据、物质或能量的传输中观察到的那样,源与目标同样重要,并且 Readable 流对于如何处理反压至关重要。

这两个过程都依赖于彼此才能有效地进行通信,如果 Readable 忽略了 Writable 流要求停止发送数据的时间,这可能与 .write() 的返回值不正确一样有问题。

因此,除了尊重 .write() 返回值之外,我们还必须尊重在 ._read() 方法中使用的 .push() 的返回值。 如果 .push() 返回一个 false 值,则流将停止从源读取。 否则,它将继续而不暂停。

以下是使用 .push() 的错误实践示例

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

此外,从自定义流外部,存在忽略反压的陷阱。 在这个良好实践的反例中,应用程序的代码在数据可用时(由 'data' 事件发出信号)强制数据通过

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));

以下是使用 .push() 与 Readable 流的示例。

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myReadableStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

在此示例中,我们创建一个自定义 Readable 流,该流使用 .push() 将单个对象推送到流上。 当流准备好使用数据时,将调用 ._read() 方法,在这种情况下,我们立即将一些数据推送到流上,并通过推送 null 来标记流的结束。

然后,我们通过侦听 'data' 事件并记录推送到流上的每个数据块来使用该流。 在这种情况下,我们只将单个数据块推送到流上,因此我们只看到一条日志消息。

特定于 Writable 流的规则

回想一下,.write() 可能会根据某些条件返回 true 或 false。 对我们来说幸运的是,当构建我们自己的 Writable 流时,流状态机 将处理我们的回调,并确定何时处理反压并为我们优化数据流。

但是,当我们想直接使用 Writable 时,我们必须尊重 .write() 的返回值,并密切关注这些条件

  • 如果写入队列繁忙,.write() 将返回 false。
  • 如果数据块太大,.write() 将返回 false(限制由变量 highWaterMark 指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback();
    else if (chunk.toString().indexOf('b') >= 0) callback();
    callback();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();

在实现 ._writev() 时,还需要注意一些事项。 该函数与 .cork() 耦合,但在编写时存在一个常见的错误

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

我们可以根据需要多次调用 .cork(),我们只需要小心调用 .uncork() 相同的次数才能使其再次流动。

结论

流是 Node.js 中经常使用的模块。 它们对于内部结构以及开发人员扩展和连接到 Node.js 模块生态系统非常重要。

希望现在您能够对自己的 WritableReadable 流进行故障排除和安全编码,同时考虑到反压,并与同事和朋友分享您的知识。

务必阅读有关 Stream 的更多信息,了解其他 API 函数,以帮助您在使用 Node.js 构建应用程序时改进和释放流式传输功能。