如何使用流

在 Node.js 应用程序中使用大量数据可能是一把双刃剑。处理海量数据的能力非常有用,但也可能导致性能瓶颈和内存耗尽。传统上,开发人员通过一次性将整个数据集读入内存来解决这个挑战。这种方法对于较小的数据集来说很直观,但对于大数据(例如,文件、网络请求……)来说,效率低下且资源密集。

这就是 Node.js 流的用武之地。流提供了一种从根本上不同的方法,允许你以增量方式处理数据并优化内存使用。通过以可管理的大小处理数据,流使你能够构建可扩展的应用程序,从而高效地处理即使是最令人生畏的数据集。正如流行引言所说,“流是随时间变化的数组。”

在本指南中,我们将概述流的概念、历史和 API,以及关于如何使用和操作它们的建议。

什么是 Node.js 流?

Node.js 流为管理应用程序中的数据流提供了一个强大的抽象。它们擅长处理大型数据集,例如从文件和网络请求中读取或写入数据,而不会影响性能。

这种方法不同于一次性将整个数据集加载到内存中。流以块的形式处理数据,从而显着减少内存使用。 Node.js 中的所有流都继承自 EventEmitter 类,允许它们在数据处理的各个阶段发出事件。这些流可以是可读的、可写的,或者两者兼而有之,从而为不同的数据处理场景提供灵活性。

事件驱动架构

Node.js 在事件驱动架构上蓬勃发展,使其成为实时 I/O 的理想选择。这意味着尽快使用可用的输入,并在应用程序生成输出后立即发送输出。流与这种方法无缝集成,从而实现连续的数据处理。

它们通过在关键阶段发出事件来实现这一点。这些事件包括接收数据的信号 (data 事件) 和流完成的信号 (end 事件)。开发人员可以监听这些事件并相应地执行自定义逻辑。这种事件驱动的特性使得流对于处理来自外部来源的数据非常高效。

为什么要使用流?

与其他数据处理方法相比,流提供了三个关键优势

  • 内存效率:流以增量方式处理数据,以块的形式使用和处理数据,而不是将整个数据集加载到内存中。这是处理大型数据集时的一个主要优势,因为它显着减少了内存使用并防止了与内存相关的性能问题。
  • 改进的响应时间:流允许立即进行数据处理。当一块数据到达时,无需等待接收整个有效负载或数据集即可对其进行处理。这减少了延迟并提高了应用程序的整体响应能力。
  • 实时处理的可扩展性:通过以块的形式处理数据,Node.js 流可以使用有限的资源有效地处理大量数据。这种可扩展性使流成为在实时处理大量数据的应用程序的理想选择。

这些优势使流成为构建高性能、可扩展的 Node.js 应用程序的强大工具,尤其是在处理大型数据集或实时数据处理时。

性能说明

如果你的应用程序已经将所有数据都准备好地存储在内存中,则使用流可能会增加不必要的开销、复杂性并降低应用程序的速度。

流的历史

本节是 Node.js 中流的历史记录参考。除非你正在使用为 0.11.5 (2013) 之前的 Node.js 版本编写的代码库,否则你很少会遇到旧版本的流 API,但这些术语可能仍在使用。

流 0

第一个版本的流与 Node.js 同时发布。虽然当时还没有 Stream 类,但不同的模块使用了这个概念并实现了 read/write 函数。可以使用 util.pump() 函数来控制流之间的数据流。

流 1 (经典)

随着 2011 年 Node v0.4.0 的发布,引入了 Stream 类,以及 pipe() 方法。

流 2

在 2012 年发布的 Node v0.10.0 中,推出了流 2。此更新带来了新的流子类,包括 Readable、Writable、Duplex 和 Transform。此外,还添加了 readable 事件。为了保持向后兼容性,可以通过添加 data 事件监听器或调用 pause()resume() 方法将流切换到旧模式。

流 3

在 2013 年,流 3 与 Node v0.11.5 一起发布,以解决流同时具有 datareadable 事件处理程序的问题。这消除了在“当前”和“旧”模式之间进行选择的需要。流 3 是 Node.js 中流的当前版本。

流类型

可读 (Readable)

Readable 是我们用来按顺序读取数据源的类。Node.js API 中 Readable 流的典型示例包括读取文件时的 fs.ReadStream 、读取 HTTP 请求时的 http.IncomingMessage 以及从标准输入读取时的 process.stdin

关键方法和事件

