Node.js rsmq - 在重新启动 Node.js 应用程序之前,新消息不会变得可见

标签 node.js redis node-redis

我正在尝试制作 this打包工作。

redis版本:stable 4.0.6

我是这样连接Redis的,没有问题。

发布订阅.js

var redis = require("redis");
var psRedis = redis.createClient();

psRedis.auth("mypasswordishere", function (callback) {
  console.log("connected");
});

module.exports.psRedis = psRedis;

启动Node.js应用程序后,我可以在控制台上看到“已连接”并执行操作,我已经检查过了。

我的 test.js 文件在下面。

测试.js

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;

var RedisSMQ = require("rsmq");

var psRedis = require("./realtime/operations/pubsub").psRedis;

var rsmq = new RedisSMQ({client: psRedis});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
  if (resp === 1) {
    console.log("queue created");
  }
});

rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
  if (resp) {
    console.log(resp);
  }
});

router.get('/pubsubTest', function (req, res, next) {

  async1.waterfall([
    function (callback) {

      rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, function (err, resp) {
        if (resp) {
          console.log("Message sent. ID:", resp);
        }
      });

      callback(null, 'done!');

    }
  ], function (err, result) {
    res.sendStatus(200);
  });

});

module.exports = router;

但是,当我访问 /pubsubTest 时,只有消息 id 出现在控制台上。

Message sent. ID: exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp

如果我重新启动我的 Node.js 应用程序,我会看到下面的结果,这是预期的。为什么它不立即出现?

{ id: 'exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp',
  message: 'Hello World 1',
  rc: 1,
  fr: 1515802884138,
  sent: 1515802880098 }

谢谢。

最佳答案

receiveMessage 不会“触发”。您需要在发送消息后调用它。 您正在寻找的是 rsmq 提供的实时选项。

var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});

现在,对于通过sendMessage 添加到队列的每条新消息,PUBLISH 消息将发送到rsmq:rt:{qname},内容为{msg}。在您的情况下,sendMessage 将发出一个事件,即 rsmq:rt:myqueue

对此有两种解决方案,都将使用事件 rsmq:rt:myqueue

1.第一个使用redis客户端,可以通过redis提供的subscribe方法订阅这个发布的事件,实现PUB/SUB。

 var redis = require('redis');
    const subscribe = redis.createClient();
    subscribe.subscribe('rsmq:rt:myqueue');
    subscribe.on('message', function(msg) {     //msg=>'rsmq:rt:myqueue'
        rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
            if (resp) {
                console.log(resp);
            }
        });
    });

整个代码看起来像这样:

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
  if (resp === 1) {
    console.log("queue created");
  }
});

const subscribe = redis.createClient( 6379,"127.0.0.1"); //creating new 
worker and pass your credentials
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) {     //msg=>'rsmq:rt:myqueue'
    rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
        if (resp) {
            console.log(resp);
        }
    });
});

router.get('/pubsubTest', function (req, res, next) {
  async1.waterfall([
    function (callback) {
      rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, 
function (err, 
      resp) {
        if (resp) {
          console.log("Message sent. ID:", resp);
        }});
      callback(null, 'done!');
    }
  ], function (err, result) {
    res.sendStatus(200);
  });
});
module.exports = router;

2.第二种解决方案是使用 rsmq-worker,它会为您提供一个message 事件,您可以使用 收听该事件>.on 方法。

var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1}); // this worker 
will poll the queue every .1 second.

worker.on( "message", function( message, next, msgid ){
     if(message){
         console.log(message);
     }
    next();
 });
worker.start();

整个代码看起来像这样:

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis},{ ns: "rsmq",realtime :true});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
    if (resp === 1) {
        console.log("queue created");
    }
});

var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1});
worker.on( "message", function( message, next, msgid ){
        if(message){
            console.log(message);
        }
    next();
});


// optional error listeners
worker.on('error', function( err, msg ){
    console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
    console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();


router.get('/pubsubTest', function (req, res, next) {
    async1.waterfall([
        function (callback) {
            rsmq.sendMessage({qname: "myqueue", message: "Hello World1"}
            ,function (err, resp) {
                if (resp) {
                    console.log("Message sent. ID:", resp);
                }});
        callback(null, 'done!');
        }
    ], function (err, result) {
    res.sendStatus(200);
    });
});
module.exports = router;

注意:在第一个解决方案中,您需要使用 deleteMessage 删除从队列中收到的消息,或者您也可以使用 popMessage 来接收最后一条消息,并且给你删除。如果您不删除该消息,您将再次收到所有消息,直到该特定消息的超时结束。

出于这个原因,我更喜欢使用第二种解决方案,rsmq 会为您处理这些事情,您也可以提供自己的轮询间隔

关于Node.js rsmq - 在重新启动 Node.js 应用程序之前,新消息不会变得可见,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48235491/

相关文章:

javascript - 使用 nodejs 上传 blob

javascript - 访问 JSON 对象中的值

javascript - ionic 应用程序不会连接到 Socket.IO

python - Redisearch 前缀搜索始终返回多个字段索引的总计最大值 200

javascript - Redis递归:超出最大调用堆栈大小

javascript - 保存 promise 的变化变量

python - 不能在 Django 中 pickle 字典

hibernate - 使用 spring boot 和 cloud foundry 将 Redis 集成为非平台 oracle 数据库的 Hibernate 二级缓存

node.js - 有没有办法在redis服务的GEOADD方法中添加Date属性

redis - 我的任务队列 (kue.js) 遇到乱序问题