node.js - 作业完成时的 Kue 回调

标签 node.js callback kue

我的主 Node 实例派生了一个工作进程,该进程通过 IPC 接受消息(使用内置 Node process.send()process.on('message'... ),这些对象包含有关要添加到 Kue 的新作业的信息。然后它会处理这些作业。

我的主 Node 实例调用如下内容:

worker.send({jobType:'filesystem', operation: 'delete', path: fileDir});

worker 实例执行如下操作:

jobs.create(message.jobType, message).save();

jobs.process('filesystem', function(job, done) {
    fs.delete(job.data.path, function(err) {
        done(err);
    });
});

并且作业成功完成。

当作业完成时,如何在我的主 Node 实例中获得类似回调的功能?如何从工作实例返回一些结果到主 Node 实例?

最佳答案

我相信我解决了这个问题,但我会保留这个 Unresolved 问题,以防有人可以改进我的解决方案或提供更好的解决方案。

当您使用 Kue 在单独的进程中处理作业时,您不能简单地在作业完成时执行回调。这是两个进程之间通信的示例。我本来希望使用 Kue 自动提供每个作业的 id(我相信它与 Redis 中收到的 id 相同),但 app.js 需要在将作业发送给工作人员之前知道作业的 id,以便它收到消息时可以匹配id。

app.js

var child = require('child_process');
var async = require('async');

var worker = child.fork("./worker.js");

//When a message is received, search activeJobs for it, call finished callback, and delete the job
worker.on('message', function(m) {
    for(var i = 0; i < activeJobs.length; i++) {
        if(m.jobId == activeJobs[i].jobId) {
            activeJobs[i].finished(m.err, m.results);
            activeJobs.splice(i,1);
            break;
        }
    }
});

// local job system
var newJobId = 0;
var activeJobs = [];

function Job(input, callback) {
    this.jobId = newJobId;
    input.jobId = newJobId;
    newJobId++;
    activeJobs.push(this);

    worker.send(input);

    this.finished = function(err, results) {
        callback(err, results);
    }
}


var deleteIt = function(req, res) {
    async.series([
        function(callback) {
            // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed
            // and requires a callback (because of async.series)
            new Job({
                jobType:'filesystem',
                title:'delete project directory',
                operation: 'delete',
                path: '/deleteMe'
            }, function(err) {
                callback(err);
            });
        },
        //Delete it from the database
        function(callback) {
            someObject.remove(function(err) {
                callback(err);
            });
        },
    ],
    function(err) {
        if(err) console.log(err);
    });
};

worker.js

var kue = require('kue');
var fs = require('fs-extra');

var jobs = kue.createQueue();

//Jobs that are sent arrive here
process.on('message', function(message) {
    if(message.jobType) {
        var job = jobs.create(message.jobType, message).save();
    } else {
        console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red);
    }
});

jobs.process('filesystem', function(job, done) {

    if(job.data.operation == 'delete') {
        fs.delete(job.data.path, function(err) {
            notifyFinished(job.data.jobId, err);
            done(err);
        });
    }
});

function notifyFinished(id, error, results) {
    process.send({jobId: id, status: 'finished', error: error, results: results});
}

https://gist.github.com/winduptoy/4991718

关于node.js - 作业完成时的 Kue 回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14950777/

相关文章:

node.js - 如何运行 pm2 以便其他服务器用户能够访问该进程?

node.js - 如何超时终止 docker 容器?

javascript - 使用服务器端渲染 (SSR) 的 React 应用程序中的 Brotli

javascript - 为什么我不能在屏幕上或使用 Forever 运行 node.js 和 socket.io?

python - 使用 lambda 在回调中设置属性

javascript - 我可以用 kue/node.js 持久化数据吗

javascript - Nodejs 生成器异步回调 - slack unity 云构建

ios - 仅使用 inNumberFrames 信息在 AudioBufferList 中分配缓冲区 iOS

node.js - 如何管理nodejs中Redis的kue模块中的TTL超出错误?

node.js - 如何让客户端下载一个非常大的动态生成的文件