我正在学习 ZeroMQ 并从头开始制作指南中的所有示例(在 NodeJS 中)。但我使用的是一个创建本地集群来处理作业的堆栈,如果它无法再处理,则将作业发送到通过其他套接字连接的“云对等点”( “Putting it All Together” example )。
我请求您帮助调试代码,以了解为什么客户端的消息未被云对等点处理(并返回):
- 当workers连接时,发送一条READY消息让localBE有多少。此消息已发布给所有云对等点(附有对等点名称)。
- 当客户端发送请求时,localFE 会接收该请求。
- 如果有本地工作人员,则由 LocalFE 路由至 localBE。否则路由到随机的 cloudBE 对等点。
- 据推测,cloudFE 会收到该消息并路由到其本地工作人员(如果可用)。然后它应该返回到原始对等方的客户端 (¡!)
如果您clone and execute my repo (cd
到 Chapter3,然后使用两个终端,例如 nodepeering3.js me you
和 nodepeering3.js you me
),您可以跟踪谁发送和接收(Get
)数据。
您可以使用 NBR_CLIENTS
和 NBR_WORKERS
(第 12 和 13 行)并查看作业未正确发送/返回...
如果您能看一下我的代码,我将非常感激!
提前致谢...
最佳答案
您在云代理之间进行寻址时遇到问题。您正在使用本地workers
ID 在云代理之间进行寻址。对代码进行注释以突出问题。
// - Route any request locally if we can, else to cloud.
localfe.on('message', function() {
var args = Array.apply(null, arguments);
console.log('LocalFE: Get ' + args.toString());
if (localCapacity > 0) {
console.log('LocalFE: Send '+ workers.shift() + ',\'\',' + args[0]+ ',\'\','+ args[2] + ' to LocalBE');
localCapacity--;
statebe.send(localCapacity);
localbe.send([workers.shift(), '', args[0], '', args[2]]);
} else {
// Route to random broker peer automatically
var randomPeer = randomBetween(3, argc);
var peer = process.argv[randomPeer];
/////////////////
// why are you referencing `workers` here, that is only for local workers
// You correctly route to `peer` here, though, so that should be fine
// however, you've removed a local worker from the queue erroneously
/////////////////
console.log('LocalFE: Send '+ workers.shift() + ',\'\',' + args[0]+ ',\'\','+ args[2] + ' to CloudBE at peer ' + peer);
cloudbe.send([peer, '', args[0], '', args[2]]);
}
});
... cloudfe
然后正确地将消息路由到本地工作线程,但在执行此操作之前它不会检查 workers
队列中的可用工作线程,因此,如果所有本地工作线程都被占用,那么你就会陷入困境,本地 workers
队列将为空,并且消息将不会发送到任何地方。您还会丢失对云对等点的引用,因此当消息返回时,无法知道它需要返回云对等点:
cloudfe.on('message', function() {
var args = Array.apply(null, arguments);
console.log('CloudFE: Get ' + args.toString());
/////////////////
// if `workers` is already empty, `shift()`ing it will get you `undefined`
// also, you're removing the ID from the queue, which causes problems below
/////////////////
console.log('CloudFE: Send '+ workers.shift() + ',\'\',' + args[2]+ ',\'\','+ args[4] + ' to LocalBE');
localCapacity--;
statebe.send(localCapacity);
/////////////////
// you're now sending it to a *different* worker than you logged above
// and you've removed *two* workers from the queue instead of one
// as said above, if `workers` is already empty, you'll route it nowhere
// and lose the message
// further, nowhere in here are you logging the identity of the cloud peer, so you've
// lost the ability to route it back to the cloud peer that has the client
/////////////////
localbe.send([workers.shift(), '', args[2], '', args[4]]);
});
...工作人员应该处理消息并将其成功发送回其本地代理,至少对于前两条消息(不是 5 条消息,因为我们只将其发送给工作人员 2 和 4)。但我们不仅丢失了对之前向我们发送消息的云代理的引用,而且当我们从工作线程收到消息时,我们甚至没有尝试将其发回:
// Reply from local worker.
localbe.on('message', function() {
var args = Array.apply(null, arguments);
//console.log('LocalBE: Get ' + args.toString());
workers.push(args[0]); // Add its identity to the array.
// We broadcast new capacity messages to other peers.
localCapacity++;
statebe.send(localCapacity);
// If it's not READY message, route the reply to client.
if (args[2].toString() != WORKER_READY) {
console.log('LocalBE: Send ' + args[2].toString() + ',\'\', ' + args[4].toString());
/////////////////
// you're attempting to send it directly back to the client, but the client
// you're addressing is not `connect()`ed to this broker, it's connected to
// the cloud broker, so it goes nowhere
/////////////////
localfe.send([args[2], '', args[4]]);
}
});
所以:
- 当
localfe
发送到云代理时,不要shift()
您的工作人员排队 - 您可能会在有机会向他们发送工作之前就失去所有本地工作人员 - 开始在
cloudfe
上接收消息时,shift()
将工作人员 ID 移入局部变量一次,并在需要时使用该局部变量 - 捕获您的云对等点 ID 并将其添加到消息中,以便您知道哪个对等点发起了云请求。
- 如果队列中没有可用的工作线程,请保留它并执行
setTimeout()
重试,或者将其发送到新的云对等点。为了简单起见,我建议使用前者,否则您必须跟踪消息中潜在的一整套云对等 ID。 - 收到工作人员发回的消息时,检查云对等 ID,如果找到,则将其适当路由回,而不是盲目地将其路由回可能通过不同云代理连接的客户端。
关于node.js - 使用 NodeJS 发送到 ZeroMQ 云后端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31581908/