c++ - boost:asio::read 或 boost:asio::async_read 超时

标签 c++ boost-asio asio

是的。我知道在 boost::asio 中围绕这个 time_out 有一些问题。对于 asio 人员来说,我的问题可能太简单了,无法在这里解决。

我正在 TCP 协议(protocol)上使用 boost::asio 来尽可能快地通过网络连续循环读取数据。

在 while 循环中从工作线程 std::thread 连续调用以下函数 ReadData()

std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) {

 boost::system::error_code error_code;
 buffer.resize(size_to_read);

 // Receive body
 std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code);

 if (bytes_read == 0) {
   // log error
   return;
 }

 return bytes_read;
}

效果很好。返回数据。一切都很好。

我想要的就是boost::asio::read使用time_out。我了解到我需要将 boost::asio::async_readboost::asio::async_wait 一起使用,以便 time_out 技术发挥作用。

一个boost example建议使用 boost::asio::async_read_until

我应该使用 boost::asio::async_read 还是 boost::asio::async_read_until

使用 boost::asio::async_readboost::asio::async_read_untilboost::asio::read 并不重要。但我希望在调用我的方法 ReadData 时触发并完成 asio::read 调用,以便客户端代码不会受到影响。

我怎样才能实现这个目标?请推荐

最佳答案

好的,这样的东西应该适合您的目的:

理由:

您似乎想要使用阻止操作。既然是这种情况,那么您很可能没有运行线程来泵送 io 循环。

因此,我们在套接字的 io 循环上启动两个同时异步任务,然后生成一个线程:

a) 重置(实际上重新启动)循环,以防循环已耗尽

b) 运行循环直至耗尽(这里我们可以更聪明,只运行它直到处理程序指示已满足某些条件,但这是另一天的教训)

#include <type_traits>

template<class Stream, class ConstBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler)
{
    using handler_type = std::decay_t<Handler>;
    using buffer_sequence_type = std::decay_t<ConstBufferSequence>;
    using stream_type = Stream;

    struct state_machine : std::enable_shared_from_this<state_machine>
    {
        state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
                : stream_(stream)
                , sequence_(std::move(sequence))
                , handler_(std::move(handler))
        {}
        void start(std::size_t millis)
        {
            timer_.expires_from_now(boost::posix_time::milliseconds(millis));
            timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
                self->handle_timeout(ec);
            }));
            boost::asio::async_read(stream_, sequence_,
                                    strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){
                self->handle_read(ec, size);
            }));
        }

        void handle_timeout(boost::system::error_code const& ec)
        {
            if (not ec and not completed_)
            {
                boost::system::error_code sink;
                stream_.cancel(sink);
            }
        }

        void handle_read(boost::system::error_code const& ec, std::size_t size)
        {
            assert(not completed_);
            boost::system::error_code sink;
            timer_.cancel(sink);
            completed_ = true;
            handler_(ec, size);
        }

        stream_type& stream_;
        buffer_sequence_type sequence_;
        handler_type handler_;
        boost::asio::io_service::strand strand_ { stream_.get_io_service() };
        boost::asio::deadline_timer timer_ { stream_.get_io_service() };
        bool completed_ = false;
    };

    auto psm = std::make_shared<state_machine>(stream,
                                               std::forward<ConstBufferSequence>(sequence),
                                               std::forward<Handler>(handler));
    psm->start(millis);
}

std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
                     std::vector<unsigned char> & buffer,
                     unsigned int size_to_read,
                     boost::system::error_code& ec) {

    buffer.resize(size_to_read);

    ec.clear();
    std::size_t bytes_read = 0;
    auto& executor = socket.get_io_service();
    async_read_with_timeout(socket, boost::asio::buffer(buffer),
                            2000, // 2 seconds for example
                            [&](auto&& err, auto size){
        ec = err;
        bytes_read = size;
    });

    // todo: use a more scalable executor than spawning threads
    auto future = std::async(std::launch::async, [&] {
        if (executor.stopped()) {
            executor.reset();
        }
        executor.run();
    });
    future.wait();

    return bytes_read;
}

关于c++ - boost:asio::read 或 boost:asio::async_read 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43192898/

相关文章:

c++ - 尝试解除分配 char 数组 : _BLOCK_TYPE_IS_VALID(pHead->nBlockUse) 的指针时出错

c# - NAudio Asio 不支持的源流格式

c++ - boost 大体的asio post请求

c++ - 如何在 GLUT 上加载 bmp 以将其用作纹理?

c++ - 当用作模板参数时,C++编译器不区分类型和函数名称

c++ - 处理对继承的循环依赖

c++ - Boost::asio socket - 如何在 'throw' 中创建 read_some "timeout"?

c++ - 手动打印 N 字节整数

c++ - boost 从 tcp 套接字接收数据