如何使用流

在 Node.js 应用程序中处理大量数据可能是一把双刃剑。处理海量数据的能力非常方便,但也可能导致性能瓶颈和内存耗尽。传统上,开发人员通过一次性将整个数据集读入内存来应对这一挑战。这种方法虽然对于较小的数据集很直观,但对于大数据(例如文件、网络请求等)则变得效率低下且资源密集。

这就是 Node.js 流发挥作用的地方。流提供了一种根本不同的方法,允许您增量处理数据并优化内存使用。通过将数据处理成可管理的块,流使您能够构建可扩展的应用程序,从而高效地处理即使是最艰巨的数据集。正如一句流行语所说,“流是随时间变化的数组。”

在本指南中,我们概述了流的概念、历史和 API,并就如何使用和操作它们提供了一些建议。

什么是 Node.js 流?

Node.js 流为管理应用程序中的数据流提供了强大的抽象。它们擅长处理大型数据集,例如从文件和网络请求中读取或写入数据,而不会影响性能。

这种方法不同于一次性将整个数据集加载到内存中。流以块的形式处理数据,从而显著减少内存使用。Node.js 中的所有流都继承自 EventEmitter 类,允许它们在数据处理的各个阶段发出事件。这些流可以是可读的、可写的,或两者兼备,为不同的数据处理场景提供了灵活性。

事件驱动架构

Node.js 以事件驱动架构为核心,使其成为实时 I/O 的理想选择。这意味着一旦有输入可用就立即消费,一旦应用程序生成输出就立即发送。流与这种方法无缝集成,实现了持续的数据处理。

它们通过在关键阶段发出事件来实现这一点。这些事件包括接收到数据的信号(data 事件)和流完成的信号(end 事件)。开发人员可以监听这些事件并相应地执行自定义逻辑。这种事件驱动的特性使得流在处理来自外部源的数据时非常高效。

为什么要使用流?

与其他数据处理方法相比,流提供了三个关键优势

  • 内存效率:流以增量方式处理数据,以块的形式消费和处理数据,而不是将整个数据集加载到内存中。在处理大型数据集时,这是一个主要优势,因为它显著减少了内存使用并防止了与内存相关的性能问题。
  • 改善响应时间:流允许立即处理数据。当一块数据到达时,可以立即处理,而无需等待整个有效载荷或数据集接收完毕。这减少了延迟并提高了应用程序的整体响应能力。
  • 实时处理的可扩展性:通过分块处理数据,Node.js 流可以用有限的资源高效地处理大量数据。这种可扩展性使流成为实时处理大量数据的应用程序的理想选择。

这些优势使流成为构建高性能、可扩展的 Node.js 应用程序的强大工具,尤其是在处理大型数据集或实时数据时。

关于性能的说明

如果您的应用程序中已经有所有可用的数据,使用流可能会增加不必要的开销、复杂性,并减慢您的应用程序。

流的历史

本节参考了 Node.js 中流的历史。除非您正在处理为 Node.js 0.11.5(2013 年)之前的版本编写的代码库,否则您很少会遇到旧版本的流 API,但这些术语可能仍在使用中。

流 0

第一版流与 Node.js 同时发布。虽然当时还没有 Stream 类,但不同的模块使用了这个概念并实现了 read/write 函数。util.pump() 函数可用于控制流之间的数据流。

流 1(经典)

随着 2011 年 Node v0.4.0 的发布,引入了 Stream 类以及 pipe() 方法。

流 2

2012 年,随着 Node v0.10.0 的发布,流 2 问世。此更新带来了新的流子类,包括 Readable、Writable、Duplex 和 Transform。此外,还添加了 readable 事件。为了保持向后兼容性,可以通过添加 data 事件监听器或调用 pause()resume() 方法将流切换到旧模式。

流 3

2013 年,随着 Node v0.11.5 的发布,流 3 问世,以解决流同时具有 datareadable 事件处理程序的问题。这消除了在“当前”和“旧”模式之间进行选择的需要。流 3 是 Node.js 中流的当前版本。

流类型

可读流

Readable 是我们用来顺序读取数据源的类。Node.js API 中 Readable 流的典型示例包括读取文件时的 fs.ReadStream 、读取 HTTP 请求时的 http.IncomingMessage 以及从标准输入读取时的 process.stdin

关键方法和事件

