工作线程#

稳定性: 2 - 稳定

源代码: lib/worker_threads.js

node:worker_threads 模块可以使用并行执行 JavaScript 的线程。 要访问它:

import worker from 'node:worker_threads';'use strict';

const worker = require('node:worker_threads');

工作线程(线程)对于执行 CPU 密集型的 JavaScript 操作很有用。 它们对 I/O 密集型工作帮助不大。 Node.js 内置的异步 I/O 操作比工作线程更有效。

child_processcluster 不同,worker_threads 可以共享内存。 它们通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现。

import {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} from 'node:worker_threads';

if (!isMainThread) {
  const { parse } = await import('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

export default function parseJSAsync(script) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL(import.meta.url), {
      workerData: script,
    });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};'use strict';

const {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} = require('node:worker_threads');

if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script,
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

上面的示例为每个 parseJSAsync() 调用生成一个工作线程。 在实践中,使用工作线程池来处理这些类型的任务。 否则,创建工作线程的开销可能会超过它们的好处。

在实现工作线程池时,使用 AsyncResource API 来通知诊断工具(例如,提供异步堆栈跟踪)关于任务与其结果之间的相关性。 有关示例实现,请参见 async_hooks 文档中的 "为 Worker 线程池使用 AsyncResource"

默认情况下,工作线程继承非进程特定的选项。 请参阅 Worker constructor options 以了解如何自定义工作线程选项,特别是 argvexecArgv 选项。

worker.getEnvironmentData(key)#

  • key <any> 可以用作 <Map> 键的任何任意的、可克隆的 JavaScript 值。
  • 返回: <any>

在工作线程中,worker.getEnvironmentData() 返回传递给生成线程的 worker.setEnvironmentData() 的数据的克隆。 每个新的 Worker 自动接收其自己的环境数据副本。

import {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} from 'node:worker_threads';

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(new URL(import.meta.url));
} else {
  console.log(getEnvironmentData('Hello'));  // Prints 'World!'.
}'use strict';

const {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} = require('node:worker_threads');

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(__filename);
} else {
  console.log(getEnvironmentData('Hello'));  // Prints 'World!'.
}

worker.isInternalThread#

如果此代码在内部 Worker 线程(例如加载器线程)中运行,则为 true

node --experimental-loader ./loader.js main.js 
// loader.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // true// loader.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // true
// main.js
import { isInternalThread } from 'node:worker_threads';
console.log(isInternalThread);  // false// main.js
'use strict';

const { isInternalThread } = require('node:worker_threads');
console.log(isInternalThread);  // false

worker.isMainThread#

如果此代码未在 Worker 线程中运行,则为 true

import { Worker, isMainThread } from 'node:worker_threads';

if (isMainThread) {
  // This re-loads the current file inside a Worker instance.
  new Worker(new URL(import.meta.url));
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Prints 'false'.
}'use strict';

const { Worker, isMainThread } = require('node:worker_threads');

if (isMainThread) {
  // This re-loads the current file inside a Worker instance.
  new Worker(__filename);
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // Prints 'false'.
}

worker.markAsUntransferable(object)#

  • object <any> 任何任意的 JavaScript 值。

将对象标记为不可传输。 如果 object 出现在 port.postMessage() 调用的传输列表中,则会抛出错误。 如果 object 是原始值,则此操作无效。

特别是,这对于可以克隆而不是传输的对象,以及发送方其他对象使用的对象来说是有意义的。 例如,Node.js 使用此方法标记它用于其 BufferArrayBuffer

此操作无法撤消。

import { MessageChannel, markAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // This will throw an error, because pooledBuffer is not transferable.
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // error.name === 'DataCloneError'
}

// The following line prints the contents of typedArray1 -- it still owns
// its memory and has not been transferred. Without
// `markAsUntransferable()`, this would print an empty Uint8Array and the
// postMessage call would have succeeded.
// typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);'use strict';

const { MessageChannel, markAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // This will throw an error, because pooledBuffer is not transferable.
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // error.name === 'DataCloneError'
}

// The following line prints the contents of typedArray1 -- it still owns
// its memory and has not been transferred. Without
// `markAsUntransferable()`, this would print an empty Uint8Array and the
// postMessage call would have succeeded.
// typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);

