c - c (libzmq) 中的 Zeromq pub/sub 示例带有工作过滤器(在 Windows 上)?

标签 c windows zeromq

看这个主题: zeromq pub/sub example in c (libzmq)

那里的代码可以在 Windows 中运行(Visual Studio 2019 编译器和 vcpkg 安装了 Zeromq - Zeromq:x64-windows 4.3.4#6)

#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"

int main(int argc, char const* argv[])
{
    DWORD thread_id = GetCurrentThreadId();
    // create a context that will be shared by all threads
    void* context = zmq_ctx_new();
    const char* endpoint = "tcp://127.0.0.1:4040";

    std::thread publisher_thread = std::thread([&]
    {
        void* publisher = zmq_socket(context, ZMQ_PUB);
        int rc = zmq_bind(publisher, endpoint);
        //assert(rc == 0);

        while (1)
        {
            rc = zmq_send(publisher, "Hello World!", 12, 0);
            //assert(rc == 12);
        }

        zmq_close(publisher);
    });

    // quick hack to make sure publisher thread is up and running before subscribers connect.
    Sleep(1000);

    std::thread subscriber_thread_all = std::thread([&]
    {
        void* subscriber = zmq_socket(context, ZMQ_SUB);
        int rc = zmq_connect(subscriber, endpoint);
        //assert(rc == 0);
        rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
        //assert(rc == 0);

        char message[12];

        while (1)
        {
            rc = zmq_recv(subscriber, message, 12, 0);
            //assert(rc != -1);
            printf("%s\n", message);
        }
        zmq_close(subscriber);
    });

    subscriber_thread_all.join();
    publisher_thread.join();

    zmq_ctx_destroy(context);
    return 0;
}

这可以工作并产生预期的结果,子线程接收来自发布者的所有消息。

但是,如果我添加除全部之外的任何订阅,则不会发生过滤。例如,对上面进行轻微修改以在发布者中发送交替消息并订阅以“H”开头的消息:

#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"

int main(int argc, char const* argv[])
{
    DWORD thread_id = GetCurrentThreadId();
    // create a context that will be shared by all threads
    void* context = zmq_ctx_new();
    const char* endpoint = "tcp://127.0.0.1:4040";

    std::thread publisher_thread = std::thread([&]
    {

        void* publisher = zmq_socket(context, ZMQ_PUB);
        int rc = zmq_bind(publisher, endpoint);
        //assert(rc == 0);

        int message_counter(0);
        while (1)
        {
            if (message_counter % 2 == 0)
            {
                rc = zmq_send(publisher, "Hello World!", 12, 0);
            }
            else
            {
                rc = zmq_send(publisher, "Goodbye World!", 12, 0);
            }
            ++message_counter;
            //assert(rc == 12);
        }

        zmq_close(publisher);
    });

    // quick hack to make sure publisher thread is up and running before subscribers connect.
    Sleep(1000);

    std::thread subscriber_thread_all = std::thread([&]
    {
        void* subscriber = zmq_socket(context, ZMQ_SUB);
        int rc = zmq_connect(subscriber, endpoint);
        //assert(rc == 0);
        rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
        //assert(rc == 0);

        char message[12];

        while (1)
        {
            rc = zmq_recv(subscriber, message, 12, 0);
            //assert(rc != -1);
            printf("%s\n", message);
        }
        zmq_close(subscriber);
    });

    subscriber_thread_all.join();
    publisher_thread.join();

    zmq_ctx_destroy(context);
    return 0;
}

结果仍然是所有消息都传递给订阅者,而不仅仅是那些以 H 开头的消息。

我也尝试过使用消息信封和其他传输协议(protocol),但这些似乎也不起作用。

最佳答案

您已经编辑了用于订阅所有内容的行:

rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

进入此:

rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);

但是您忘记将选项值的大小从 0 更改为 1。由于长度仍然为零,因此您仍然订阅了所有内容。当然应该是:

rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 1);

关于c - c (libzmq) 中的 Zeromq pub/sub 示例带有工作过滤器(在 Windows 上)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75526592/

相关文章:

c - 程序崩溃-段错误

windows - 如何在 Windows cmd 中将文件内容作为命令参数传递?

python - ZMQ : No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

c - 将 ZMQ 上下文从 C 传递到嵌入式 Lua

c - Valgrind 内存泄漏指向本地时间

c - 使用 ATMEGA328p 的移位寄存器

c - 从 C 使用 gnuplot

c++ - 如何执行快速用户切换

windows - 如何安装 .pm 以运行 perl 脚本

使用 ZeroMQ 时从线程调用 system()