node.js - 如何让 ROUTER -> DEALER 进行回显?

标签 node.js zeromq

我正在尝试构建 Paranoid Pirate Pattern 的后半部分,一个将工作发送到一组 DEALER Node 的 ROUTER(我可能误解了该图)。现在我只希望经销商回应工作或只是发回一条“完成”的消息。问题是工作 Node (DEALER)从未收到任何消息。

var buildSocket, connectionTemplate, delay, frontPort, log, q, qPort, worker, workerPort, zmq;

zmq = require("zmq");    
frontPort = 5000;    
qPort = 5100;    
workerPort = 5200;    
connectionTemplate = "tcp://127.0.0.1:";    
log = console.log;    
debugger;    
delay = process.argv[2] || 1000;

buildSocket = function(desc, socketType, port) {
  var socket;
  log("creating " + socketType + " socket");
  socket = zmq.socket(socketType);
  socket.identity = "" + desc + "-" + socketType + "-" + process.pid + "-" + port;
  return socket;
};

q = buildSocket('q_output', 'router', qPort);

worker = buildSocket('worker', 'dealer', workerPort);

worker.bind("" + connectionTemplate + workerPort);

q.connect("" + connectionTemplate + workerPort);

q.on('message', function() {
  var args;
  args = Array.apply(null, arguments);
  log('queue received ' + JSON.stringify(arguments));
  return worker.send('work done');
});

worker.on('message', function() {
  var args;
  log('back received ' + JSON.stringify(arguments));
  args = Array.apply(null, arguments);
  return q.send(args);
});

setInterval((function() {
  var value;
  value = Math.floor(Math.random() * 100);
  console.log(q.identity + ": sending " + value);
  q.send(value);
}), delay);

“消息”事件上的队列和工作线程永远不会触发。我的理解是,您设置 ROUTER Node ,将其绑定(bind)到端口(用于返回消息),设置 DEALER Node 并将它们绑定(bind)到端口,然后将 ROUTER 连接到 DEALER 端口并开始发送消息。在实践中,消息被发送但从未被接收:

creating router socket
creating dealer socket
q_output-router-60326-5100: sending 30
q_output-router-60326-5100: sending 25
q_output-router-60326-5100: sending 65
q_output-router-60326-5100: sending 68
q_output-router-60326-5100: sending 50
q_output-router-60326-5100: sending 88

最佳答案

这里的事情有点倒退了。将 DEALER 套接字视为修改后的 REQ 套接字...它应该向路由器发起消息。 ROUTER 套接字更像是修改后的 REP 套接字...它应该响应经销商发送的初始请求。

您不需要严格遵循ROUTER/DEALER配对的模式...但这确实会让事情变得更加容易,所以您应该在学习时坚持下去。

从您的代码中让我印象深刻的第二件事是您的消息处理程序,您发送消息的套接字是错误的。

以这段代码为例(直接复制,无需修改):

q.on('message', function() {
  var args;
  args = Array.apply(null, arguments);
  log('queue received ' + JSON.stringify(arguments));
  return worker.send('work done');
});

...(在伪代码中):

when `q` receives a message from `worker`
  print out the message we received
  now have `worker` send *another* message that says "work done"

您想要的是更像以下内容(简化的):

var zmq = require("zmq");
var q = zmq.socket('router');
var worker = zmq.socket('dealer');

// I've switched it so the router is binding and the worker is connecting
// this is somewhat arbitrary, but generally I'd consider workers to be less
// reliable, more transient, and also more numerous. I'd think of the queue
// as the "server"

// I've used bindSync, which is synchronous, but that's typically OK in the
// startup phase of a script, and it simplifies things.  If you're spinning
// up new sockets in the middle of your script, using the async bind()
// is more appropriate
q.bindSync('tcp://127.0.0.1:5200');

worker.connect('tcp://127.0.0.1:5200');

q.on('message', function() {
  var args;
  args = Array.apply(null, arguments);
  log('queue received ' + JSON.stringify(arguments));

  // if we've received a message at our queue, we know the worker is ready for
  // more work, so we ready some new data, regardless of whether we
  // received work back
  var value = Math.floor(Math.random() * 100);

  // note that the socket that received the message is responding back
  if (args[1].toString() == 'ready') console.log('worker now online');
  else console.log('work received: '+args[1].toString());

  // we need to specify the "ID" of our worker as the first frame of
  // the message
  q.send([args[0], value]);

  // we don't need to return anything, the return value of a
  // callback is ignored
});

worker.on('message', function() {
  var args;
  log('back received ' + JSON.stringify(arguments));
  args = Array.apply(null, arguments);

  // we're just echoing back the "work" we received from the queue
  // for additional "workiness", we wait somewhere between 10-1000
  // milliseconds to respond
  setTimeout(function() {
    worker.send(args[0].toString());
  }, parseInt(args[0].toString())*10);
});

setTimeout((function() {
  var value;
  console.log('WORKER STARTING UP');

  // the worker starts the communication, indicating it's ready
  // rather than the queue just blindly sending work
  worker.send('ready'); // sending the first message, which we catch above
}), 500); // In my experience, half a second is more than enough, YMMV

...如您所见,模式是:

  1. 工作人员表示已准备就绪
  2. 队列发送可用工作
  3. 工作人员完成工作并发回
  4. 队列接收已完成的工作并发回更多工作
  5. 转到3

关于node.js - 如何让 ROUTER -> DEALER 进行回显?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24918797/

相关文章:

python - PyZMQ PUSH 套接字不会阻塞 send()

node.js - 如何在node.js/socket.io中使用removeListener来防止多次触发函数

node.js - Azure Web应用程序 Node 部署,更改应用程序根目录?

node.js - 无法从公司代理服务器使用 Bower,如何解决?

javascript - 为什么 NodeJS v7 中的 try catch block 比正常速度更快

python - 使用 IPC、Twisted 或 ZeroMQ 的架构方法?

node.js - Paypal REST API : shipping_address does not understand utf8 characters

python - ZMQ连接后 sleep ?

zeromq - 为什么 LINGER=0 和 SNDTIMEO=0 用于 Zyre Actor 的 PAIR 套接字?

messaging - ZeroMQ生产准备就绪了吗?