浏览器中没有与此 API 等效的 API。

worker.isMarkedAsUntransferable(object)#

检查对象是否使用 markAsUntransferable() 标记为不可传输。

import { markAsUntransferable, isMarkedAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // Returns true.'use strict';

const { markAsUntransferable, isMarkedAsUntransferable } = require('node:worker_threads');

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // Returns true.

浏览器中没有与此 API 等效的 API。

worker.markAsUncloneable(object)#

  • object <any> 任何任意的 JavaScript 值。

将对象标记为不可克隆。 如果 objectport.postMessage() 调用中用作 message,则会抛出错误。 如果 object 是原始值,则此操作无效。

这对 ArrayBuffer 或任何类似 Buffer 的对象没有影响。

此操作无法撤消。

import { markAsUncloneable } from 'node:worker_threads';

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // This will throw an error, because anyObject is not cloneable.
  port1.postMessage(anyObject);
} catch (error) {
  // error.name === 'DataCloneError'
}'use strict';

const { markAsUncloneable } = require('node:worker_threads');

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // This will throw an error, because anyObject is not cloneable.
  port1.postMessage(anyObject);
} catch (error) {
  // error.name === 'DataCloneError'
}

浏览器中没有与此 API 等效的 API。

worker.moveMessagePortToContext(port, contextifiedSandbox)#

MessagePort 传输到不同的 vm 上下文。 原始的 port 对象变为不可用,并且返回的 MessagePort 实例取代它。

返回的 MessagePort 是目标上下文中的一个对象,并继承自它的全局 Object 类。 传递给 port.onmessage() 监听器的对象也在目标上下文中创建,并继承自它的全局 Object 类。

但是,创建的 MessagePort 不再继承自 <EventTarget>,并且只能使用 port.onmessage() 来接收事件。

worker.parentPort#

如果此线程是 Worker,则这是一个 MessagePort,允许与父线程进行通信。 使用 parentPort.postMessage() 发送的消息在使用 worker.on('message') 的父线程中可用,并且从父线程使用 worker.postMessage() 发送的消息在使用 parentPort.on('message') 的此线程中可用。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  worker.once('message', (message) => {
    console.log(message);  // Prints 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // When a message from the parent thread is received, send it back:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    console.log(message);  // Prints 'Hello, world!'.
  });
  worker.postMessage('Hello, world!');
} else {
  // When a message from the parent thread is received, send it back:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}

worker.postMessageToThread(threadId, value[, transferList][, timeout])#

稳定性: 1.1 - 积极开发

向另一个工作线程发送一个值,该工作线程由其线程 ID 标识。

如果目标线程没有 workerMessage 事件的监听器,则该操作将抛出 ERR_WORKER_MESSAGING_FAILED 错误。

如果目标线程在处理 workerMessage 事件时抛出一个错误,则该操作将抛出一个 ERR_WORKER_MESSAGING_ERRORED 错误。

当目标线程不是当前线程的直接父级或子级时,应使用此方法。 如果两个线程是父子关系,请使用 require('node:worker_threads').parentPort.postMessage()worker.postMessage() 以便线程进行通信。

下面的示例展示了 postMessageToThread 的用法:它创建了 10 个嵌套的线程,最后一个线程将尝试与主线程通信。

import process from 'node:process';
import {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(new URL(import.meta.url), {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;'use strict';

const process = require('node:process');
const {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} = require('node:worker_threads');

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(__filename, {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;

worker.receiveMessageOnPort(port)#

从给定的 MessagePort 接收单个消息。 如果没有可用的消息,则返回 undefined,否则返回一个具有单个 message 属性的对象,该属性包含消息有效负载,对应于 MessagePort 队列中最旧的消息。

import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined'use strict';

const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

使用此函数时,不会发出 'message' 事件,也不会调用 onmessage 监听器。

worker.resourceLimits#

提供此 Worker 线程内的 JS 引擎资源约束集。 如果将 resourceLimits 选项传递给 Worker 构造函数,则此值与其值匹配。

如果在主线程中使用此方法,则其值为空对象。

worker.SHARE_ENV#

一个特殊的值,可以作为 Worker 构造函数的 env 选项传递,以指示当前线程和 Worker 线程应共享对同一组环境变量的读写访问权限。

import process from 'node:process';
import { Worker, SHARE_ENV } from 'node:worker_threads';
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });'use strict';

const { Worker, SHARE_ENV } = require('node:worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .on('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.
  });

worker.setEnvironmentData(key[, value])#

  • key <any> 可以用作 <Map> 键的任何任意的、可克隆的 JavaScript 值。
  • value <any> 任何任意的、可克隆的 JavaScript 值,这些值将被克隆并自动传递给所有新的 Worker 实例。 如果将 value 作为 undefined 传递,则将删除先前为 key 设置的任何值。

worker.setEnvironmentData() API 设置当前线程和从当前上下文生成的所有新 Worker 实例中 worker.getEnvironmentData() 的内容。

worker.threadId#

当前线程的整数标识符。 在相应的工作线程对象上(如果有),它可用作 worker.threadId。 此值对于单个进程中的每个 Worker 实例都是唯一的。

worker.workerData#

一个任意的 JavaScript 值,其中包含传递给此线程的 Worker 构造函数的数据的克隆。

数据被克隆,就像使用 postMessage() 一样,根据 HTML 结构化克隆算法

import { Worker, isMainThread, workerData } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url), { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}'use strict';

const { Worker, isMainThread, workerData } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // Prints 'Hello, world!'.
}

类: BroadcastChannel extends EventTarget#

BroadcastChannel 的实例允许与绑定到同一通道名称的所有其他 BroadcastChannel 实例进行异步的一对多通信。

import {
  isMainThread,
  BroadcastChannel,
  Worker,
} from 'node:worker_threads';

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(new URL(import.meta.url));
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}'use strict';

