c++ - 为什么 boost::asio::async_read 无法读取请求的字节数?

标签 c++ boost boost-asio

我遇到了一个问题,boost::asio::async_read 在第二次调用时以一种奇怪的方式失败了:

std::atomic<error_code> ec(asio::error::would_block);
size_t len = 0;

// 1st call
asio::async_read(socket,
                 asio::buffer(buffer+X),
                 asio::transfer_exactly(512-X),
                 [&] (error_code const& err, size_t bytesTransferred)
                 {
                     len = bytesTransferred;
                     ec.store(err, std::memory_order_release);
                 }
);
/////// ... wait for read to complete ...
// 2nd call
asio::async_read(socket,
                 asio::buffer(buffer),
                 asio::transfer_exactly(512),
                 [&] (error_code const& err, size_t bytesTransferred)
                 {
                     len = bytesTransferred;
                     ec.store(err, std::memory_order_release);
                 }
);

常量 X 是因为我已经有一些数据是通过另一种方式获得的, 所以第一次阅读较小。假设 X=364,那么第一次 bytesTransferred 将是 148。然而,我的问题是第二次读取再次返回了 148 个字节,即使那个读取正好是 512 个字节。

我很困惑。第二次调用没有错误条件(我检查了 err)。 bytesTransferredaync_read传给我的一个参数,两次都是148字节。第一次,它与堆栈中更高的 asio::transfer_exactly(148) 匹配。第二次调用堆栈显然有一个 asio::transfer_exactly(512)。这里发生了什么?

不过,第二次调用很特别,第三次调用再次读取 512 字节,但也获得了 512 字节。

[MCVE]

#include <iostream>
#include <atomic>

#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/read.hpp>

// Minimal example, code that works has error checking removed. Class members turned itno globals etc.
namespace {
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::resolver resolver(io_service);
    boost::asio::ip::tcp::socket sock(io_service);
    std::vector<char> data(512);
    boost::asio::mutable_buffers_1 buffer(&data[0], data.size());
    unsigned read_counter = 1;
    std::atomic<unsigned> read_timeout;
}

boost::system::error_code openSocket(const std::string &server,
                                     const std::string &port)
{
    boost::system::error_code error = boost::asio::error::host_not_found;
    using boost::asio::ip::tcp;
    tcp::resolver::query query(server, port);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;

    while (error && endpoint_iterator != end)
    {
        sock.close();
        sock.connect(*endpoint_iterator++, error);
    }
    if (error)
    {
        std::cerr << "No route\n";
        sock.close(); // Would be wrong to leave it open.
    }
    return error;
}

int read(size_t bytesNeeded)
{
    size_t buffer_len = boost::asio::buffer_size(buffer);
    size_t byteShift = buffer_len - bytesNeeded; // Read into back of buffer.

    const int timeoutSeconds = 10;
    boost::asio::deadline_timer deadline(io_service);
    deadline.expires_from_now(boost::posix_time::seconds(timeoutSeconds)); // This will reset any outstanding timer
    read_counter += 2; // If we'd use +1, after 4 billion cycles it would reset to 0
    read_timeout.store(0, std::memory_order_release); // 0 = no timeout.
    unsigned read_counter_copy = read_counter; // Can't capture global.
    deadline.async_wait([read_counter_copy](boost::system::error_code const&) {
        // read_timeout is very intentionally captured by value - timeout events are numbered
        read_timeout.store(read_counter_copy, std::memory_order_release); }
    );

    // Start reading "asynchronously", wait for completion or timeout:
    std::atomic<boost::system::error_code> ec(boost::asio::error::would_block);
    size_t len = 0;

    boost::asio::async_read(sock, boost::asio::buffer(buffer + byteShift), boost::asio::transfer_exactly(bytesNeeded),
        [&, bytesNeeded](boost::system::error_code const& err, size_t bytesTransferred)
        {
            if (bytesTransferred != bytesNeeded) {
                std::cout << bytesTransferred << " , " << err.message() << std::endl;
            }
            len = bytesTransferred;
            ec.store(err, std::memory_order_release);
        }
    );

    do {
        io_service.run_one();
    } while (read_timeout.load(std::memory_order_acquire) != read_counter && // Continue if the **last** read didn't time out
        (ec.load(std::memory_order_acquire) == boost::asio::error::would_block) && // ec.store() not called,
        !io_service.stopped()); // and program still running.

    deadline.cancel(); // This will set read_timeout, if it wasn't set yet. But we ignore it from now on.

    if (ec.load(std::memory_order_acquire))
    {
        std::cerr << "oops\n"; // Real error handling omitted.
        throw std::runtime_error("");
    }
    else if (read_timeout == read_counter)
    {
        std::cerr << "timeout\n";
    }
    else if (len != bytesNeeded)
    {
        // This is the real problem.
        std::cerr << "Asked " << bytesNeeded << " got " << len;
    }
    return (int)len;
}

int main(int argc, char* argv[])
{
    do try {
        ::openSocket("192.168.6.30", "80");

        read(148); // Assume that data[] already has 364 bytes on the first call.
        for (;;)
        {
            read(512); // Full buffers on every subsequent call.
            // Do something with data[] here.
        }
    }
    catch (std::runtime_error) { } while (true);
}

do try catch while 是必需的,因为错误只会在我拔下另一端后发生。第二次调用 read(148) 后,下一个 read(512)` 失败。

[更新] 这不仅仅是 transfer_exactly。使用 transfer_at_least(512) 我也遇到了同样的问题,一个多余的 148 字节读取。 (两者的行为应该相同,因为将至少 512 个字节读入缓冲区,而只有 512 个字节不能读取更多或更少的字节)

最佳答案

通过忽略不正确的读取操作暂时“解决”了它。我很幸运,因为我可以处理未知数量的丢失数据,并在稍后与流重新同步。但看起来我以后将不得不放弃 Boost::Asio,因为我不能再容忍丢失的数据。

关于c++ - 为什么 boost::asio::async_read 无法读取请求的字节数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35658228/

相关文章:

c++ - 用于切换 .h 和相应 .cpp 的 Visual Studio 2017 按钮

c++ - 使用 OpenCV 保存 DFT 的频谱

c++ - boost asio,如何取消异步操作

c++ - "Body requirements not met"将请求传递给 c++ 中的方法(Boost Beast 库)

c++ - 将套接字绑定(bind)到具有未知 Ip 的特定接口(interface)

c++ - 传递 boost::asio::ip::tcp::socket

c++ - 构造函数继承的虚拟继承

c++ - C++中的空嵌套初始化列表

c++ - boost::interprocess 互斥锁崩溃而不是等待锁?

c++ - Boost.Asio - 具有自定义缓冲区的多个缓冲区