c++ - boost asio Deadline_timer async_wait(N秒)在N秒内两次导致操作取消

标签 c++ boost-asio zeromq

我想要的是当一个消息队列接收到一个int N时,处理函数将在N秒后被调用。下面是我的代码。

如果两个邻近消息队列的持续时间秒数大于 int N,则运行正常,但当两个接收到的消息队列之间的持续时间秒数小于 N 时,处理程序将在一个处理程序中打印“操作已取消”,即这不是我想要的。

非常感谢您的帮助。

#include <boost/asio.hpp>
#include <zmq.h>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;

void* context = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);


void handler(const boost::system::error_code &ec) {
    std::cout << "hello, world" << "\t" << ec.message() << std::endl;
}

void run() {
    io_service.run();
}

void thread_listener() {

     int nRecv;
     boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0));
     while( true ) {
         zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
         std::cout << nRecv << std::endl;
         timer.expires_from_now(boost::posix_time::seconds(nRecv));
         timer.async_wait(handler);
     }

 }

 int main(int argc, char* argv[]) {

     boost::asio::io_service::work work(io_service);

     zmq_bind(sock_pull, "tcp://*:60000");
     boost::thread tThread(thread_listener);
     boost::thread tThreadRun(run);
     tThread.join();
     tThreadRun.join();
     return 0;

 }

最佳答案

当你打电话时

timer.expires_from_now(boost::posix_time::seconds(nRecv));

这个,as the documentation states ,取消任何挂起的异步计时器。

如果您希望在给定时间有重叠的请求在运行,那么一个计时器显然是不够的。幸运的是,Asio 中的绑定(bind)共享指针有一个众所周知的模式,您可以使用它来模拟每个响应的“ session ”。

假设您定义一个 session 来包含它自己的私有(private)计时器:

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

现在,替换使用硬编码计时器实例的代码很简单:

 timer.expires_from_now(boost::posix_time::seconds(nRecv));
 timer.async_wait(handler);

session 开始:

 boost::make_shared<session>(io_service, nRecv)->start();

一个完整的工作示例(带有适当的 stub ZMQ 内容): Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <iostream>

boost::asio::io_service io_service;

/////////////////////////////////////////////////////////////////////////
// I love stubbing out stuff I don't want to install just to help others
enum { ZMQ_PULL };
static void* zmq_ctx_new()         { return nullptr; }
static void* zmq_socket(void*,int) { return nullptr; }
static void  zmq_bind(void*,char const*) {}
static void  zmq_recv(void*,int*data,size_t,int) 
{ 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000));
    *data = 2;
}
// End of stubs :)
/////////////////////////////////////////////////////////////////////////

void* context  = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

    ~session() {
        std::cout << "bye (session end)\n";
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv = 0;
    for(int n=0; n<4; ++n) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;

        boost::make_shared<session>(io_service, nRecv)->start();
    }
}

int main() {
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);

    tThread.join();
    work.reset();

    tThreadRun.join();
}

关于c++ - boost asio Deadline_timer async_wait(N秒)在N秒内两次导致操作取消,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25960268/

相关文章:

c++ - bad_weak_ptr 同时使用从 boost::asio::io_context::service 继承的类

java - ZeroMQ 运行服务器 Java

c++ - 避免在 ZMQ 中缓冲一段确定的时间?

c++ - C++中使用json-spirit读取json字符串

C++ 模板接口(interface)自动解析赋值类型

c++ - 有没有一种方法可以显式销毁给定 boost::asio::io_context 上挂起的所有处理程序?

c++ - boost asio 类似信号量的解决方案

c++ - ZMQ C++ 从特定工作人员发送和接收

c++ - 我的 test.cpp 文件使用默认的运算符<<;这个签名有什么问题?

c++ - 字符串段的Qt模式识别