const {
  isMainThread,
  BroadcastChannel,
  Worker,
} = require('node:worker_threads');

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(__filename);
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}

new BroadcastChannel(name)#

  • name <any> 要连接到的通道的名称。 允许使用任何可以使用 `${name}` 转换为字符串的 JavaScript 值。

broadcastChannel.close()#

关闭 BroadcastChannel 连接。

broadcastChannel.onmessage#

  • 类型: <Function> 接收到消息时,使用单个 MessageEvent 参数调用。

broadcastChannel.onmessageerror#

  • 类型: <Function> 当无法反序列化收到的消息时调用。

broadcastChannel.postMessage(message)#

  • message <any> 任何可克隆的 JavaScript 值。

broadcastChannel.ref()#

unref() 相反。 如果先前 unref() 编辑过的 BroadcastChannel 是唯一剩下的活动句柄(默认行为),则在其上调用 ref() 不会让程序退出。 如果端口已 ref() 编辑过,则再次调用 ref() 无效。

broadcastChannel.unref()#

在 BroadcastChannel 上调用 unref() 允许线程退出,如果这是事件系统中唯一的活动句柄。 如果 BroadcastChannel 已经 unref() 编辑过,则再次调用 unref() 无效。

类: MessageChannel#

worker.MessageChannel 类的实例表示一个异步的双向通信通道。 MessageChannel 没有自己的方法。 new MessageChannel() 产生一个具有 port1port2 属性的对象,这些属性引用链接的 MessagePort 实例。

import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener'use strict';

const { MessageChannel } = require('node:worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

类: MessagePort#

worker.MessagePort 类的实例表示异步双向通信通道的一端。 它可用于在不同的 Worker 之间传输结构化数据、内存区域和其他 MessagePort

此实现与 浏览器 MessagePort 匹配。

事件: 'close'#

一旦通道的任一侧断开连接,就会发出 'close' 事件。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

// Prints:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

事件: 'message'#

  • value <any> 传输的值

对于任何传入的消息,都会发出 'message' 事件,其中包含 port.postMessage() 的克隆输入。

此事件上的监听器接收传递给 postMessage()value 参数的克隆,不再接收其他参数。

事件: 'messageerror'#

  • error <Error> 一个 Error 对象

反序列化消息失败时,会发出 'messageerror' 事件。

目前,当在接收端实例化已发布的 JS 对象时发生错误时,会发出此事件。 这种情况很少见,但可能会发生,例如,当在 vm.Context 中接收到某些 Node.js API 对象时(其中 Node.js API 目前不可用)。

port.close()#

禁用在此连接的任一侧进一步发送消息。 当在此 MessagePort 上不再进行通信时,可以调用此方法。

在作为通道一部分的两个 MessagePort 实例上都会发出 'close' 事件

port.postMessage(value[, transferList])#

将一个 JavaScript 值发送到此通道的接收方。value 的传输方式与 HTML 结构化克隆算法兼容。

特别是,与 JSON 的显着区别在于

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);

