2019年1月18日 星期五

node.js 進程池

const cp = require('child_process')

function doWork(job, callback) {
    // TODO worker 是 工作进程,下一步进行实现
    let child = cp.fork('./worker')
    // 用来跟踪回调的执行状态,防止出现多次回调调用
    let cbTriggered = false;

    child.once('error', function (err) {
        if (!cbTriggered) {
            callback(err)
            cbTriggered = true;
        }
        // 异常错误时,杀死子进程
        child.kill();
    })
        .once('exit', function (code, signal) {
            if (!cbTriggered) {
                callback(new Error('Child exited with code: ' + code));
            }
        })
        .once('message', function (result) {
            callback(null, result)
            cbTriggered = true;
        })
        .send(job);
}

//--------------------------------------
// pooler.js

const cp = require('child_process');
// 取到cpu 数量
const cpus = require('os').cpus().length;

module.exports = function (workModule) {

    // 当没有空闲的cpu时,用来存放任务队列
    let awaiting = [];
    // 存放准备就绪的工作进程
    let readyPool = [];
    // 存放现有的工作者进程的数量
    let poolSize = 0;

    return function doWork(job, callback) {

        // 如果没有空闲的工作进程并且工作进程达到了限制,就把后续任务放入队列等待执行
        if (!readyPool.length && poolSize > cpus) {
            return awaiting.push([doWork, job, callback])
        }

        // 获取下一个可用的子进程或者fork一个新的进程并增加池子大小
        let child = readyPool.length
            ? readyPool.shift()
            : (poolSize++ , cp.fork(workModule));

        let cbTriggered = false;

        // 移除子进程上的所有监听,保证每一个子进程每次只有一个监听
        child
            .removeAllListeners()
            .once('error', function (err) {
                if (!cbTriggered) {
                    callback(err)
                    cbTriggered = true;
                }
                child.kill();
            })
            .once('exit', function (code, signal) {

                if (!cbTriggered) {
                    callback(new Error('Child exited with code: ' + code));
                }

                poolSize--;
                // 如果子进程由于某种原因退出,保证在readyPool中也被移除了
                let childIdx = readyPool.indexOf(child);
                if (childIdx > -1) {
                    readyPool.splice(childIdx, 1);
                }

            })
            .once('message', function (result) {
                callback(null, result)

                cbTriggered = true;

                // 把空闲的子进程再次放入readyPool,等待接下来的任务
                readyPool.push(child);
                if (awaiting.length) {
                    setImmediate.apply(null, awaiting.shift());
                }
            })
            .send(job);
    }
}