可读流使用几个核心方法和事件运行,这些方法和事件允许对数据处理进行精细控制

  • on('data'):每当流中有数据可用时,都会触发此事件。它非常快,因为流会尽快推送数据,因此非常适合高吞吐量场景。
  • on('end'):当没有更多数据可从流中读取时发出。它表示数据传递的完成。仅当已使用流中的所有数据时,才会触发此事件。
  • on('readable'):当有数据可从流中读取或已到达流的末尾时,会触发此事件。它允许在需要时进行更受控制的数据读取。
  • on('close'):当流及其底层资源已关闭时,将发出此事件,并指示将不再发出任何事件。
  • on('error'):此事件可以在任何时候发出,表示处理过程中出现错误。此事件的处理程序可用于避免未捕获的异常。

在以下各节中可以看到这些事件的使用演示。

基本可读流

这是一个简单的可读流实现的示例,该实现动态生成数据

const { Readable } = require('node:stream');

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.push(':-)');
    if (++this.#count === 5) {
      this.push(null);
    }
  }
}

const stream = new MyStream();

stream.on('data', chunk => {
  console.log(chunk.toString());
});

在此代码中,MyStream 类扩展了 Readable 并重写了 [_read][] 方法以将字符串 ":-)" 推送到内部缓冲区。在将字符串推送五次后,它通过推送 null 来指示流的结束。on('data') 事件处理程序会将每个块记录到控制台,因为已接收到这些块。

使用 readable 事件进行高级控制

为了更精细地控制数据流,可以使用 readable 事件。这个事件更加复杂,但通过允许显式控制何时从流中读取数据,为某些应用程序提供了更好的性能。

const stream = new MyStream({
  highWaterMark: 1,
});

stream.on('readable', () => {
  console.count('>> readable event');
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString()); // Process the chunk
  }
});
stream.on('end', () => console.log('>> end event'));

在这里,readable 事件用于根据需要手动从流中提取数据。readable 事件处理程序中的循环会继续从流缓冲区读取数据,直到它返回 null,表明缓冲区暂时为空或流已结束。将 highWaterMark 设置为 1 可以保持较小的缓冲区大小,更频繁地触发 readable 事件,并允许更精细地控制数据流。

使用之前的代码,你将得到类似以下的输出

>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event

让我们尝试理解一下。当我们附加 on('readable') 事件时,它会首先调用 read(),因为这可能会触发 readable 事件的发出。在发出该事件之后,我们在 while 循环的第一次迭代中调用 read。这就是为什么我们在同一行得到前两个笑脸的原因。之后,我们一直调用 read 直到 null 被推送。每次调用 read 都会安排一个新的 readable 事件的发出,但由于我们处于“流动”模式(即,使用 readable 事件),因此该发出被安排到 nextTick。这就是为什么我们在循环的同步代码完成后,在最后得到所有这些笑脸的原因。

注意:你可以尝试使用 NODE_DEBUG=stream 运行代码,以查看每次 push 之后都会触发 emitReadable

如果我们希望在每个笑脸之前看到 readable 事件被调用,我们可以像这样将 push 包装到 setImmediateprocess.nextTick

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    setImmediate(() => {
      this.push(':-)');
      if (++this.#count === 5) {
        return this.push(null);
      }
    });
  }
}

我们会得到

>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event

Writable

Writable 流对于创建文件、上传数据或任何涉及顺序输出数据的任务非常有用。虽然 readable 流提供数据源,但 Node.js 中的 writable 流充当数据的目的地。Node.js API 中 writable 流的典型示例包括 fs.WriteStreamprocess.stdoutprocess.stderr

Writable 流中的关键方法和事件

  • .write(): 此方法用于将数据块写入流。它通过将数据缓冲到定义的限制 (highWaterMark) 来处理数据,并返回一个布尔值,指示是否可以立即写入更多数据。
  • .end(): 此方法表示数据写入过程的结束。它向流发出信号,指示完成写入操作并可能执行任何必要的清理。

创建 Writable

这是一个创建 writable 流的示例,该流将所有传入数据转换为大写,然后再将其写入标准输出

const { Writable } = require('node:stream');
const { once } = require('node:events');

class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 bytes */ });
  }
  _write(data, encode, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}

async function main() {
  const stream = new MyStream();

  for (let i = 0; i < 10; i++) {
    const waitDrain = !stream.write('hello');

    if (waitDrain) {
      console.log('>> wait drain');
      await once(stream, 'drain');
    }
  }

  stream.end('world');
}