transferList 可以是 <ArrayBuffer>MessagePortFileHandle 对象的列表。 传输后,它们在通道的发送方不再可用(即使它们未包含在 value 中)。 与 子进程不同,当前不支持传输网络套接字之类的句柄。

如果 value 包含 <SharedArrayBuffer> 实例,则可以从任一线程访问这些实例。 它们不能在 transferList 中列出。

value 仍然可以包含不在 transferList 中的 ArrayBuffer 实例; 在这种情况下,底层内存将被复制而不是移动。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);'use strict';

const { MessageChannel } = require('node:worker_threads');
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

消息对象会立即被克隆,并且可以在发布后进行修改,而不会产生副作用。

有关此 API 背后的序列化和反序列化机制的更多信息,请参阅 node:v8 模块的序列化 API

传输 TypedArrays 和 Buffers 时的注意事项#

所有 <TypedArray> | <Buffer> 实例都是底层 <ArrayBuffer> 上的视图。 也就是说,实际上存储原始数据的是 ArrayBuffer,而 TypedArrayBuffer 对象提供了一种查看和操作数据的方法。 在同一个 ArrayBuffer 实例上创建多个视图是可能的且常见的。 使用传输列表传输 ArrayBuffer 时必须格外小心,因为这样做会导致共享同一 ArrayBuffer 的所有 TypedArrayBuffer 实例变得不可用。

const ab = new ArrayBuffer(10);

const u1 = new Uint8Array(ab);
const u2 = new Uint16Array(ab);

console.log(u2.length);  // prints 5

port.postMessage(u1, [u1.buffer]);

console.log(u2.length);  // prints 0 

具体而言,对于 Buffer 实例,底层 ArrayBuffer 是否可以传输或克隆完全取决于实例的创建方式,而这通常无法可靠地确定。

可以使用 markAsUntransferable() 标记 ArrayBuffer,以指示它应始终被克隆,而永不传输。

根据 Buffer 实例的创建方式,它可能拥有也可能不拥有其底层 ArrayBuffer。 除非已知 Buffer 实例拥有 ArrayBuffer,否则不得传输它。 特别是,对于从内部 Buffer 池创建的 Buffer(例如,使用 Buffer.from()Buffer.allocUnsafe()),无法传输它们,并且它们始终被克隆,这将发送整个 Buffer 池的副本。 此行为可能会带来意外的更高内存使用率和可能的安全问题。

有关 Buffer 池的更多详细信息,请参见 Buffer.allocUnsafe()

使用 Buffer.alloc()Buffer.allocUnsafeSlow() 创建的 Buffer 实例的 ArrayBuffer 始终可以传输,但这样做会使那些 ArrayBuffer 的所有其他现有视图变得不可用。

使用原型、类和访问器克隆对象时的注意事项#

由于对象克隆使用 HTML 结构化克隆算法,因此非枚举属性、属性访问器和对象原型不会被保留。 特别是,<Buffer> 对象将在接收方被读取为纯 <Uint8Array>s,并且 JavaScript 类的实例将被克隆为纯 JavaScript 对象。

const b = Symbol('b');

class Foo {
  #a = 1;
  constructor() {
    this[b] = 2;
    this.c = 3;
  }

  get d() { return 4; }
}

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new Foo());

// Prints: { c: 3 } 

此限制扩展到许多内置对象,例如全局 URL 对象

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new URL('https://example.org'));

// Prints: { } 

port.hasRef()#

如果为 true,则 MessagePort 对象将保持 Node.js 事件循环处于活动状态。

port.ref()#

unref() 相反。 如果调用先前 unref() 过的端口上的 ref(),则如果它是唯一剩余的活动句柄(默认行为),则不会让程序退出。 如果端口已 ref(),则再次调用 ref() 无效。

