我正在开发一个 node.js 组件,它监听消息队列 (ActiveMQ) 并将接收到的消息分批添加到 Redis(每批必须 20 条)。
当从 ActiveMQ 接收到的消息数量为每秒 10 条或更少时,没有问题。
我的问题是消息每 4 毫秒添加到队列中。这导致添加到批处理的记录数有时每批处理超过 20 条。
const stompit = require('stompit');
var uuid = require('node-uuid');
var Redis = require('ioredis');
var redis = new Redis();
var pipeline = redis.pipeline();
var batchCounter = 0;
stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) {
client.subscribe({ destination: 'MyQueue' }, function(err2, msg) {
msg.readString('UTF-8', function(err3, body) {
if (batchCounter >= 20){
pipeline.exec(function(err4, results) {
pipeline = redis.pipeline();
batchCounter = 0;
console.log(results);
});
}
batchCounter++;
pipeline.set(uuid.v1(), JSON.stringify(body));
//client.disconnect();
});
});
});
如何解决这个问题? 谢谢
最佳答案
在调用 .exec
方法之前尝试重置管道,我认为这是一个异步方法。因为 .exec
在未来的某个时间点运行,增量和 pipeline.set
可以在它之前运行。
以下代码保存当前管道并在.exec
if (batchCounter >= 20){
let fullpipeline = pipeline;
pipeline = redis.pipeline();
batchCounter = 0;
fullpipeline.exec(function(err4, results) {
console.log(err4, results);
});
}
然后新消息应该只附加到新管道。
关于Node.js应用监听消息队列并异步添加消息到redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42221874/