看这个主题: zeromq pub/sub example in c (libzmq)
那里的代码可以在 Windows 中运行(Visual Studio 2019 编译器和 vcpkg 安装了 Zeromq - Zeromq:x64-windows 4.3.4#6)
#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"
int main(int argc, char const* argv[])
{
DWORD thread_id = GetCurrentThreadId();
// create a context that will be shared by all threads
void* context = zmq_ctx_new();
const char* endpoint = "tcp://127.0.0.1:4040";
std::thread publisher_thread = std::thread([&]
{
void* publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, endpoint);
//assert(rc == 0);
while (1)
{
rc = zmq_send(publisher, "Hello World!", 12, 0);
//assert(rc == 12);
}
zmq_close(publisher);
});
// quick hack to make sure publisher thread is up and running before subscribers connect.
Sleep(1000);
std::thread subscriber_thread_all = std::thread([&]
{
void* subscriber = zmq_socket(context, ZMQ_SUB);
int rc = zmq_connect(subscriber, endpoint);
//assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
//assert(rc == 0);
char message[12];
while (1)
{
rc = zmq_recv(subscriber, message, 12, 0);
//assert(rc != -1);
printf("%s\n", message);
}
zmq_close(subscriber);
});
subscriber_thread_all.join();
publisher_thread.join();
zmq_ctx_destroy(context);
return 0;
}
这可以工作并产生预期的结果,子线程接收来自发布者的所有消息。
但是,如果我添加除全部之外的任何订阅,则不会发生过滤。例如,对上面进行轻微修改以在发布者中发送交替消息并订阅以“H”开头的消息:
#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"
int main(int argc, char const* argv[])
{
DWORD thread_id = GetCurrentThreadId();
// create a context that will be shared by all threads
void* context = zmq_ctx_new();
const char* endpoint = "tcp://127.0.0.1:4040";
std::thread publisher_thread = std::thread([&]
{
void* publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, endpoint);
//assert(rc == 0);
int message_counter(0);
while (1)
{
if (message_counter % 2 == 0)
{
rc = zmq_send(publisher, "Hello World!", 12, 0);
}
else
{
rc = zmq_send(publisher, "Goodbye World!", 12, 0);
}
++message_counter;
//assert(rc == 12);
}
zmq_close(publisher);
});
// quick hack to make sure publisher thread is up and running before subscribers connect.
Sleep(1000);
std::thread subscriber_thread_all = std::thread([&]
{
void* subscriber = zmq_socket(context, ZMQ_SUB);
int rc = zmq_connect(subscriber, endpoint);
//assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
//assert(rc == 0);
char message[12];
while (1)
{
rc = zmq_recv(subscriber, message, 12, 0);
//assert(rc != -1);
printf("%s\n", message);
}
zmq_close(subscriber);
});
subscriber_thread_all.join();
publisher_thread.join();
zmq_ctx_destroy(context);
return 0;
}
结果仍然是所有消息都传递给订阅者,而不仅仅是那些以 H 开头的消息。
我也尝试过使用消息信封和其他传输协议(protocol),但这些似乎也不起作用。
最佳答案
您已经编辑了用于订阅所有内容的行:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
进入此:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
但是您忘记将选项值的大小从 0 更改为 1。由于长度仍然为零,因此您仍然订阅了所有内容。当然应该是:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 1);
关于c - c (libzmq) 中的 Zeromq pub/sub 示例带有工作过滤器(在 Windows 上)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75526592/