const cp = require('child_process')
function doWork(job, callback) {
// TODO worker 是 工作进程,下一步进行实现
let child = cp.fork('./worker')
// 用来跟踪回调的执行状态,防止出现多次回调调用
let cbTriggered = false;
child.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
// 异常错误时,杀死子进程
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
})
.send(job);
}
//--------------------------------------
// pooler.js
const cp = require('child_process');
// 取到cpu 数量
const cpus = require('os').cpus().length;
module.exports = function (workModule) {
// 当没有空闲的cpu时,用来存放任务队列
let awaiting = [];
// 存放准备就绪的工作进程
let readyPool = [];
// 存放现有的工作者进程的数量
let poolSize = 0;
return function doWork(job, callback) {
// 如果没有空闲的工作进程并且工作进程达到了限制,就把后续任务放入队列等待执行
if (!readyPool.length && poolSize > cpus) {
return awaiting.push([doWork, job, callback])
}
// 获取下一个可用的子进程或者fork一个新的进程并增加池子大小
let child = readyPool.length
? readyPool.shift()
: (poolSize++ , cp.fork(workModule));
let cbTriggered = false;
// 移除子进程上的所有监听,保证每一个子进程每次只有一个监听
child
.removeAllListeners()
.once('error', function (err) {
if (!cbTriggered) {
callback(err)
cbTriggered = true;
}
child.kill();
})
.once('exit', function (code, signal) {
if (!cbTriggered) {
callback(new Error('Child exited with code: ' + code));
}
poolSize--;
// 如果子进程由于某种原因退出,保证在readyPool中也被移除了
let childIdx = readyPool.indexOf(child);
if (childIdx > -1) {
readyPool.splice(childIdx, 1);
}
})
.once('message', function (result) {
callback(null, result)
cbTriggered = true;
// 把空闲的子进程再次放入readyPool,等待接下来的任务
readyPool.push(child);
if (awaiting.length) {
setImmediate.apply(null, awaiting.shift());
}
})
.send(job);
}
}