javascript - kue for node.js 的独特工作

标签 javascript node.js parallel-processing

如果系统中已有相同的作业,我希望 jobs.create 失败。有什么办法可以做到这一点?

我需要每 24 小时运行一次相同的作业,但有些作业可能需要超过 24 小时,因此在添加之前我需要确保该作业不在系统中(事件、排队或失败)

更新: 好的,我将简化问题以便能够在这里进行解释。 至少我有一个分析服务,我必须每天向我的用户发送一次报告。完成这些报告有时(只是少数情况,但有可能)需要几个小时甚至超过一天。

我需要一种方法来了解哪些是当前正在运行的作业,以避免重复的作业。我无法在 ´´´´kue´´´´ API 中找到任何信息来了解当前正在运行的作业。此外,我需要在需要更多作业时触发某种事件,然后调用我的 getMoreJobs 生产者。

也许我的方法是错误的,如果是这样请告诉我一个更好的方法来解决我的问题。

这是我的简化代码:

var kue = require('kue'),   
    cluster = require('cluster'),
    numCPUs = require('os').cpus().length;

numCPUs = CONFIG.sync.workers || numCPUs; 

var jobs = kue.createQueue();

if (cluster.isMaster) {
    console.log('Starting master pid:' + process.pid);
    jobs.on('job complete', function(id){
    kue.Job.get(id, function(err, job){
        if (err || !job) return;
        job.remove(function(err){
            if (err) throw err;
            console.log('removed completed job #%d', job.id);
        });
    });

    function getMoreJobs() {
        console.log('looking for more jobs...');
        getOutdateReports(function (err, reports) {
            if (err) return setTimeout(getMoreJobs, 5 * 60 * 60 * 1000);

            reports.forEach(function(report) {
                jobs.create('reports', {
                    id: report.id,
                    title: report.name,
                    params: report.params
                }).attempts(5).save();
            });

            setTimeout(getMoreJobs, 60 * 60 * 1000);
        });
    }

    //Create the jobs
    getMoreJobs();

    console.log('Starting ', numCPUs, ' workers');
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('death', function(worker) {
        console.log('worker pid:' + worker.pid + ' died!'.bold.red);
    });

} else {
    //Process the jobs
    console.log('Starting worker pid:' + process.pid);
    jobs.process('reports', 20, function(job, done){
        //completing my work here
        veryHardWorkGeneratingReports(function(err) {
            if (err) return done(err);
            return done();
        });
    });
}

最佳答案

您的其中一个问题的答案是,Kue 将它从 redis 队列中弹出的作业置于“事件”状态,除非您寻找它们,否则您永远不会得到它们。

另一个问题的答案是你的分布式工作队列是消费者,而不是任务的生产者。像你一样混合它们是可以的,但是,这是一个困惑的范例。我对 Kue 所做的是为 kue 的 json api 制作一个包装器,以便可以从系统中的任何位置将作业放入队列中。由于您似乎需要将工作铲进去,我建议编写一个单独的生产者应用程序,它除了获取外部工作​​并将它们放入您的 Kue 工作队列外什么都不做。它可以监控工作队列,了解作业何时运行不足并加载一批,或者,我会做的是让它尽可能快地铲除作业,并假脱机使用消费者应用程序的多个实例来处理负载更快。

重申一下:您的关注点分离在这里不是很好。您应该有一个完全独立于您的任务消费者应用程序的任务生产者。这为您提供了更大的灵 active 、易于扩展(只需在另一台机器上启动另一个消费者,您就可以扩展了!)和整体代码管理的简便性。如果可能的话,您还应该允许任何给您这些任务的人访问您的 Kue 服务器的 JSON api,而不是出去寻找它们。工作生产者可以使用 Kue 安排自己的任务。

关于javascript - kue for node.js 的独特工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9032511/

相关文章:

javascript - Ramda - 部分应用的功能取决于完整的应用

javascript - 使用 ng-repeat (AngularJS) 返回属性列表

node.js - 使用 JWT(Node.js + mongoose)在 REST API 中管理用户权限的最佳方法

node.js - AWS amplify google 使用 React 登录 1 小时后不会自动刷新 token ?

algorithm - 将不平衡树转换为生成树

javascript - MongoDB 多重排序属性 : How is precedence determined?

javascript - jquery 数据表根据列名突出显示单元格

javascript - 上传文件 Node.js Express put 方法

java - 将显式和隐式并行与 java-8 流混合

scala - 我们可以在 Scala 中定义一组 DSL 操作,它们可以像在 Linux 中使用管道处理一样并行执行吗?