c++ - Deadline_timers 的非阻塞 boost io_service

标签 c++ multithreading boost-asio

阅读 boost::asio::deadline_timer 的文档后,似乎 io_service::run() 和处理程序方法是在同一线程上调用的。有没有一种方法可以在后台线程上运行 io_service 对象时在一个线程上创建计时器?

最佳答案

为了乐趣和荣耀,这里介绍了如何将线程队列与 asio 截止时间计时器结合起来,从截止时间计时器分派(dispatch)非阻塞任务:

Live On Coliru

#ifndef HEADER_GUARD_CUSTOM_THREADPOOL_HPP
#define HEADER_GUARD_CUSTOM_THREADPOOL_HPP
#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/phoenix.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
#include <string>
#include <deque>

namespace custom {
    using namespace boost;

    class thread_pool
    {
    private:
        mutex mx;
        condition_variable cv;

        typedef function<void()> job_t;
        std::deque<job_t> _queue;

        thread_group pool;

        boost::atomic_bool shutdown;
        static void worker_thread(thread_pool& q)
        {
            while (optional<job_t> job = q.dequeue())
                (*job)();
        }

    public:
        thread_pool() : shutdown(false) {
            //LOG_INFO_MESSAGE << "Number of possible Threads: " << boost::thread::hardware_concurrency() << std::endl;
            for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i){
                pool.create_thread(bind(worker_thread, ref(*this)));
            }
        }

        void enqueue(job_t job)
        {
            lock_guard<mutex> lk(mx);
            _queue.push_back(job);

            cv.notify_one();
        }

        optional<job_t> dequeue()
        {
            unique_lock<mutex> lk(mx);
            namespace phx = boost::phoenix;

            cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

            if (_queue.empty())
                return none;

            job_t job = _queue.front();
            _queue.pop_front();
            return job;
        }

        ~thread_pool()
        {
            shutdown = true;
            {
                lock_guard<mutex> lk(mx);
                cv.notify_all();
            }

            pool.join_all();
        }
    };
}

#endif // HEADER_GUARD_CUSTOM_THREADPOOL_HPP

以及简单的测试程序:

#include <boost/asio.hpp>

namespace a = boost::asio;
using error = boost::system::error_code;

void timer_loop(a::deadline_timer& tim, custom::thread_pool& pool) {
    static boost::atomic_int count(0);

    tim.expires_from_now(boost::posix_time::milliseconds(10));
    tim.async_wait([&](error ec) {
        if (!ec && (++count < 100)) {
            int id = count;

            pool.enqueue([id] { 
                std::cout << "timer callback " << id << " started on thread " << boost::this_thread::get_id() << "\n";
                boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000));
                std::cout << "timer callback " << id << " completed\n";
            });

            std::cout << "Job " << id << " enqueued" << "\n";
            timer_loop(tim, pool);
        }
    });
}

int main()
{
    a::io_service svc;
    a::deadline_timer tim(svc);
    custom::thread_pool pool;

    timer_loop(tim, pool);

    svc.run();
}

关于c++ - Deadline_timers 的非阻塞 boost io_service,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26720271/

相关文章:

c++ - Android NDK - 多线程正在减慢渲染速度

thread-safety - asio隐式链和数据同步

sockets - 在 Android 中我应该选择 Boost Asio 还是 Async Socket 线程?

c++ - 为什么需要 `std::function::operator=(F &&)` 来制作临时 `std::function` ?

c++ - 如何包装模板函数以处理 const 和非常量数据

c - 线程如何知道共享变量的地址

c++ - 使用 Boost Asio 设置帖子队列大小限制?

c++ - 返回的字符串和获取的字符串不匹配

java - 我如何将 openCv CvScalar 变量转换为 java?

java - 同步块(synchronized block)和变量作用域