可读流通过几个核心方法和事件进行操作,这些方法和事件允许对数据处理进行精细控制。

  • on('data'):每当流中有数据可用时,就会触发此事件。它非常快,因为流会尽可能快地推送数据,使其适用于高吞吐量场景。
  • on('end'):当流中没有更多数据可读时发出。它表示数据传递的完成。此事件仅在流中的所有数据都已被消费后才会触发。
  • on('readable'):当流中有数据可读或已到达流末尾时,会触发此事件。它允许在需要时更可控地读取数据。
  • on('close'):当流及其底层资源已关闭时会发出此事件,并表示不会再发出任何事件。
  • on('error'):此事件可在任何时候发出,表示处理过程中出现错误。此事件的处理程序可用于避免未捕获的异常。

以下各节将演示这些事件的使用。

基本可读流

以下是一个动态生成数据的简单可读流实现示例:

class  extends Readable {
  #count = 0;
  () {
    this.push(':-)');
    if (++this.#count === 5) {
      this.push(null);
    }
  }
}

const  = new ();

.on('data',  => {
  .(.toString());
});

在此代码中,MyStream 类扩展了 Readable 并重写了 _read() 方法,以将字符串“:-)”推送到内部缓冲区。在推送该字符串五次后,它通过推送 null 来表示流的结束。on('data') 事件处理程序会在接收到每个数据块时将其记录到控制台。

使用 readable 事件进行高级控制

为了更精细地控制数据流,可以使用 readable 事件。此事件更复杂,但通过允许明确控制何时从流中读取数据,为某些应用程序提供了更好的性能:

const  = new MyStream({
  : 1,
});

.on('readable', () => {
  .('>> readable event');
  let ;
  while (( = .read()) !== null) {
    .(.toString()); // Process the chunk
  }
});
.on('end', () => .('>> end event'));

在这里,readable 事件用于根据需要手动从流中拉取数据。readable 事件处理程序内的循环会继续从流缓冲区中读取数据,直到返回 null,这表示缓冲区暂时为空或流已结束。将 highWaterMark 设置为 1 会保持较小的缓冲区大小,从而更频繁地触发 readable 事件,并允许对数据流进行更精细的控制。

使用前面的代码,您将得到如下输出:

>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event

我们来分析一下。当我们附加 on('readable') 事件时,它会首次调用 read(),因为这可能会触发 readable 事件的发射。在该事件发射之后,我们在 while 循环的第一次迭代中调用 read。这就是为什么我们在同一行中得到前两个笑脸。之后,我们继续调用 read,直到推送了 null。每次调用 read 都会安排发射一个新的 readable 事件,但由于我们处于“流动”模式(即使用 readable 事件),发射被安排在 nextTick。这就是为什么我们在循环的同步代码结束后,最后一次性得到它们的原因。

注意:您可以尝试使用 NODE_DEBUG=stream 运行代码,以查看在每次 push 后都会触发 emitReadable

如果我们想在每个笑脸之前看到 readable 事件被调用,我们可以将 push 包装在 setImmediateprocess.nextTick 中,如下所示:

class  extends Readable {
  #count = 0;
  () {
    (() => {
      this.push(':-)');
      if (++this.#count === 5) {
        return this.push(null);
      }
    });
  }
}

然后我们会得到:

>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event

可写流

Writable 流对于创建文件、上传数据或任何涉及顺序输出数据的任务都很有用。虽然可读流提供了数据源,但 Node.js 中的可写流则充当数据的目的地。Node.js API 中可写流的典型示例是 fs.WriteStream process.stdout process.stderr

可写流中的关键方法和事件

  • .write():此方法用于向流中写入一块数据。它通过缓冲数据直到达到定义的限制 (highWaterMark) 来处理数据,并返回一个布尔值,指示是否可以立即写入更多数据。
  • .end():此方法表示数据写入过程的结束。它向流发出信号,以完成写入操作,并可能执行任何必要的清理工作。

创建可写流

以下是创建一个可写流的示例,该流在将所有传入数据写入标准输出之前将其转换为大写:

const {  } = ('node:events');
const {  } = ('node:stream');

class  extends  {
  constructor() {
    super({ : 10 /* 10 bytes */ });
  }
  (, , ) {
    ..(.toString().toUpperCase() + '\n', );
  }
}

async function () {
  const  = new ();

  for (let  = 0;  < 10; ++) {
    const  = !.('hello');

    if () {
      .('>> wait drain');
      await (, 'drain');
    }
  }

  .('world');
}

// Call the async function
().(.);

在这段代码中,MyStream 是一个自定义的 Writable 流,其缓冲区容量(highWaterMark)为 10 字节。它重写了 _write 方法,在写出数据之前将其转换为大写。

