流中的背压

数据处理中存在一个普遍问题,称为背压(backpressure),它描述了数据传输过程中缓冲区后方数据的积压。当传输的接收端有复杂操作,或由于某种原因速度较慢时,来自传入源的数据就有可能积聚起来,就像堵塞一样。

为了解决这个问题,必须有一个协调系统来确保数据从一个源头到另一个源头的顺畅流动。不同的社区针对其程序以独特的方式解决了这个问题,Unix 管道和 TCP 套接字就是很好的例子,它们通常被称为*流量控制*。在 Node.js 中,流(streams)是采纳的解决方案。

本指南的目的是进一步详细说明什么是背压,以及流在 Node.js 源代码中究竟如何解决这个问题。指南的第二部分将介绍建议的最佳实践,以确保您在实现流时应用程序代码是安全和优化的。

我们假设您对 Node.js 中的背压BufferEventEmitter的一般定义有所了解,并有一些使用Stream的经验。如果您还没有阅读过这些文档,不妨先看一下 API 文档,这将有助于您在阅读本指南时加深理解。

数据处理的问题

在计算机系统中,数据通过管道、套接字和信号从一个进程传输到另一个进程。在 Node.js 中,我们找到了一个类似的机制,称为Stream。流非常棒!它们为 Node.js 做了很多工作,几乎内部代码库的每个部分都利用了该模块。作为开发者,我们也非常鼓励您使用它们!

const  = ('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const  = .({
  : .,
  : .,
});

.('Why should you use streams? ',  => {
  .(`Maybe it's ${}, maybe it's because they are awesome! :)`);

  .();
});

通过比较 Node.js 的Stream实现与内部系统工具,可以很好地说明为什么通过流实现的背压机制是一项出色的优化。

在一个场景中,我们将一个大文件(大约 9 GB)用大家熟悉的zip(1)工具进行压缩。

zip The.Matrix.1080p.mkv

虽然这需要几分钟才能完成,但在另一个 shell 中,我们可以运行一个脚本,使用 Node.js 的zlib模块,它封装了另一个压缩工具gzip(1)

const  = ('node:fs');
const  = ('node:zlib').();

const  = .('The.Matrix.1080p.mkv');
const  = .('The.Matrix.1080p.mkv.gz');

.().();

要测试结果,请尝试打开每个压缩文件。由zip(1)工具压缩的文件会通知您文件已损坏,而由Stream完成的压缩则可以无误地解压。

在这个例子中,我们使用 .pipe() 将数据源从一端传输到另一端。但是,请注意没有附加适当的错误处理程序。如果一个数据块未能被正确接收,Readable源或gzip流将不会被销毁。pump是一个实用工具,如果管道中的某个流失败或关闭,它会正确地销毁所有流,在这种情况下是必不可少的!

pump仅适用于 Node.js 8.x 或更早版本,因为在 Node.js 10.x 或更高版本中,引入了pipeline来替代pump。这是一个模块方法,用于在流之间建立管道,转发错误,并妥善清理,并在管道完成时提供回调。

这是一个使用 pipeline 的例子

