node.js - 如何管理nodejs中Redis的kue模块中的TTL超出错误?

标签 node.js redis task-queue kue

我正在开发 NodeJS 应用程序,在该应用程序中,我使用 Redis 的 kue 模块来管理队列以执行任务。

出现错误“超过 TTL”。由于这个错误,整个 redis 任务队列都被阻塞了,它可能不会自动启动队列,也不允许执行队列中的其他待处理任务。

根据 Kue 文档:

Job producers can set an expiry value for the time their job can live in active state, so that if workers didn't reply in timely fashion, Kue will fail it with TTL exceeded error message preventing that job from being stuck in active state and spoiling concurrency.

我不知道如何处理这种情况。我发布了我的代码,我需要帮助来解决问题。

"use strict";
const redisConst = require('../../constants/' + process.env.NODE_ENV + '.json').redis;
const DataConsolidationController = require('../../api/data-consolidation/controller/data-consolidation-controller');
const ContactController = require('../../api/contact/controller/contact-controller');
const GmailController = require('../../api/email/gmail/gmail-controller');
const FileUploadController = require('../../api/file-upload/controller/file-upload-controller');
var fs = require('fs');
var kue = require('kue');
/*
* If you have a huge concurrency in uncompleted jobs,
* turn this feature off and use queue level events for better memory scaling.
*/
var queue = kue.createQueue({
    prefix: 'qt',
    redis: redisConst,
    jobEvents: false,
    removeOnComplete: true
});
var job;
var _io;
var concurrency = 5;

/* queue setting */
queue.on('ready', () => {
    console.info('Queue is ready!');
});

// A job get executed
queue.on('job enqueue', (id, type) => {
    console.log('Job %s got queued', id);
});

// A job get removed
queue.on('job complete', (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        job.remove((err) => {
            if (err)
                throw err;
            console.log('Removed completed job #%d', job.id);
        });
    });
});

queue.on("job process", (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        console.log("job process is done", job.id);
    });
})

queue.on('error', (err) => {
    // handle connection errors here
    console.error('There was an error in the main queue!');
    console.error(err);
    console.error(err.stack);
});

queue.watchStuckJobs();

process.once('SIGTERM', (sig) => {
    queue.shutdown(5000, (err) => {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    });
});


/* workers */
queue.process('import', concurrency, (job, done) => {
    switch (job.data.type) {
        // File upload import
        case 'file-upload-import':
            FileUploadController.csvUploadWithQueueTechifyNew(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                //FileUploadController.csvUploadWithQueue(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                .then(result => {
                    fs.unlinkSync(job.data.filePath);
                    done();
                })
                .then(result => {
                    ContactController.recommendationEngine(null, job.data.userId);
                    done();
                })
                .catch(err => {
                    console.log(err);
                    done(err);
                });
            break;
    }
});

function findJobCount() {
    queue.activeCount((err, count) => {
        if (!err)
            console.log('**** Active: ', count);
    });
    queue.inactiveCount((err, count) => {
        if (!err)
            console.log('**** Inactive:', count);
    });
}

module.exports = class QueueController {
    static init(io) {
        _io = io;
    }

    /* producers */
    static createJob(name, data) {
        if (data.type === 'import-salesforce-data') {
            job = queue.create(name, data)
                .delay(1000)
                .ttl(600000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true)
                .save((err) => {
                    if (err) {
                        console.error(err);
                        done(err);
                    } else if (!err) {
                        done();
                    }
                });
        } else {
            job = queue.create(name, data)
                .delay(1000)
                .ttl(120000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true);
        }

        job
            .on('start', () => {
                console.log('Job', job.id, 'is now running');
                findJobCount();
            })
            .on('complete', () => {
                console.log('Job', job.id, 'is done');
                findJobCount();
            })
            .on('failed', () => {
                console.log('Job', job.id, 'has failed');
                job.remove();
                findJobCount();
            })
            .on("progress", () => {
                console.log("job", job.id, "is progressing");
            });

        job.save((err, result) => {
            if (err) {
                console.log('Error in adding Job: ' + err);
            } else {
                console.log("Job saved");
            }
        });
    }
}

最佳答案

超过 TTL 表示您的工作未在 TTL 内完成。 请检查为什么您的任务没有在 TTL 内完成??
根据我对您的代码的理解 - 在 queue.process 中,所有 data.type 都没有被处理,因此 done 永远不会被调用并且作业长时间保持事件状态并最终 ttl 过期。

此外,您还有 5 分钟和 10 分钟的 ttl,因此每个工作会保持 5 或 10 分钟的事件状态,因此不会长时间给其他工作机会。 尽可能减少 TTL。

因为您的并发数为 5 意味着您的所有 5 个并发作业都处于事件和卡住状态,因此无法安排其他作业。 尽可能提高并发性。

您还可以使用 kue-ui-express 获取作业及其状态的 GUI 表示

而且,如果 kue 中有许多作业等待执行,则 kue 可能会溢出一些作业。

关于node.js - 如何管理nodejs中Redis的kue模块中的TTL超出错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46784301/

相关文章:

java - 在 Redis 上放置 Java 对象 : is JavaNode a correct approach?

python - Google App Engine - Python - 任务队列 - 如何添加任务列表?

redis - 仅当 key 存在时如何重新分配hset

node.js - socket.to(socket.id).emit() 不起作用

node.js - Node 要监听azure ubuntu vm的端口/IP地址

node.js - 导出所需的相同对象会使 Mocha 感到困惑

search - Redis 字典序范围搜索结果不一致

c++ - 有效地收集/分散任务

java - 当 App Engine (Java) 启动新实例时,如何确保任务队列 Memcache cron 在它们之间共享?

javascript - 如何比较路径是否在同一个文件夹中