c# - 无法使用 NetMQ 4.x 使 ReadyReceive pub-sub 工作

标签 c# netmq

我创建了 2 个简单的 C# 控制台项目 (.net 4.5.2),向每个项目添加了 v4.0.0.1 NetMQ Nuget 包,将每个程序加载到单独的 Visual Studio 2017 社区版中,在第 1 行放置了一个断点包含在 OnReceiveReady 回调方法中,首先启动订阅程序,然后启动发布程序。 ReceieveReady 事件未在订阅者中触发。我究竟做错了什么?即使我选择了 subSocket.Subscribe(""),我仍然没有收到任何消息。此外,删除/修改发送/接收 HighWatermarks 也没有改变任何事情。感谢您的帮助!

这是发布者代码:

using System;
using NetMQ;
using NetMQ.Sockets;
using System.Threading;

namespace SampleNQPub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding.");
                pubSocket.Options.SendHighWatermark = 10;
                pubSocket.Bind(addr);

                for (int i=0; i < 30; i++)
                {
                    pubSocket.SendMoreFrame("NQ").SendFrame(i.ToString());
                    Thread.Sleep(1000);
                }

                pubSocket.Disconnect(addr);
            }
        }
    }
}

这是订阅者代码:

using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace SampleNQSub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.ReceiveReady += OnReceiveReady;
                subSocket.Options.ReceiveHighWatermark = 10;
                subSocket.Connect(addr);
                subSocket.Subscribe("NQ");

                for (int i=0; i < 20; i++)
                {
                    Thread.Sleep(1000);
                }

                subSocket.Disconnect(addr);
            }
        }

        static void OnReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var str = e.Socket.ReceiveFrameString();
        }
    }
}

最佳答案

好的,这是 NetMQ 世界中的一个陷阱问题,我刚刚弄明白了。您必须设置一个 NetMQPoller,它将最终调用您添加到其中的每个 ReceiveReady 回调 (NetMQPoller)。

这里是更正后的代码,它至少(即 ReceiveFrameString 仍然只获取“NQ”部分,但这只是另一个需要修复的方法调用)触发 ReceiveReady 事件:

using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace SampleNQSub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            NetMQPoller poller = new NetMQPoller();

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.ReceiveReady += OnReceiveReady;
                subSocket.Options.ReceiveHighWatermark = 10;
                subSocket.Connect(addr);
                subSocket.Subscribe("NQ");

                poller.Add(subSocket);
                poller.RunAsync();

                for (int i = 0; i < 20; i++)
                {
                    Thread.Sleep(1000);
                }

                subSocket.Disconnect(addr);
            }
        }

        static void OnReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var str = e.Socket.ReceiveFrameString();
            e.Socket.ReceiveMultipartStrings();
        }
    }
}

我注意到 NetMQ 的作者在 4.x 中决定在内部处理 Context 对象,这样用户就不必承担管理它的负担。对于最简单的用例,如果他们也能对用户隐藏这个“轮询泵”代码,那就太好了。

作为比较,使用我在上面发布的发布者控制台应用程序查看使用 NodeJS(带有 zmq 库)的订阅者(将此代码保存到 sub.js 并在 Windows 控制台中键入“node sub.js '):

var zmq = require('zmq'), sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3004');
sock.subscribe('NQ');
console.log('Subscriber connected to port 3004');

sock.on('message', function() {
    var msg = [];
    Array.prototype.slice.call(arguments).forEach(function(arg) {
        msg.push(arg.toString());
    });

    console.log(msg);
});

那么这里的轮询泵机制在哪里? (答案:我不在乎!我只想要在我注册的回调中提供给我的消息。[显然,开玩笑。我知道 NetMQPoller 是多功能的并且可以处理更复杂的问题,但对于基本的“当它到达时在回调中给我一条消息”,如果它由库在内部处理就好了。])

关于c# - 无法使用 NetMQ 4.x 使 ReadyReceive pub-sub 工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44726439/

相关文章:

c# - 每月、每周和每天的 Quartz.Net Cron 表达式

C# 如何调整图像大小并以尽可能高的质量保存它们 - 批处理作业

zeromq - NetMQ 为什么 Req-Rep 需要 "SendReady"?

c# - NetMQ 套接字处理后无法重用端点

c# - 为什么 NetMQ 不能在 NUnit 环境中工作

C# - 通过进程名称的一部分终止进程

c# - 特定 Controller 的路由

c# - Windows Azure - approot 路径中的访问被拒绝

c# - 最新的 NetMQ 多线程示例

sockets - Zeromq 哪个套接字应该绑定(bind)到 PubSub 模式