循环尝试向流中写入十次 hello。如果缓冲区已满(waitDrain 变为 true),它会等待一个 drain 事件,然后再继续,以确保我们不会使流的缓冲区过载。

输出将是:

HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD

双工流

Duplex 流同时实现了可读和可写接口。

双工流中的关键方法和事件

双工流实现了可读流和可写流中描述的所有方法和事件。

一个很好的双工流示例是 net 模块中的 Socket 类:

const  = ('node:net');

// Create a TCP server
const  = .( => {
  .('Hello from server!\n');

  .('data',  => {
    .(`Client says: ${.()}`);
  });

  // Handle client disconnection
  .('end', () => {
    .('Client disconnected');
  });
});

// Start the server on port 8080
.(8080, () => {
  .('Server listening on port 8080');
});

之前的代码将在 8080 端口上打开一个 TCP 套接字,向任何连接的客户端发送 Hello from server!,并记录任何接收到的数据。

const  = ('node:net');

// Connect to the server at localhost:8080
const  = .({ : 8080 }, () => {
  .('Hello from client!\n');
});

.('data',  => {
  .(`Server says: ${.()}`);
});

// Handle the server closing the connection
.('end', () => {
  .('Disconnected from server');
});

前面的代码将连接到 TCP 套接字,发送一条 Hello from client 消息,并记录任何接收到的数据。

转换流

Transform 流是双工流,其输出是根据输入计算得出的。顾名思义,它们通常用于在可读流和可写流之间转换数据。

转换流中的关键方法和事件

除了双工流中的所有方法和事件之外,还有:

  • _transform:此函数在内部调用,用于处理可读和可写部分之间的数据流。应用程序代码不得调用此函数。

创建转换流

要创建一个新的转换流,我们可以向 Transform 构造函数传递一个 options 对象,其中包含一个 transform 函数,该函数使用 push 方法处理如何从输入数据计算输出数据。

const {  } = ('node:stream');

const  = new ({
  (, , ) {
    this.(.toString().toUpperCase());
    ();
  },
});

此流将接收任何输入并以大写形式输出。

如何操作流

在使用流时,我们通常希望从一个源读取并写入一个目标,期间可能需要对数据进行一些转换。以下各节将介绍实现此目的的不同方法。

.pipe()

.pipe() 方法将一个可读流连接到一个可写(或转换)流。虽然这看起来是实现我们目标的简单方法,但它将所有错误处理都委托给了程序员,这使得正确实现变得困难。

下面的示例展示了一个 pipe 试图将当前文件以大写形式输出到控制台。

const  = ('node:fs');
const {  } = ('node:stream');

let  = 0;
const  = new ({
  (, , ) {
    if ( === 10) {
      return (new ('BOOM!'));
    }
    ++;
    this.(.toString().toUpperCase());
    ();
  },
});

const  = .(, { : 1 });
const  = .;

.().();

.('close', () => {
  .('Readable stream closed');
});

.('close', () => {
  .('Transform stream closed');
});

.('error',  => {
  .('\nError in transform stream:', .);
});

.('close', () => {
  .('Writable stream closed');
});

写入 10 个字符后,upper 将在回调中返回一个错误,这将导致流关闭。但是,其他流不会收到通知,从而导致内存泄漏。输出将是:

CONST FS =
Error in transform stream: BOOM!
Transform stream closed

pipeline()

为了避免 .pipe() 方法的陷阱和低级复杂性,在大多数情况下,建议使用 pipeline() 方法。此方法是一种更安全、更健壮的管道流方式,可自动处理错误和清理工作。

下面的示例演示了如何使用 pipeline() 来避免前面示例的陷阱:

const  = ('node:fs');
const { ,  } = ('node:stream');

let  = 0;
const  = new ({
  (, , ) {
    if ( === 10) {
      return (new ('BOOM!'));
    }
    ++;
    this.(.toString().toUpperCase());
    ();
  },
});

const  = .(, { : 1 });
const  = .;

.('close', () => {
  .('Readable stream closed');
});

.('close', () => {
  .('\nTransform stream closed');
});

.('close', () => {
  .('Writable stream closed');
});

(, , ,  => {
  if () {
    return .('Pipeline error:', .);
  }
  .('Pipeline succeeded');
});

在这种情况下,所有流都将被关闭,并显示以下输出:

CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed

pipeline() 方法还有一个 async pipeline() 版本,它不接受回调,而是返回一个在管道失败时被拒绝的 promise。

异步迭代器