// Call the async function
main().catch(console.error);

在此代码中,MyStream 是一个自定义的 Writable 流,具有 10 字节的缓冲区容量 (highWaterMark)。它重写了 _write 方法,以将数据转换为大写然后再将其写出。

循环尝试将 hello 写入流十次。如果缓冲区已满(waitDrain 变为 true),它将等待 drain 事件,然后再继续,从而确保我们不会淹没流的缓冲区。

输出将是

HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD

Duplex

Duplex 流同时实现 readable 和 writable 接口。

Duplex 流中的关键方法和事件

Duplex 流实现 Readable 和 Writable 流中描述的所有方法和事件。

一个很好的 duplex 流示例是 net 模块中的 Socket

const net = require('node:net');

// Create a TCP server
const server = net.createServer(socket => {
  socket.write('Hello from server!\n');

  socket.on('data', data => {
    console.log(`Client says: ${data.toString()}`);
  });

  // Handle client disconnection
  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

// Start the server on port 8080
server.listen(8080, () => {
  console.log('Server listening on port 8080');
});

之前的代码将在端口 8080 上打开一个 TCP 套接字,将 Hello from server! 发送给任何连接的客户端,并记录接收到的任何数据。

const net = require('node:net');

// Connect to the server at localhost:8080
const client = net.createConnection({ port: 8080 }, () => {
  client.write('Hello from client!\n');
});

client.on('data', data => {
  console.log(`Server says: ${data.toString()}`);
});

// Handle the server closing the connection
client.on('end', () => {
  console.log('Disconnected from server');
});

之前的代码将连接到 TCP 套接字,发送 Hello from client 消息,并记录接收到的任何数据。

Transform

Transform 流是 duplex 流,其中输出是根据输入计算得出的。顾名思义,它们通常用于 readable 流和 writable 流之间,以在数据通过时转换数据。

Transform 流中的关键方法和事件

除了 Duplex 流中的所有方法和事件之外,还有

  • _transform: 此函数在内部调用,以处理 readable 和 writable 部分之间的数据流。应用程序代码不得调用此函数。

创建 Transform 流

要创建一个新的 transform 流,我们可以将一个 options 对象传递给 Transform 构造函数,其中包括一个 transform 函数,该函数处理如何使用 push 方法从输入数据计算输出数据。

const { Transform } = require('node:stream');

const upper = new Transform({
  transform: function (data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  },
});

此流将获取任何输入并以大写形式输出。

如何使用流进行操作

使用流时,我们通常希望从源读取并写入目标,可能需要对数据进行一些转换。以下各节将介绍执行此操作的不同方法。

.pipe()

.pipe() 方法将一个 readable 流连接到一个 writable(或 transform)流。虽然这看起来是实现我们目标的简单方法,但它将所有错误处理委托给程序员,这使得正确处理变得困难。

以下示例显示了一个 pipe 尝试以大写形式将当前文件输出到控制台。

const fs = require('node:fs');
const { Transform } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform: function (data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.pipe(upper).pipe(writeStream);

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('Transform stream closed');
});

upper.on('error', err => {
  console.error('\nError in transform stream:', err.message);
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});

写入 10 个字符后,upper 将在回调中返回一个错误,这将导致流关闭。但是,其他流不会收到通知,从而导致内存泄漏。输出将是

CONST FS =
Error in transform stream: BOOM!
Transform stream closed

pipeline()

为了避免 .pipe() 方法的陷阱和底层复杂性,在大多数情况下,建议使用 pipeline() 方法。此方法是一种更安全、更健壮的方式来将流管道连接在一起,自动处理错误和清理。

以下示例演示了如何使用 pipeline() 防止前一个示例的陷阱

const fs = require('node:fs');
const { Transform, pipeline } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform: function (data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('\nTransform stream closed');
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});

pipeline(readStream, upper, writeStream, err => {
  if (err) {
    return console.error('Pipeline error:', err.message);
  }
  console.log('Pipeline succeeded');
});

在这种情况下,所有流都将关闭,并显示以下输出

CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed

pipeline() 方法还有一个 async pipeline() 版本,该版本不接受回调,而是返回一个 promise,如果管道失败,该 promise 将被拒绝。

异步迭代器

