这篇文章将为大家详细讲解有关Node.js中多进程模型的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Cluster 模块
Node.js 提供了 Cluster 模块解决上述问题,通过该模块,开发者可以通过创建子进程的模式创建一个集群,充分利用机器或容器的资源,同时该模块允许多个子进程监听同一个端口。
示例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
通过代码解析创建子进程的过程
首先从 const cluster = require('cluster')
说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码:
'use strict';
const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrPrimary}`);
不同的文件意味着两种进程在执行中的表现也不一样,例如:
// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;
// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;
这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程
主进程
在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内
// lib/internal/cluster/master.js
// 初始化cluster
const cluster = new EventEmitter();
// 创建监听地址与server对应的map
const handles = new SafeMap();
// 初始化
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR; // Master distributes connections.
// 自增的子进程id
let ids = 0;
// 向cluster添加fork方法
cluster.fork = function(env) {
// 初始化cluster.settings
cluster.setupMaster();
// 为当前fork的子进程生成当前cluster内的唯一id
const id = ++ids;
// 创建子进程
const workerProcess = createWorkerProcess(id, env);
// 创建对应的worker实例
const worker = new Worker({
id: id,
process: workerProcess
});
// 省略一些worker的事件监听....
// 监听内部消息事件,并交由onmessage处理
worker.process.on('internalMessage', internal(worker, onmessage));
// cluster发出fork事件
process.nextTick(emitForkNT, worker);
// 将worker实例放在cluster.workers中维护
cluster.workers[worker.id] = worker;
// 返回worker
return worker;
};
// 创建子进程函数
function createWorkerProcess(id, env) {
// 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
// 执行参数
const execArgv = [...cluster.settings.execArgv];
// 省略debug模式相关逻辑...
// 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
// 内部消息事件处理函数
function onmessage(message, handle) {
const worker = this;
if (message.act === 'online')
online(worker);
// 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server
else if (message.act === 'queryServer')
queryServer(worker, message);
else if (message.act === 'listening')
listening(worker, message);
else if (message.act === 'exitedAfterDisconnect')
exitedAfterDisconnect(worker, message);
else if (message.act === 'close')
close(worker, message);
}
// 获取server
function queryServer(worker, message) {
// Stop processing if worker already disconnecting
if (worker.exitedAfterDisconnect)
return;
// 创建当前子进程监听地址信息的key
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
// 在handles map中查询是否有已经创建好的该监听地址的server
let handle = handles.get(key);
// 没有对应的server则进行创建
if (handle === undefined) {
let address = message.address;
// Find shortest path for unix sockets because of the ~100 byte limit
if (message.port < 0 && typeof address === 'string' &&
process.platform !== 'win32') {
address = path.relative(process.cwd(), address);
if (message.address.length < address.length)
address = message.address;
}
// 主、子进程处理连接的方式,默认为轮询
let constructor = RoundRobinHandle;
// UDP is exempt from round-robin connection balancing for what should
// be obvious reasons: it's connectionless. There is nothing to send to
// the workers except raw datagrams and that's pointless.
if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
}
// 将监听地址信息传入构造函数创建监听实例
handle = new constructor(key, address, message);
// 缓存监听实例
handles.set(key, handle);
}
// 向server添加自定义信息,用于server发出listening事件后透传到worker
if (!handle.data)
handle.data = message.data;
// 添加server发出listening事件后的回调函数通知子进程
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
if (errno)
handles.delete(key); // Gives other workers a chance to retry.
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
}
// lib/internal/cluster/round_robin_handle.js
// 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息
function RoundRobinHandle(key, address, { port, fd, flags }) {
// 初始化handle
this.key = key;
this.all = new SafeMap();
this.free = new SafeMap();
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
// 监听文件描述符,不讨论
if (fd >= 0)
this.server.listen({ fd });
// 监听ip:port
else if (port >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
// 监听UNIX socket,不讨论
} else
this.server.listen(address); // UNIX socket path.
// 注册server发出listening事件的回调函数
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}
// 添加worker,server发出listening事件后调用master.js中传入的回调函数
RoundRobinHandle.prototype.add = function(worker, send) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
if (this.server === null)
return done();
// Still busy binding.
this.server.once('listening', done);
this.server.once('error', (err) => {
send(err.errno, null);
});
};
// 删除worker,轮询时不再分配给该worker
RoundRobinHandle.prototype.remove = function(worker) {
const existed = this.all.delete(worker.id);
if (!existed)
return false;
this.free.delete(worker.id);
if (this.all.size !== 0)
return false;
for (const handle of this.handles) {
handle.close();
}
this.handles = [];
this.handle.close();
this.handle = null;
return true;
};
// 轮询调度函数
RoundRobinHandle.prototype.distribute = function(err, handle) {
ArrayPrototypePush(this.handles, handle);
const [ workerEntry ] = this.free; // this.free is a SafeMap
if (ArrayIsArray(workerEntry)) {
const { 0: workerId, 1: worker } = workerEntry;
this.free.delete(workerId);
this.handoff(worker);
}
};
// 将handle交给worker
RoundRobinHandle.prototype.handoff = function(worker) {
if (!this.all.has(worker.id)) {
return; // Worker is closing (or has closed) the server.
}
const handle = ArrayPrototypeShift(this.handles);
if (handle === undefined) {
this.free.set(worker.id, worker); // Add to ready queue again.
return;
}
// 向该worker发出newconn事件
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
};
子进程
在每个子进程中,我们都创建了一个 HTTP Server,然后执行 listen
函数监听 8000 端口,而 HTTP Server 实例是由 Net Server 原型链继承得到的,listen
函数即为 Net Server 原型上的 listen
函数,具体如下:
// lib/_http_server.js
function Server(options, requestListener) {
....
}
ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js
Server.prototype.listen = function(...args) {
// 由于篇幅原因,省略一些参数nomolize和其他监听的处理
// 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口
if (typeof options.port === 'number' || typeof options.port === 'string') {
validatePort(options.port, 'options.port');
backlog = options.backlog || backlogFromArgs;
// start TCP server listening on host:port
if (options.host) {
lookupAndListen(this, options.port | 0, options.host, backlog,
options.exclusive, flags);
} else { // Undefined host, listens on unspecified address
// Default addressType 4 will be used to search for master server
listenInCluster(this, null, options.port | 0, 4,
backlog, undefined, options.exclusive);
}
return this;
}
// 省略...
};
// 集群监听函数
function listenInCluster(server, address, port, addressType,
backlog, fd, exclusive, flags) {
exclusive = !!exclusive;
if (cluster === undefined) cluster = require('cluster');
// 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回
if (cluster.isMaster || exclusive) {
// Will create a new handle
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
// 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// Get the master's server handle, and listen on it
cluster._getServer(server, serverQuery, listenOnMasterHandle);
// 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数
function listenOnMasterHandle(err, handle) {
err = checkBindError(err, port, handle);
if (err) {
const ex = exceptionWithHostPort(err, 'bind', address, port);
return server.emit('error', ex);
}
// Reuse master's server handle
server._handle = handle;
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
// 启用监听handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
debug('setupListenHandle', address, port, addressType, backlog, fd);
// 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (this._handle) {
debug('setupListenHandle: have a handle already');
} else {
debug('setupListenHandle: create a handle');
let rval = null;
// 篇幅原因,创建监听句柄的代码...
this._handle = rval;
}
// 在this上设置的faux handle上设置onconnection函数用于监听连接进入
this._handle.onconnection = onconnection;
}
同时,在开始解析的时候我们说过,在引入 Cluster 模块的时候,会根据当前进程的env中是否包含NODE_UNIQUE_ID去判断是否为子进程,若为子进程,则执行 child.js
文件
Tips:IPC 通信中发送的message.cmd的值如果以NODE为前缀,它将响应一个内部事件internalMessage
// lib/internal/cluster/child.js
// 初始化
const cluster = new EventEmitter();
// 存储生成的 faux handle
const handles = new SafeMap();
// 存储监听地址与监听地址index的对应关系
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;
// 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量
// 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数
cluster._setupWorker = function() {
// 初始化worker实例
const worker = new Worker({
id: +process.env.NODE_UNIQUE_ID | 0,
process: process,
state: 'online'
});
cluster.worker = worker;
// 处理断开连接事件
process.once('disconnect', () => {
worker.emit('disconnect');
if (!worker.exitedAfterDisconnect) {
// Unexpected disconnect, master exited, or some such nastiness, so
// worker exits immediately.
process.exit(0);
}
});
// IPC 内部通信事件监听
process.on('internalMessage', internal(worker, onmessage));
send({ act: 'online' });
function onmessage(message, handle) {
// 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
ReflectApply(_disconnect, worker, [true]);
}
};
// 添加获取server函数,会在net server监听端口时被执行
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
let address = options.address;
// Resolve unix socket paths to absolute paths
if (options.port < 0 && typeof address === 'string' &&
process.platform !== 'win32')
address = path.resolve(address);
// 生成地址信息的的key
const indexesKey = ArrayPrototypeJoin(
[
address,
options.port,
options.addressType,
options.fd,
], ':');
// 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server
let index = indexes.get(indexesKey);
if (index === undefined)
index = 0;
else
index++;
// 设置 indexesKey 与 index的对应关系
indexes.set(indexesKey, index);
// 传递地址信息及index
const message = {
act: 'queryServer',
index,
data: null,
...options
};
message.address = address;
// Set custom data on handle (i.e. tls tickets key)
if (obj._getServerData)
message.data = obj._getServerData();
// 向主进程发送queryServer消息
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
// 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数
// 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
obj.once('listening', () => {
cluster.worker.state = 'listening';
const address = obj.address();
message.act = 'listening';
message.port = (address && address.port) || options.port;
send(message);
});
};
// 创建 faux handle,并保存其对应关系
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
if (message.errno)
return cb(message.errno, null);
let key = message.key;
function listen(backlog) {
// TODO(bnoordhuis) Send a message to the master that tells it to
// update the backlog size. The actual backlog should probably be
// the largest requested size by any worker.
return 0;
}
function close() {
// lib/net.js treats server._handle.close() as effectively synchronous.
// That means there is a time window between the call to close() and
// the ack by the master process in which we can still receive handles.
// onconnection() below handles that by sending those handles back to
// the master.
if (key === undefined)
return;
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
key = undefined;
}
function getsockname(out) {
if (key)
ObjectAssign(out, message.sockname);
return 0;
}
// 创建Faux handle
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
// with it. Fools net.Server into thinking that it's backed by a real
// handle. Use a noop function for ref() and unref() because the control
// channel is going to keep the worker alive anyway.
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
assert(handles.has(key) === false);
// 保存faux handle
handles.set(key, handle);
// 执行 net 模块调用 cluster._getServer 函数传进来的回调函数
cb(0, handle);
}
// 处理请求
// Round-robin connection.
function onconnection(message, handle) {
// 获取faux handle的key
const key = message.key;
// 获取faux hadle
const server = handles.get(key);
const accepted = server !== undefined;
send({ ack: message.seq, accepted });
// 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求
if (accepted)
server.onconnection(0, handle);
}
至此,所有的内容都联系起来了。
为什么多个子进程可以监听同一个端口
在之前的代码分析中我们可以知道,Cluster 集群会在 Master 进程中创建 Net Server,在 Worker 进程运行创建 HTTP Server 的时候,会将监听地址的信息传入 cluster._getServer
函数创建一个 faux handle
并设置到子进程的 Net Server 上,在 Worker 进程初始化的时候会注册 IPC 通信回调函数,在回调函数内 ,调用在子进程中 Net Server 模块初始化后的 {faux handle}.onconnection
函数,并将传过来的连接的 handle 传入完成请求响应。
如何保证集群工作的健壮性
我们可以在 Master 进程中监听 Worker 进程的 error
、disconntect
、exit
事件,在这些事件中去做对应的处理,例如清理退出的进程并重新 fork
,或者使用已经封装好的 npm 包,例如 cfork
Egg.js 多进程模型
在 Egg.js 的多进程模型中,多了另外一个进程类型,即 Agent 进程,该进程主要用于处理多进程不好处理的一些事情还有减少长链接的数量,具体关系如下:
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
在 egg-cluster
包内,使用了 cfork
包去保证 Worker 进程挂掉后自动重启
问题记录
在我们的一个 Egg 应用内,日志系统并没有使用 Egg 原生的日志,使用了一个内部基于
log4js
包的日志库,在使用的时候,将需要用到的 Logger 扩展至 Application 对象上,这样的话每个 Worker 进程在初始化的时候都会创建新的 Logger,也就是会存在多进程写日志的问题,但是并没有出现多进程写日志的错误问题
在追踪源码的过程中发现,log4js
虽然提供了 Cluster 模式,但是在上层封装中并没有开启 log4js
的 Cluster 模式,所以每个 Logger 的 appender 都使用 flag a
打开一个写入流,到这里并没有得到答案
后来在 CNode 中找到了答案,在 unix 下使用 flag a
打开的可写流对应的 libuv 文件池实现是 UV_FS_O_APPEND
,即 O_APPEND
,而 O_APPEND
本身在 man 手册里就定义为原子操作,内核保证了对这个可写流的并发写是安全的不需要在应用层额外加锁(除了在 NFS 类的文件系统上并发写会造成文件信息丢失或者损坏),NFS 类的网络挂载的文件系统主要是靠模拟掉底层的 api 来实现的类本地操作,显然无法在竞争条件下完美还原这类的原子操作 api,所以如果你的日志要写到类似 oss 云盘挂载本地的这种就不能这么干,多进程写的话必须在应用层自己手动加锁
关于“Node.js中多进程模型的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。