如果使用 .on('message') 附加或删除侦听器,则端口会根据事件的侦听器是否存在而自动 ref()unref()

port.start()#

开始在此 MessagePort 上接收消息。 将此端口用作事件发射器时,一旦附加了 'message' 侦听器,就会自动调用此方法。

此方法的存在是为了与 Web MessagePort API 保持一致。 在 Node.js 中,它仅在没有事件侦听器时忽略消息时才有用。 Node.js 在处理 .onmessage 时也有所不同。 设置它会自动调用 .start(),但是取消设置它会使消息排队,直到设置了新的处理程序或丢弃了端口。

port.unref()#

如果端口是事件系统中唯一的活动句柄,则对端口调用 unref() 允许线程退出。 如果端口已 unref(),则再次调用 unref() 无效。

如果使用 .on('message') 附加或删除侦听器,则端口会根据事件的侦听器是否存在而自动 ref()unref()

类: Worker#

Worker 类表示一个独立的 JavaScript 执行线程。 大多数 Node.js API 都可以在其中使用。

Worker 环境内部的显着差异是

在其他 Worker 内部创建 Worker 实例是可能的。

Web Workersnode:cluster 模块类似,可以通过线程间消息传递来实现双向通信。 在内部,Worker 有一对内置的 MessagePort,它们在创建 Worker 时已彼此关联。 虽然父端上的 MessagePort 对象未直接公开,但其功能通过 worker.postMessage()父线程上 Worker 对象上的 worker.on('message') 事件公开。

要创建自定义消息传递通道(建议不要使用默认的全局通道,因为它有助于分离关注点),用户可以在任一线程上创建一个 MessageChannel 对象,并通过一个预先存在的通道(例如全局通道)将该 MessageChannel 上的 MessagePort 之一传递给另一个线程。

有关消息如何传递以及哪种 JavaScript 值可以成功通过线程屏障传输的更多信息,请参阅port.postMessage()

import assert from 'node:assert';
import {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} from 'node:worker_threads';
if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}'use strict';

