c++ - 间歇性地没有数据通过 boost::asio/io 完成端口传递

标签 c++ windows performance boost-asio io-completion-ports

问题

我正在使用 boost::asio对于同一台机器上的两个进程使用 TCP/IP 进行通信的项目。一个生成数据供另一个读取,但我遇到了间歇性地没有数据通过连接发送的问题。我将其归结为下面一个非常简单的示例,基于 async tcp echo server example .

流程(下面的源代码)开始时运行良好,以快速的速度将数据从发送方传送到接收方。然后突然间,在大约五秒钟内根本没有传送任何数据。然后再次传送数据,直到下一次莫名其妙的停顿。在这五秒钟内,进程消耗 0% CPU,并且似乎没有其他进程做任何特别的事情。停顿的长度始终相同 - 五秒。

我正试图弄清楚如何摆脱这些摊位以及造成这些摊位的原因。

整个运行期间的 CPU 使用情况:

CPU usage during a single run

请注意在运行过程中 CPU 使用率如何出现三个下降 - “运行”是服务器进程和客户端进程的单次调用。在这些下降期间,没有数据被传送。下降的次数和时间因运行而异 - 有时根本没有下降,有时很多。

我可以通过更改读取缓冲区的大小来影响这些停顿的“概率”——例如,如果我将读取缓冲区设置为发送 block 大小的倍数,那么这个问题几乎 会消失,但不会完全消失。

来源和测试说明

我已经使用 Visual Studio 2005、Boost 1.43 和 Boost 1.45 编译了以下代码。我已经在 Windows Vista 64 位(四核)和 Windows 7 64 位(四核和双核)上进行了测试。

服务器接受连接,然后简单地读取和丢弃数据。每当执行读取时,都会发出新的读取。

客户端连接到服务器,然后将一堆数据包放入发送队列。在此之后,它一次写入一个数据包。每当写入完成时,将写入队列中的下一个数据包。一个单独的线程监视队列大小并每秒将其打印到标准输出。在 io 停顿期间,队列大小保持完全相同。

我试过使用scatter io(在一个系统调用中写入多个数据包),但结果是一样的。如果我使用 BOOST_ASIO_DISABLE_IOCP 在 Boost 中禁用 IO 完成端口,问题似乎会消失,但代价是吞吐量显着降低。

// Example is adapted from async_tcp_echo_server.cpp which is
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Start program with -s to start as the server
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif                      

#include <iostream>
#include <tchar.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

#define PORT "1234"
using namespace boost::asio::ip;
using namespace boost::system;

class session {
public:
    session(boost::asio::io_service& io_service) : socket_(io_service) {}

    void do_read() {
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
            boost::bind(&session::handle_read, this, _1, _2));
    }

    boost::asio::ip::tcp::socket& socket() { return socket_; }
protected:
    void handle_read(const error_code& ec, size_t bytes_transferred) {
        if (!ec) {
            do_read();
        } else {
            delete this;
        }
    }

private:
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server {
public:
    explicit server(boost::asio::io_service& io_service)
        : io_service_(io_service)
        , acceptor_(io_service, tcp::endpoint(tcp::v4(), atoi(PORT)))
    {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, _1));
    }

    void handle_accept(session* new_session, const error_code& ec) {
        if (!ec) {
            new_session->do_read();
            new_session = new session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&server::handle_accept, this, new_session, _1));
        } else {
            delete new_session;
        }
    }

private:
    boost::asio::io_service& io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
};

class client {
public:
    explicit client(boost::asio::io_service &io_service)
        : io_service_(io_service)
        , socket_(io_service)
        , work_(new boost::asio::io_service::work(io_service))
    {
        io_service_.post(boost::bind(&client::do_init, this));
    }

    ~client() {
        packet_thread_.join(); 
    }

protected:

    void do_init() {
        // Connect to the server
        tcp::resolver resolver(io_service_);
        tcp::resolver::query query(tcp::v4(), "localhost", PORT);
        tcp::resolver::iterator iterator = resolver.resolve(query);
        socket_.connect(*iterator);

        // Start packet generation thread
        packet_thread_.swap(boost::thread(
                boost::bind(&client::generate_packets, this, 8000, 5000000)));
    }

    typedef std::vector<unsigned char> packet_type;
    typedef boost::shared_ptr<packet_type> packet_ptr;

    void generate_packets(long packet_size, long num_packets) {
        // Add a single dummy packet multiple times, then start writing
        packet_ptr buf(new packet_type(packet_size, 0));
        write_queue_.insert(write_queue_.end(), num_packets, buf);
        queue_size = num_packets;
        do_write_nolock();

        // Wait until all packets are sent.
        while (long queued = InterlockedExchangeAdd(&queue_size, 0)) {
            std::cout << "Queue size: " << queued << std::endl;
            Sleep(1000);
        }

        // Exit from run(), ignoring socket shutdown
        work_.reset();
    }

    void do_write_nolock() {
        const packet_ptr &p = write_queue_.front();
        async_write(socket_, boost::asio::buffer(&(*p)[0], p->size()),
            boost::bind(&client::on_write, this, _1));
    }

    void on_write(const error_code &ec) {
        if (ec) { throw system_error(ec); }

        write_queue_.pop_front();
        if (InterlockedDecrement(&queue_size)) {
            do_write_nolock();
        }
    }

private:
    boost::asio::io_service &io_service_;
    tcp::socket socket_;
    boost::shared_ptr<boost::asio::io_service::work> work_;
    long queue_size;
    std::list<packet_ptr> write_queue_;
    boost::thread packet_thread_;
};

int _tmain(int argc, _TCHAR* argv[]) {
    try {
        boost::asio::io_service io_svc;
        bool is_server = argc > 1 && 0 == _tcsicmp(argv[1], _T("-s"));
        std::auto_ptr<server> s(is_server ? new server(io_svc) : 0);
        std::auto_ptr<client> c(is_server ? 0 : new client(io_svc));
        io_svc.run();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}

所以我的问题基本上是:

如何摆脱这些摊位?

是什么导致了这种情况?

更新:似乎与磁盘事件有某种关联,这与我上面所说的相反,所以看起来如果我在测试运行时在磁盘上启动一个大目录拷贝,这可能会增加io 停顿的频率。这可能表明这是 Windows IO Prioritization那开始了?由于暂停的长度始终相同,这听起来确实有点像 OS io 代码中某处的超时...

最佳答案

  • 调整 boost::asio::socket_base::send_buffer_size 和 receive_buffer_size
  • 将 max_length 调整为更大的数字。由于 TCP 是面向流的,所以不要将其视为接收单个数据包。这很可能会导致 TCP 发送/接收窗口之间出现某种“僵局”。

关于c++ - 间歇性地没有数据通过 boost::asio/io 完成端口传递,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4956468/

相关文章:

c++ - 为什么 O(NlogN) 算法所花费的时间与 O(N^2) 相同?

c++ - 支持 g++ 中的类型属性

Java和C++按值传递和按引用传递

c++ - getElementsBytagName 不适用于 DomDocument60

iphone - glClearColor() 将 iPhone 的渲染利用率推至 27%

c - 虚拟分配粒度和页面大小

linux - 使用 Monogame 设置跨平台项目

java - 将文件类型从 JAVA000064 更改为 JAVA

performance - 一个高效的 Javascript 集合结构

java - 在 Java 中转换为面向列的数组