c++ - 无限执行boost asio async_read_until

标签 c++ asynchronous boost network-programming boost-asio

在 boost::asio::async_read_until 之后,我面临周期性的无限等待处理程序调用。这出现在两种情况下:

  • 在服务器上,尽管客户端 boost::asio::async_write 处理程序是 调用没有任何错误。此行为仅在客户端时出现 连接到服务器后发送它的第一个命令。
  • 在客户端上,经过几分钟的命令交换,在这种情况下 服务器端 boost::asio::async_write 处理程序被调用,但是客户端 继续等待 boost::asio::async_read_until 处理程序。

鉴于不是每次都有,在我看来,我使用 async_read_until、async_write 和 boost::asio::strand 不正确。

这是简化的服务器架构:

  • 读取链下套接字
  • 接收到新命令后,另一个异步读取开始,接收到的命令在另一个线程中异步处理(并且没有链)
  • 处理命令后,使用 strand 下的 async_write 将结果发送回同一个套接字

(一些发送到服务器的命令不需要响应,有时需要在服务器上发生某些事件后向客户端发送命令,即不从客户端获取数据。值得澄清的是在同一个套接字上永远不要同时运行多个 async_read_until 操作和多个 async_write 操作。)

还有客户的:

  • 连接后向服务器发送命令
  • 收到服务器命令后,返回一个应答

服务器代码:

#include <iostream>
#include <istream>
#include <ostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

class server_session
    : public boost::enable_shared_from_this<server_session>
{
public:
    server_session(boost::asio::io_service& io_service)
        : _io_service(io_service)
        , _socket(io_service)
        , _strand(io_service)
        , _delimiter('\b')
    {}

    boost::asio::ip::tcp::socket& socket()
    {
        return _socket;
    }

    void read()
    {
        boost::asio::async_read_until(
            _socket,
            _streambuf,
            _delimiter,
            _strand.wrap(
                boost::bind(
                    &server_session::handle_read,
                    shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred
                    )
                )
            );
    }

    void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
    {
        if (ec)
        {
            std::cerr << "async_read_until error: " << ec.message() << std::endl;
            return;
        }

        std::istream is(&_streambuf);
        std::string msg;
        std::getline(is, msg, _delimiter);

        read();

        _io_service.post(
            boost::bind(
                &server_session::proc_msg,
                shared_from_this(),
                msg
                )
            );
    }

    void proc_msg(const std::string msg)
    {
        // command proc here
        std::cout << "received: " << msg << std::endl;

        static std::uint64_t i = 0;
        write("resp_" + std::to_string(i++));
    }

    void write(const std::string& msg)
    {
        _strand.post(
            boost::bind(
                &server_session::write_impl,
                shared_from_this(),
                boost::shared_ptr<std::string>(new std::string(msg + _delimiter))
                )
            );
    }

private:
    void write_impl(const boost::shared_ptr<std::string>& text_ptr)
    {
        boost::asio::async_write(
            _socket,
            boost::asio::buffer(text_ptr->data(), text_ptr->size()),
            _strand.wrap(
                    boost::bind(
                        &server_session::handle_write,
                        shared_from_this(),
                        text_ptr,
                        boost::asio::placeholders::error
                        )
                    )
            );
    }

    void handle_write(const boost::shared_ptr<std::string>, const boost::system::error_code& ec)
    {
        if (ec)
        {            
            std::cerr << "async_write error: " << ec.message() << std::endl;
        }
    }

    boost::asio::io_service& _io_service;
    boost::asio::ip::tcp::socket _socket;
    boost::asio::strand _strand;
    boost::asio::streambuf _streambuf;
    char _delimiter;
};

class server
{
public:
    server(int port)
        : _acceptor(
            _io_service,
            boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),port)
            )
    {
        start_accept();

        boost::thread_group thd_grp;

        for (int i = 0; i < 2; ++i)
        {
            thd_grp.create_thread(
                boost::bind(&boost::asio::io_service::run, &_io_service)
                );
        }

        thd_grp.join_all();
    }

private:
    void start_accept()
    {
        _session_ptr.reset(new server_session(_io_service));

        _acceptor.async_accept(
            _session_ptr->socket(),
            boost::bind(
                &server::handle_accept,
                this,
                boost::asio::placeholders::error
                )
            );
    }

    void server::handle_accept(const boost::system::error_code& ec)
    {
        if (ec)
        {
            std::cerr << "handle_accept error: " << ec.message() << std::endl;
            return;
        }

        _session_ptr->read();
        start_accept();
    }

    boost::asio::io_service _io_service;
    boost::asio::ip::tcp::acceptor _acceptor;
    boost::shared_ptr<server_session> _session_ptr;
};