const  = ('node:fs');
const {  } = ('node:stream');
const  = ('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

(
  .('The.Matrix.1080p.mkv'),
  .(),
  .('The.Matrix.1080p.mkv.gz'),
   => {
    if () {
      .('Pipeline failed', );
    } else {
      .('Pipeline succeeded');
    }
  }
);

你也可以使用stream/promises模块,通过async / await来使用 pipeline。

const  = ('node:fs');
const {  } = ('node:stream/promises');
const  = ('node:zlib');

async function () {
  try {
    await (
      .('The.Matrix.1080p.mkv'),
      .(),
      .('The.Matrix.1080p.mkv.gz')
    );
    .('Pipeline succeeded');
  } catch () {
    .('Pipeline failed', );
  }
}

数据太多、太快

在某些情况下,Readable流可能会以过快的速度将数据提供给Writable流——远超消费者的处理能力!

当这种情况发生时,消费者将开始将所有数据块排队等待稍后消费。写入队列会越来越长,因此必须在内存中保留更多数据,直到整个过程完成。

写入磁盘比从磁盘读取要慢得多,因此,当我们尝试压缩文件并将其写入硬盘时,会发生背压,因为写入磁盘的速度跟不上读取的速度。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

这就是为什么背压机制很重要。如果没有背压系统,进程会耗尽系统内存,从而有效地减慢其他进程的速度,并独占系统的大部分资源直到完成。

这会导致几个问题

  • 减慢所有其他当前进程的速度
  • 一个工作负荷极重的垃圾回收器
  • 内存耗尽

在下面的示例中,我们将取出 .write() 函数的返回值并将其更改为 true,这实际上禁用了 Node.js 核心中的背压支持。在任何提到“修改后”的二进制文件时,我们指的是运行不带 return ret; 行,而是替换为 return true;node 二进制文件。

对垃圾回收的过度拖累

让我们来看一个快速的基准测试。使用上面相同的例子,我们进行了一些计时试验,以获得两种二进制文件的中位数时间。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

两者运行时间都大约一分钟,所以差别不大,但让我们仔细看看以确认我们的怀疑是否正确。我们使用 Linux 工具dtrace来评估 V8 垃圾回收器的情况。

GC(垃圾回收器)测量的时间表示垃圾回收器完成一次完整清除周期的间隔

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

虽然这两个进程开始时相同,并且似乎以相同的速率使 GC 工作,但很明显,几秒钟后,在有正常工作的背压系统的情况下,它会将 GC 负载分散到 4-8 毫秒的持续间隔中,直到数据传输结束。

然而,当没有背压系统时,V8 的垃圾回收开始变得拖沓。正常的二进制文件在一分钟内大约触发了 75 次 GC,而修改后的二进制文件只触发了 36 次。

这是由于内存使用量增长而缓慢积累的性能债务。在没有背压系统的情况下,随着数据传输,每个数据块的传输都会占用更多内存。

分配的内存越多,GC 在一次清理中需要处理的就越多。清理范围越大,GC 就需要更多时间来决定哪些可以被释放,而在更大的内存空间中扫描游离指针将消耗更多的计算能力。

内存耗尽

为了确定每个二进制文件的内存消耗,我们分别使用 /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js 对每个进程进行了计时。

这是正常二进制文件的输出

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虚拟内存占用的最大字节大小约为 87.81 MB。

现在改变 .write() 函数的返回值,我们得到

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虚拟内存占用的最大字节大小约为 1.52 GB。

如果没有流来协调背压,分配的内存空间会大一个数量级——同一个进程之间存在巨大的差异!

这个实验展示了 Node.js 的背压机制对于你的计算系统是多么优化和高效。现在,让我们来分析一下它是如何工作的!

背压如何解决这些问题?

有不同的函数可以将数据从一个进程传输到另一个进程。在 Node.js 中,有一个内置的内部函数叫做 .pipe()。你也可以使用其他包!但最终,在这个过程的基础层面上,我们有两个独立的组件:数据*源*和*消费者*。

当从源头调用 .pipe() 时,它会向消费者发出信号,表示有数据要传输。pipe 函数帮助为事件触发器设置适当的背压闭包。

在 Node.js 中,源是 Readable 流,消费者是 Writable 流(这两者都可以与 DuplexTransform 流互换,但这超出了本指南的范围)。

背压被触发的时刻可以精确地缩小到 Writable.write() 函数的返回值。当然,这个返回值是由一些条件决定的。

在任何数据缓冲区超过 highWaterMark 或写入队列当前繁忙的情况下,.write() 将返回 false

当返回一个 false 值时,背压系统就会启动。它会暂停传入的 Readable 流发送任何数据,并等待消费者再次准备好。一旦数据缓冲区被清空,就会发出一个 'drain' 事件,并恢复传入的数据流。

一旦队列处理完成,背压机制将允许数据再次发送。之前被占用的内存空间将被释放,为下一批数据做准备。

这有效地允许在任何给定时间为 .pipe() 函数使用固定量的内存。不会有内存泄漏,也不会有无限缓冲,垃圾回收器只需要处理内存中的一个区域!

那么,如果背压如此重要,你(可能)为什么没有听说过它?嗯,答案很简单:Node.js 会自动为你完成这一切。

这太棒了!但当我们试图理解如何实现我们自己的自定义流时,这又不是那么好。

在大多数机器中,有一个字节大小决定了缓冲区何时已满(这在不同机器上会有所不同)。Node.js 允许你设置自定义的 highWaterMark,但通常默认设置为 16kb(16384,或者对于 objectMode 流是 16)。在某些情况下,你可能想提高这个值,可以这样做,但要谨慎!

.pipe() 的生命周期

为了更好地理解背压,这里有一个关于 Readable 流被管道传输Writable 流的生命周期流程图

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

如果你正在设置一个管道来将几个流连接起来以操作你的数据,你很可能会实现 Transform 流。

在这种情况下,你从 Readable 流的输出将进入 Transform 流,然后管道传输到 Writable 流。

Readable.pipe(Transformable).pipe(Writable);

背压会自动应用,但请注意,Transform 流的传入和传出 highWaterMark 都可以被操作,并且会影响背压系统。

背压指南

Node.js v0.10 起,Stream 类提供了修改 .read().write() 行为的能力,通过使用这些相应函数的下划线版本(._read()._write())。

有关于实现可读流实现可写流的文档指南。我们假设您已经阅读过这些内容,下一节将进行更深入的探讨。

实现自定义流时应遵守的规则

流的黄金法则是**始终尊重背压**。最佳实践就是不相互矛盾的实践。只要你小心避免与内部背压支持冲突的行为,你就可以确保你遵循了良好的实践。

总的来说,

  1. 如果没有被要求,绝不要 .push()
  2. .write() 返回 false 后,绝不要再调用它,而应等待 'drain' 事件。
  3. 流在不同的 Node.js 版本和你使用的库之间会有变化。请小心并进行测试。

关于第 3 点,一个用于构建浏览器流的非常有用的包是readable-stream。Rodd Vagg 写了一篇很棒的博客文章,描述了这个库的实用性。简而言之,它为Readable流提供了一种自动的优雅降级,并支持旧版本的浏览器和 Node.js。

针对可读流的特定规则

到目前为止,我们已经探讨了 .write() 如何影响背压,并且主要关注了 Writable 流。由于 Node.js 的功能,数据技术上是从 Readable 流向 Writable 流。然而,正如我们在任何数据、物质或能量传输中观察到的那样,源头和目的地同样重要,Readable 流对于如何处理背压至关重要。

这两个过程相互依赖以实现有效通信,如果 Readable 流忽略了 Writable 流要求其停止发送数据的请求,这可能与 .write() 的返回值不正确一样 problematic。

因此,除了尊重 .write() 的返回之外,我们还必须尊重在 ._read() 方法中使用的 .push() 的返回值。如果 .push() 返回一个 false 值,流将停止从源头读取。否则,它将不间断地继续。

这是一个使用 .push() 的不良实践示例

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class  extends Readable {
  () {
    let ;
    while (null !== ( = getNextChunk())) {
      this.push();
    }
  }
}

这是一个良好实践的例子,其中 Readable 流通过检查 this.push() 的返回值来尊重背压

class  extends Readable {
  () {
    let ;
    let  = true;
    while ( && null !== ( = getNextChunk())) {
       = this.push();
    }
  }
}

此外,从自定义流外部,忽略背压也存在陷阱。在这个良好实践的反例中,应用程序的代码在数据可用时(由 'data' 事件发出信号)强制推送数据

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data',  => writable.write());

这里有一个在可读流中使用 .push() 的例子。

const {  } = ('node:stream');

// Create a custom Readable stream
const  = new ({
  : true,
  () {
    // Push some data onto the stream
    this.({ : 'Hello, world!' });
    this.(null); // Mark the end of the stream
  },
});

// Consume the stream
.('data',  => {
  .();
});

// Output:
// { message: 'Hello, world!' }

在这个例子中,我们创建了一个自定义的可读流,它使用 .push() 将单个对象推入流中。._read() 方法在流准备好消费数据时被调用,在这种情况下,我们立即将一些数据推入流中,并通过推送 null 来标记流的结束。

然后,我们通过监听 'data' 事件来消费流,并记录推入流中的每个数据块。在这种情况下,我们只向流中推送了一个数据块,所以我们只看到一条日志消息。

特定于可写流的规则

回想一下,.write() 可能会根据一些条件返回 true 或 false。幸运的是,当我们构建自己的 Writable 流时,流状态机会处理我们的回调,并决定何时处理背压以及为我们优化数据流。

然而,当我们想直接使用 Writable 时,我们必须尊重 .write() 的返回值,并密切关注这些条件

  • 如果写队列繁忙,.write() 将返回 false。
  • 如果数据块太大,.write() 将返回 false(限制由变量 highWaterMark 指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class  extends Writable {
  (, , ) {
    if (.toString().indexOf('a') >= 0) {
      ();
    } else if (.toString().indexOf('b') >= 0) {
      ();
    }
    ();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) {
  return callback();
}

if (chunk.contains('b')) {
  return callback();
}
callback();

在实现 ._writev() 时也需要注意一些事情。该函数与 .cork() 配对使用,但在编写时有一个常见的错误

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
.(, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
.(, ws);

// As a global function.
function () {
  .uncork();
}

.cork() 可以被调用任意多次,我们只需要小心地调用 .uncork() 相同次数,以使其再次流动。

结论

流是 Node.js 中常用的模块。它们对内部结构很重要,对于开发者来说,扩展和连接 Node.js 模块生态系统也很重要。

希望现在你能够在考虑背压的情况下,安全地编写自己的 WritableReadable 流,并与同事和朋友分享你的知识。

请务必多阅读关于 Stream 的其他 API 函数,以帮助你在使用 Node.js 构建应用程序时,提升和释放你的流处理能力。