c++ - zmq::proxy 示例不起作用 ()

标签 c++ c++11 zeromq

我目前正在学习如何使用 ZeroMQ 库,一位 friend 建议我将其用于个人项目。

阅读文档并计划如何在我的项目中使用该库后,我开始使用文档给出的代码测试该项目。 我使用的测试是 this one 。 不幸的是它不起作用。我做了一些小的修改来测试它。 (我给你了我在测试中得到的确切代码,我很抱歉,但如果没有一切,我认为它没有意义,而且不可能帮助我:/)。

我几乎没有对文档给出的测试进行任何更改,只是添加了一些输出来测试,并且还删除了客户端中的轮询(我认为问题来自这里,因为它阻塞了无限循环,即使认为有一个超时)。

    #include <vector>
    #include <thread>
    #include <memory>
    #include <functional>


    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    class client_task {
    public:
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        void start() {
            // generate random identity
            char identity[10] = {};
            sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
            printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
            client_socket_.connect("tcp://localhost:5570");

            zmq_pollitem_t items;
            items.socket = &client_socket_;
            items.fd = 0;
            items.events = ZMQ_POLLIN;
            items.revents = 0;

            int request_nbr = 0;
            try {
                while (true) {

                    for (int i = 0; i < 100; ++i) {

                        // 10 milliseconds
                        sleep(1);
                        std::cout << "ici" << std::endl;
                        if (items.revents & ZMQ_POLLIN) {
                            printf("\n%s ", identity);
                            s_dump(client_socket_);
                        }

                        char request_string[16] = {};
                        sprintf(request_string, "request #%d", ++request_nbr);
                        client_socket_.send(request_string, strlen(request_string));

                    }
                }

            }
            catch (std::exception &e)
            {}
        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    class server_worker {
    public:
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);
                    std::cout << "I never arrive here" << std::endl;

                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        std::cout << "LA" << std::endl;
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            catch (std::exception &e) {}
        }

    private:
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    class server_task {
    public:
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        void run() {
            frontend_.bind("tcp://*:5570");
            backend_.bind("inproc://backend");

            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                zmq::proxy(&frontend_, &backend_, NULL);
            }
            catch (std::exception &e) {}

        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));
        std::thread t4(std::bind(&server_task::run, &st));

        t1.detach();
        t2.detach();
        t3.detach();
        t4.detach();
        std::cout << "ok" << std::endl;
        getchar();
        std::cout << "ok" << std::endl;
        return 0;
    }

我从这段代码中得到的输出如下:

-> CC66-C879
-> 3292-E961
-> C4AA-55D1
ok
ici
ici
ici
... (infinite ici)

我真的不明白为什么它不起作用。 poll在客户端对非socket操作发送异常Socket。 对我来说主要的问题是这是来自官方文档的测试,我无法使其工作。我使用套接字时遇到什么问题?

感谢您的帮助

最佳答案

我发现了问题。

官方文档中存在一个问题(一些明显的错误,例如 zmq_pollitem_t 数组的初始化)以及另一个导致我的测试无法正常工作的问题。

对于 zmq::poll 或 zmq::proxy,您需要将套接字结构转换为 void* 并且不能在套接字上使用指针。 ZMQ poll not working

经过这些修改后,它起作用了。我又发了一篇文章来解释原因 here

这是更正后的代码,没有我的额外测试输出:

        //  Asynchronous client-to-server (DEALER to ROUTER)
    //
    //  While this example runs in a single process, that is to make
    //  it easier to start and stop the example. Each task has its own
    //  context and conceptually acts as a separate process.

    #include <vector>
    #include <thread>
    #include <memory>
    #include <functional>


    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    class client_task {
    public:
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        void start() {
            // generate random identity
            char identity[10] = {};
            sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
            printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
            client_socket_.connect("tcp://localhost:5555");

            zmq_pollitem_t items[1];                
            items[0].socket = static_cast<void *> (client_socket_);
            items[0].fd = 0;
            items[0].events = ZMQ_POLLIN;
            items[0].revents = 0;
            int request_nbr = 0;
            try {
                while (true) {
                    for (int i = 0 ; i < 100; ++i) {

                    zmq::poll(items, 1, 10);
                    if (items[0].revents & ZMQ_POLLIN) {
                            printf("\n%s =>", identity);
                            s_dump(client_socket_);
                        }
                    }

                    char request_string[16] = {};
                    sprintf(request_string, "request #%d", ++request_nbr);
                    client_socket_.send(request_string, strlen(request_string));

                }

            }
            catch (std::exception &e)
            {
                std::cout << "exception :  "  << zmq_errno() << " "<< e.what() << std::endl;
                if (zmq_errno() == EINTR)
                    std::cout << "lol"<< std::endl;
            }
        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    class server_worker {
    public:
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);

                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            catch (std::exception &e)
            {
                std::cout << "Error in worker : " << e.what() << std::endl;
            }
        }

    private:
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    class server_task {
    public:
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        void run() {
            frontend_.bind("tcp://*:5555");
            backend_.bind("inproc://backend");

            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), NULL);
            }
            catch (std::exception &e)
            {
                std::cout << "Error in Server : " << e.what() << std::endl;
            }

        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        std::thread t4(std::bind(&server_task::run, &st));
        t4.detach();
        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));

        t1.detach();
        t2.detach();
        t3.detach();

        getchar();
        return 0;
    }

关于c++ - zmq::proxy 示例不起作用 (),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32041538/

相关文章:

c++ - 在 int 数组的情况下可以使用 new 表达式 "overflow"吗?

C++ ofstream,不带 CRLF 的打印

c++ - 生成真正的随机数

c++ - 为什么 "constructor invocation"不触发任何编译器 (g++) 警告?

c++ - 零MQ/ZMQPP : Forward metadata with message

c++ - 使用推力根据索引更改某些元素的值

c++ - 是否有一些技巧可以让我将流操纵器传递给可变参数模板函数?

c++ - lambda 函数中的捕获变量混淆

c# - 是否有很好的教程或示例显示 protobuf-net 和 zeromq 的组合?

node.js - 如何使用 zmq (zeromq) 在两个 NodeJS 服务器之间发送多个请求并路由异步响应?