异步迭代器被推荐为与流 API 交互的标准方式。与 Web 和 Node.js 中的所有流原语相比,异步迭代器更易于理解和使用,有助于减少错误并提高代码的可维护性。在 Node.js 的最新版本中,异步迭代器已成为一种更优雅、更易读的与流交互的方式。基于事件的基础,异步迭代器提供了一个更高级别的抽象,简化了流的消费。

在 Node.js 中,所有可读流都是异步可迭代的。这意味着您可以使用 for await...of 语法来循环遍历流的数据,并在数据可用时,以异步代码的效率和简单性处理每一块数据。

将异步迭代器与流一起使用的好处

将异步迭代器与流一起使用,可以从以下几个方面简化异步数据流的处理:

  • 增强可读性:代码结构更清晰、更易读,尤其是在处理多个异步数据源时。
  • 错误处理:异步迭代器允许使用 try/catch 块进行直接的错误处理,类似于常规的异步函数。
  • 流量控制:它们通过消费者等待下一块数据来固有地管理背压,从而允许更高效的内存使用和处理。

异步迭代器提供了一种更现代且通常更易读的方式来处理可读流,尤其是在处理异步数据源或当您更喜欢采用更顺序的、基于循环的数据处理方法时。

下面是一个演示如何将异步迭代器与可读流一起使用的示例:

const  = ('node:fs');
const {  } = ('node:stream/promises');

async function () {
  await (
    .(),
    async function* () {
      for await (let  of ) {
        yield .toString().toUpperCase();
      }
    },
    .
  );
}

().(.);

此代码实现了与前面示例相同的结果,但无需定义新的转换流。为了简洁起见,已删除了前面示例中的错误。已使用管道的异步版本,应将其包装在 try...catch 块中以处理可能的错误。

对象模式

默认情况下,流可以处理字符串、BufferTypedArrayDataView。如果将不同于这些类型的任意值(例如对象)推送到流中,则会抛出 TypeError。但是,可以通过将 objectMode 选项设置为 true 来处理对象。这允许流处理任何 JavaScript 值,除了 null,它用于表示流的结束。这意味着您可以在可读流中 pushread 任何值,并在可写流中 write 任何值。

const {  } = ('node:stream');

const  = ({
  : true,
  () {
    this.push({ : 'world' });
    this.push(null);
  },
});

在对象模式下工作时,重要的是要记住 highWaterMark 选项指的是对象的数量,而不是字节数。

背压

使用流时,确保生产者不会压垮消费者非常重要。为此,背压机制在 Node.js API 的所有流中都得到了使用,并且实现者有责任维护这种行为。

在任何数据缓冲区超过 highWaterMark 或写入队列当前繁忙的情况下,.write() 将返回 false

当返回 false 值时,背压系统就会启动。它将暂停传入的 Readable 流发送任何数据,并等待消费者再次准备就绪。一旦数据缓冲区清空,将发出一个 'drain' 事件以恢复传入的数据流。

要更深入地了解背压,请查看 背压指南

流 vs Web 流

流的概念并非 Node.js 独有。实际上,Node.js 对流概念有不同的实现,称为 Web Streams,它实现了 WHATWG Streams Standard。虽然它们背后的概念相似,但重要的是要意识到它们有不同的 API 并且不直接兼容。

Web Streams 实现了 ReadableStreamWritableStreamTransformStream 类,它们与 Node.js 的 ReadableWritableTransform 流同源。

流和 Web 流的互操作性

Node.js 提供了在 Web Streams 和 Node.js streams 之间进行转换的实用函数。这些函数在每个流类中作为 toWebfromWeb 方法实现。

Duplex 类中的以下示例演示了如何处理转换为 Web 流的可读和可写流:

const {  } = ('node:stream');

const  = ({
  () {
    this.push('world');
    this.push(null);
  },
  (, , ) {
    .('writable', );
    ();
  },
});

const { ,  } = .();
.().('hello');


  .()
  .()
  .( => {
    .('readable', .);
  });

如果您需要从 Node.js 模块返回一个 Web 流,反之亦然,那么这些辅助函数会很有用。对于常规的流消费,异步迭代器可以实现与 Node.js 和 Web 流的无缝交互。

const {  } = ('node:stream/promises');

async function () {
  const {  } = await ('https://node.org.cn/api/stream.html');

  await (
    ,
    new (),
    async function* () {
      for await (const  of ) {
        yield .toString().toUpperCase();
      }
    },
    .
  );
}

().(.);

请注意,fetch 主体是 ReadableStream<Uint8Array>,因此需要一个 TextDecoderStream 来将数据块作为字符串处理。

这项工作源自 Matteo CollinaPlatformatic 博客上发布的内容。