Node.js应用监听消息队列并异步添加消息到redis

标签 node.js redis activemq

我正在开发一个 node.js 组件,它监听消息队列 (ActiveMQ) 并将接收到的消息分批添加到 Redis(每批必须 20 条)。

当从 ActiveMQ 接收到的消息数量为每秒 10 条或更少时,没有问题。

我的问题是消息每 4 毫秒添加到队列中。这导致添加到批处理的记录数有时每批处理超过 20 条。

const stompit = require('stompit');
var uuid = require('node-uuid');
var Redis = require('ioredis');

var redis = new Redis();
var pipeline = redis.pipeline();
var batchCounter = 0;

stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) {

client.subscribe({ destination: 'MyQueue' }, function(err2, msg) {
    msg.readString('UTF-8', function(err3, body) {

        if (batchCounter >= 20){
            pipeline.exec(function(err4, results) {
                pipeline = redis.pipeline();
                batchCounter = 0;
                console.log(results);
            });
        }

        batchCounter++;
        pipeline.set(uuid.v1(), JSON.stringify(body));
        //client.disconnect();
      });
  });
  });

如何解决这个问题? 谢谢

最佳答案

在调用 .exec 方法之前尝试重置管道,我认为这是一个异步方法。因为 .exec 在未来的某个时间点运行,增量和 pipeline.set 可以在它之前运行。

以下代码保存当前管道并在.exec

之前同步创建一个新管道
if (batchCounter >= 20){
    let fullpipeline = pipeline;
    pipeline = redis.pipeline();
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) {
        console.log(err4, results);
    });
}

然后新消息应该只附加到新管道。

关于Node.js应用监听消息队列并异步添加消息到redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42221874/

相关文章:

Node.js:如何测试我的 API,模拟由我的 API 调用的第三方 API

node.js - 将 CSV 转换为充满数字数组的 JSON 对象

python - Redis - 解析远程服务器提供的数据流

python - 如何使用 STOMP 配置 ActiveMQ?

mysql - 如何防止mysql数据库中出现重复数据

java - RMI 和 ActiveMQ 的区别

javascript - 在 NodeJS 中显示 HTML

javascript - 将从包返回的数据分配给变量

heroku - 使用 Redis 解析 LiveQuery 独立服务器

redis - 将数据从 Redis 从站传播到 SQL 数据库