建议使用异步迭代器作为与 Streams API 交互的标准方式。与 Web 和 Node.js 中的所有流原语相比,异步迭代器更易于理解和使用,有助于减少错误并提高代码的可维护性。在最新版本的 Node.js 中,异步迭代器已经成为与流交互的更优雅、更易读的方式。异步迭代器建立在事件的基础上,提供了一个更高级别的抽象,简化了流的使用。

在 Node.js 中,所有 readable 流都是异步可迭代的。这意味着你可以使用 for await...of 语法来循环遍历流的数据,因为它变得可用,并以异步代码的效率和简洁性处理每条数据。

将异步迭代器与流一起使用的好处

将异步迭代器与流一起使用可通过以下几种方式简化异步数据流的处理

  • 增强的可读性:代码结构更简洁、更易读,尤其是在处理多个异步数据源时。
  • 错误处理:异步迭代器允许使用 try/catch 块进行简单的错误处理,类似于常规的异步函数。
  • 流控制:它们固有地管理背压,因为消费者通过等待下一条数据来控制流,从而实现更高效的内存使用和处理。

异步迭代器提供了一种更现代且通常更易读的方式来处理 readable 流,尤其是在处理异步数据源或当你喜欢更顺序的、基于循环的数据处理方法时。

这是一个演示将异步迭代器与 readable 流一起使用的示例

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');

async function main() {
  await pipeline(
    fs.createReadStream(__filename),
    async function* (source) {
      for await (let chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
}

main().catch(console.error);

此代码实现了与前面示例相同的结果,而无需定义新的 transform 流。为了简洁起见,已删除前面示例中的错误。已使用异步版本的管道,并且应将其包装在 try...catch 块中以处理可能的错误。

对象模式

默认情况下,流可以处理字符串、BufferTypedArrayDataView。如果将与这些不同的任意值(例如,对象)推送到流中,则会抛出 TypeError。但是,可以通过将 objectMode 选项设置为 true 来处理对象。这允许流处理任何 JavaScript 值,除了 null,它用于表示流的结束。这意味着你可以在 readable 流中 pushread 任何值,并在 writable 流中 write 任何值。

const { Readable } = require('node:stream');

const readable = Readable({
  objectMode: true,
  read() {
    this.push({ hello: 'world' });
    this.push(null);
  },
});

在对象模式下工作时,重要的是要记住 highWaterMark 选项是指对象的数量,而不是字节数。

背压

使用流时,重要的是确保生产者不会淹没消费者。为此,背压机制用于 Node.js API 中的所有流,并且实现者负责维护该行为。

在数据缓冲区超过 highWaterMark 或写入队列当前繁忙的任何情况下,.write() 将返回 false

当返回 false 值时,背压系统会启动。它将暂停来自传入的 Readable 流的任何数据发送,并等待直到消费者再次准备好。一旦数据缓冲区被清空,就会发出一个 'drain' 事件以恢复传入的数据流。

要更深入地了解背压,请查看 backpressure guide

流 (Streams) 与 Web 流 (Web Streams)

流的概念并非 Node.js 独有。事实上,Node.js 拥有流概念的不同实现,称为 Web Streams,它实现了 WHATWG Streams Standard。尽管它们背后的概念相似,但重要的是要意识到它们具有不同的 API,并且彼此不直接兼容。

Web Streams 实现了 ReadableStreamWritableStreamTransformStream 类,它们分别对应于 Node.js 的 ReadableWritableTransform 流。

流与 Web Streams 的互操作性

Node.js 提供了实用函数,用于在 Web Streams 和 Node.js 流之间进行相互转换。 这些函数在每个流类中都实现为 toWebfromWeb 方法。

以下 Duplex 类中的示例演示了如何使用转换为 Web Streams 的可读和可写流。

const { Duplex } = require('node:stream');

const duplex = Duplex({
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value);
  });

如果您需要从 Node.js 模块返回 Web Stream,或反之亦然,则辅助函数很有用。 对于流的常规使用,async 迭代器可以实现与 Node.js 和 Web Streams 的无缝交互。

const { pipeline } = require('node:stream/promises');

async function main() {
  const { body } = await fetch('https://node.org.cn/api/stream.html');

  await pipeline(
    body,
    new TextDecoderStream(),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
}

main().catch(console.error);

请注意,fetch body 是一个 ReadableStream<Uint8Array>,因此需要 TextDecoderStream 来处理字符串形式的块。

这项工作源自 Matteo CollinaPlatformatic 的博客中发布的内容。