const assert = require('node:assert');
const {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} = require('node:worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}

new Worker(filename[, options])#

  • filename <string> | <URL> Worker 的主脚本或模块的路径。必须是绝对路径或以 ./../ 开头的相对路径(即,相对于当前工作目录),或者是使用 file:data: 协议的 WHATWG URL 对象。当使用 data: URL 时,数据将根据 MIME 类型使用 ECMAScript 模块加载器进行解释。如果 options.evaltrue,则这是一个包含 JavaScript 代码而不是路径的字符串。
  • options <Object>
    • argv <any[]> 将被字符串化并附加到 Worker 中的 process.argv 的参数列表。这与 workerData 大致相似,但这些值在全局 process.argv 上可用,就像它们作为 CLI 选项传递给脚本一样。
    • env <Object> 如果设置,则指定 Worker 线程中 process.env 的初始值。作为一个特殊值,可以使用worker.SHARE_ENV来指定父线程和子线程应该共享它们的环境变量;在这种情况下,对一个线程的 process.env 对象的更改也会影响另一个线程。 默认: process.env
    • eval <boolean> 如果为 true 且第一个参数是 string,则将构造函数的第一个参数解释为脚本,该脚本在 Worker 上线后执行一次。
    • execArgv <string[]> 传递给 Worker 的 node CLI 选项列表。不支持 V8 选项(例如 --max-old-space-size)和影响进程的选项(例如 --title)。如果设置,这将在 Worker 中作为 process.execArgv 提供。默认情况下,选项是从父线程继承的。
    • stdin <boolean> 如果设置为 true,则 worker.stdin 提供一个可写流,其内容显示为 Worker 内部的 process.stdin。默认情况下,不提供任何数据。
    • stdout <boolean> 如果设置为 true,则 worker.stdout 不会自动通过管道传递到父进程中的 process.stdout
    • stderr <boolean> 如果设置为 true,则 worker.stderr 不会自动通过管道传递到父进程中的 process.stderr
    • workerData <any> 克隆并作为 require('node:worker_threads').workerData 提供的任何 JavaScript 值。克隆按照 HTML 结构化克隆算法进行,如果对象无法克隆(例如,因为它包含 function),则会引发错误。
    • trackUnmanagedFds <boolean> 如果设置为 true,则 Worker 会跟踪通过 fs.open()fs.close() 管理的原始文件描述符,并在 Worker 退出时关闭它们,类似于通过 FileHandle API 管理的网络套接字或文件描述符等其他资源。 此选项由所有嵌套的 Worker 自动继承。 默认: true
    • transferList <Object[]> 如果一个或多个类似于 MessagePort 的对象在 workerData 中传递,则这些项目需要 transferList,否则会抛出 ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST。 有关更多信息,请参阅port.postMessage()
    • resourceLimits <Object> 新 JS 引擎实例的可选资源限制集。达到这些限制会导致 Worker 实例终止。这些限制仅影响 JS 引擎,而不影响任何外部数据,包括 ArrayBuffer。即使设置了这些限制,如果进程遇到全局内存不足的情况,它仍然可能会中止。
      • maxOldGenerationSizeMb <number> 主堆的最大大小(以 MB 为单位)。 如果设置了命令行参数 --max-old-space-size,它将覆盖此设置。
      • maxYoungGenerationSizeMb <number> 最近创建的对象的堆空间的最大大小。 如果设置了命令行参数 --max-semi-space-size,它将覆盖此设置。
      • codeRangeSizeMb <number> 用于生成的代码的预分配内存范围的大小。
      • stackSizeMb <number> 线程的默认最大堆栈大小。 小的值可能会导致 Worker 实例不可用。 默认: 4
    • name <string> 一个可选的 name,用于替换线程名称和 Worker 标题,以用于调试/识别目的,使最终标题为 [worker ${id}] ${name}。 此参数具有允许的最大大小,具体取决于操作系统。 如果提供的名称超出限制,它将被截断
      • 最大大小
        • Windows:32,767 个字符
        • macOS:64 个字符
        • Linux:16 个字符
        • NetBSD:限制为 PTHREAD_MAX_NAMELEN_NP
        • FreeBSD 和 OpenBSD:限制为 MAXCOMLEN 默认: 'WorkerThread'

事件: 'error'#

如果 Worker 线程抛出未捕获的异常,则会发出 'error' 事件。 在这种情况下,Worker 会被终止。

事件: 'exit'#

一旦 Worker 停止,就会发出 'exit' 事件。 如果 Worker 通过调用 process.exit() 退出,则 exitCode 参数是传递的退出代码。 如果 Worker 被终止,则 exitCode 参数为 1

这是任何 Worker 实例发出的最终事件。

事件: 'message'#

  • value <any> 传输的值

当 Worker 线程调用 require('node:worker_threads').parentPort.postMessage() 时,会发出 'message' 事件。 有关更多详细信息,请参阅 port.on('message') 事件。

从 Worker 线程发送的所有消息都会在 'exit' 事件Worker 对象上发出之前发出。

事件: 'messageerror'#

  • error <Error> 一个 Error 对象

反序列化消息失败时,会发出 'messageerror' 事件。

事件: 'online'#

当 Worker 线程开始执行 JavaScript 代码时,会发出 'online' 事件。

worker.getHeapSnapshot([options])#

  • options <Object>
    • exposeInternals <boolean> 如果为 true,则在堆快照中公开内部信息。 默认: false
    • exposeNumericValues <boolean> 如果为 true,则在人工字段中公开数值。 默认: false
  • 返回值: <Promise> 一个 Readable Stream 的 Promise,其中包含 V8 堆快照

返回一个可读流,用于 Worker 当前状态的 V8 快照。 有关更多详细信息,请参阅 v8.getHeapSnapshot()

如果 Worker 线程不再运行(这可能发生在 'exit' 事件触发之前),则返回的 Promise 会立即被拒绝,并抛出一个 ERR_WORKER_NOT_RUNNING 错误。

worker.getHeapStatistics()#

此方法返回一个 Promise,它将解析为一个与 v8.getHeapStatistics() 相同的对象,如果 worker 不再运行,则会拒绝并抛出一个 ERR_WORKER_NOT_RUNNING 错误。 此方法允许从实际线程外部观察统计信息。

worker.performance#

一个可以用来查询 worker 实例的性能信息的对象。类似于 perf_hooks.performance

performance.eventLoopUtilization([utilization1[, utilization2]])#
  • utilization1 <Object> 前一次调用 eventLoopUtilization() 的结果。
  • utilization2 <Object>utilization1 之前,前一次调用 eventLoopUtilization() 的结果。
  • 返回值: <Object>

perf_hooks eventLoopUtilization() 的调用相同,不同之处在于返回的是 worker 实例的值。

一个区别是,与主线程不同,worker 中的引导(bootstrapping)是在事件循环中完成的。 因此,一旦 worker 的脚本开始执行,事件循环利用率立即可用。

一个不增加的 idle 时间并不表示 worker 卡在引导中。 以下示例显示了 worker 的整个生命周期从未积累任何 idle 时间,但仍然能够处理消息。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}'use strict';