int main(int argc, char** argv)
{
    if (argc != 2)
    {
        std::cerr << "usage: " << argv[0] << " <port>";
        return EXIT_FAILURE;
    }

    server s(std::stoi(argv[1]));
    return EXIT_SUCCESS;
}

客户代码:

#include <iostream>
#include <istream>
#include <ostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

class client
{
public:
    client(const std::string& host, const std::string& port)
        : _socket(_io_service)
        , _resolver(_io_service)
        , _query(host, port)
        , _delimiter('\b')
    {
        _iterator = _resolver.resolve(_query);

        boost::asio::async_connect(
            _socket, 
            _iterator,
            boost::bind(
                &client::handle_connect,
                this,
                boost::asio::placeholders::error
                )
            );

        _io_service.run();
    }

    void handle_connect(const boost::system::error_code& ec)
    {
        if (ec)
        {
            std::cerr << "async_connect error: " << ec.message() << std::endl;
            return;
        }

        write();
    }

    void write()
    {
        static std::uint64_t i = 0;
        boost::shared_ptr<std::string> msg_ptr(new std::string("req_" + std::to_string(i++) + _delimiter));

        boost::asio::async_write(
            _socket,
            boost::asio::buffer(msg_ptr->data(), msg_ptr->size()),
            boost::bind(
                &client::handle_write,
                this,
                msg_ptr,
                boost::asio::placeholders::error
                )
            );
    }

    void handle_write(const boost::shared_ptr<std::string>, const boost::system::error_code& ec)
    {
        if (ec)
        {
            std::cerr << "async_write error: " << ec.message() << std::endl;
            return;
        }

        read();
    }

    void read()
    {
        boost::asio::async_read_until(
            _socket,
            _streambuf,
            _delimiter,
            boost::bind(
                &client::handle_read,
                this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred
                )
            );
    }

    void handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
    {
        if (ec)
        {            
            std::cerr << "async_read_until error: " << ec.message() << std::endl;
            return;
        }

        std::istream is(&_streambuf);
        std::string msg;
        std::getline(is, msg, _delimiter);

        std::cout << "received: " << msg << std::endl;

        write();
    }

    boost::asio::io_service _io_service;     
    boost::asio::ip::tcp::socket _socket;
    boost::asio::ip::tcp::resolver _resolver;
    boost::asio::ip::tcp::resolver::query _query;
    boost::asio::ip::tcp::resolver::iterator _iterator;
    boost::asio::streambuf _streambuf;
    char _delimiter;
};

int main(int argc, char** argv)
{
    if (argc != 3)
    {
        std::cerr << "usage: " << argv[0] << " <host> <port>";
        return EXIT_FAILURE;
    }

    client c(argv[1], argv[2]);
    return EXIT_SUCCESS;
}

当 socket 上的 async_read_until 已经运行,而 strand 被使用时,async_write 是否正确?

async_read_until 是否有可能以某种方式在内部阻塞套接字,事实上,数据没有发送到客户端?

对于代码可能无法正常工作的任何建议,我将不胜感激

我使用的是 boost 1.58,平台 - Win7 和 Win8。在本地主机和 LAN 上测试,结果如上。

最佳答案

我可以告诉您至少您应该使用 asio::deadline_timer 并在每次开始新的异步操作时重置计时器。您尤其应该使用 async_read_until 之类的操作来执行此操作,因为您正在指定可能永远不会满足的调用完成条件。查看 deadline_timer 示例并在每个异步操作上实现 async_wait,指定该操作的截止日期。在超时操作中,您可以根据需要取消挂起的异步操作、出错或重新启动。

关于c++ - 无限执行boost asio async_read_until,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31063050/

相关文章:

c# - 使用 Task.Run 而不是 Delegate.BeginInvoke

c++ - 最新版本的 boost 和 boost 几何库 (GGL) 之间的冲突

c++ - 接受对象作为参数的构造函数发生了什么?

c++ - Qt QDataStream : operator>> for quint16 - I didn't get it at all

javascript - Sails js回调函数

c# - foreach 循环迭代中的多个异步/等待调用

c++ - 全新下载的 boost 中缺少 Jam 文件

c++ - boost:从机器获取带有当前时区的当前 local_date_time

c++ - 堆数据困惑

c++ - 从另一个窗口上的控件获取文本时出现问题