const { Worker, isMainThread, parentPort } = require('node:worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}

worker 的事件循环利用率仅在 'online' 事件触发后才可用,如果在之前或 'exit' 事件之后调用,则所有属性的值都为 0

worker.postMessage(value[, transferList])#

向 worker 发送一条消息,该消息通过 require('node:worker_threads').parentPort.on('message') 接收。 有关更多详细信息,请参阅 port.postMessage()

worker.ref()#

unref() 相反,在一个之前 unref() 的 worker 上调用 ref() 不会使程序在它是唯一剩余的活动句柄时退出(默认行为)。 如果 worker 已被 ref(),则再次调用 ref() 不会产生任何影响。

worker.resourceLimits#

提供此 Worker 线程的 JS 引擎资源约束集。 如果将 resourceLimits 选项传递给 Worker 构造函数,则此值与其值匹配。

如果 worker 已停止,则返回值是一个空对象。

worker.stderr#

这是一个可读流,其中包含写入到 worker 线程中的 process.stderr 的数据。 如果没有将 stderr: true 传递给 Worker 构造函数,则数据会被传递到父线程的 process.stderr 流。

worker.stdin#

如果将 stdin: true 传递给 Worker 构造函数,则这是一个可写流。 写入此流的数据将在 worker 线程中作为 process.stdin 提供。

worker.stdout#

这是一个可读流,其中包含写入到 worker 线程中的 process.stdout 的数据。 如果没有将 stdout: true 传递给 Worker 构造函数,则数据会被传递到父线程的 process.stdout 流。

worker.terminate()#

尽快停止 worker 线程中的所有 JavaScript 执行。 返回一个 Promise,用于在触发 'exit' 事件时完成退出代码。

worker.threadId#

被引用线程的整数标识符。 在 worker 线程中,它作为 require('node:worker_threads').threadId 可用。 此值对于单个进程中的每个 Worker 实例是唯一的。

worker.unref()#

在一个 worker 上调用 unref() 允许线程退出,如果这是事件系统中唯一的活动句柄。 如果 worker 已经被 unref(),则再次调用 unref() 不会产生任何影响。

说明#

stdio 的同步阻塞#

Worker 使用通过 <MessagePort> 的消息传递来实现与 stdio 的交互。 这意味着来自 Workerstdio 输出可能会被接收端阻塞 Node.js 事件循环的同步代码阻塞。

import {
  Worker,
  isMainThread,
} from 'node:worker_threads';

if (isMainThread) {
  new Worker(new URL(import.meta.url));
  for (let n = 0; n < 1e10; n++) {
    // Looping to simulate work.
  }
} else {
  // This output will be blocked by the for loop in the main thread.
  console.log('foo');
}'use strict';

const {
  Worker,
  isMainThread,
} = require('node:worker_threads');

if (isMainThread) {
  new Worker(__filename);
  for (let n = 0; n < 1e10; n++) {
    // Looping to simulate work.
  }
} else {
  // This output will be blocked by the for loop in the main thread.
  console.log('foo');
}

从预加载脚本启动 worker 线程#

从预加载脚本(使用 -r 命令行标志加载和运行的脚本)启动 worker 线程时要小心。 除非显式设置 execArgv 选项,否则新的 Worker 线程会自动继承运行进程中的命令行标志,并将预加载与主线程相同的预加载脚本。 如果预加载脚本无条件地启动一个 worker 线程,则每个生成的线程都会生成另一个线程,直到应用程序崩溃。