c++ - 如何消除动态销毁 boost::asio 实体时的崩溃?

标签 c++ boost boost-asio asio

注意!!!这个问题是针对 boost::asio 库 方面的专家的。不幸的是,我无法将代码做得更紧凑,它包含描述问题的最小量。该代码是示例,人工创建的。已知崩溃的地方并在评论中进行了描述,它旨在说明崩溃! 不需要任何调试代码的帮助...

问题是如何设计 asio 服务器,而不是 - 它在哪里崩溃!!!

这个例子接近官方 boost::asio 文档中的“聊天服务器”设计。但是,与官方示例不同,在官方示例中,仅动态创建/销毁连接类的对象,在我的示例中,服务器及其连接类实体都是动态创建/销毁的......我确信这种模式的实现应该在asio爱好者中是众所周知的,下面描述的问题应该已经有人解决了......

请查看代码。 在这里,CAsioServer 和 CAsioConnection 的实体是动态创建和销毁的。

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>


class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections)
    {
        std::cout << "-- CAsioConnection is creating, socket: " << socket_.native_handle() << "\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying , socket: " << socket_.native_handle() << "\n";
    }

    void read() { do_read(); }

private:
    void do_read(void)
    {
        uint8_t buff[3];

        asio::async_read(socket_, asio::buffer(buff,3),
            [this](std::error_code ec, std::size_t  /*length*/) {
            if (!ec)
            {
                do_read();
            }
            else
            {
                std::cout << "-- CAsioConnection::do_read() error : " << ec.message() << "\n";
                // Here is the crash N2
                connections_.erase(shared_from_this());
                // Crash may be fixed by the code below
                //if (ec.value() != 1236) // (winerror.h) #define ERROR_CONNECTION_ABORTED 1236L
                //  connections_.erase(shared_from_this());
            }
        });
    }

    asio::ip::tcp::socket socket_;
    std::set<CAsioConnection::PtrType>& connections_;
};

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }
private:
    void do_accept()
    {
        acceptor_.async_accept([this](std::error_code ec, asio::ip::tcp::socket socket) {
            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                // Here is the crash N1
                std::cout << "-- CAsioServer::do_accept() error : " << ec.message() << "\n";
                // Crash may be fixed by the code below
                //if (ec.value() == 995) // (winerror.h) #define ERROR_OPERATION_ABORTED 995L
                //  return;
            }
            // Actually here is the crash N1 )), but the fix is above...
            do_accept();
        });
    }

    int port_;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

//*****************************************************************************

class CTcpBase
{
public:
    CTcpBase()
    {
        // heart beat timer to keep it alive
        do_heart_beat();
        t_ = std::thread([this] {
            std::cout << "-- io context is RUNNING!!!\n";
            io_.run();
            std::cout << "-- io context has been STOPED!!!\n";
        });
    }

    virtual ~CTcpBase()
    {
        io_.stop();

        if (t_.joinable())
            t_.join();
    }

    void add_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    return;

            auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port);
            auto s = std::make_shared<CAsioServer>(port, io_, endpoint);
            s->accept();
            servers_.insert(s);
        });
    }

    void remove_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    { servers_.erase(s); return; }
        });
    }

private:

    void do_heart_beat(void)
    {
        std::cout << "-- beat\n";
        auto timer = std::make_shared<asio::steady_timer>(io_, asio::chrono::milliseconds(3000));
        timer->async_wait([timer, this](const asio::error_code& ec) {
            do_heart_beat();
        });
    }

    asio::io_context io_;
    std::thread t_;
    std::set<CAsioServer::PtrType> servers_;
};

//*****************************************************************************

int main(void)
{
    CTcpBase tcp_base;

    std::cout << "CONNECT the server to port 502\n";
    tcp_base.add_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(20));
    
    std::cout << "REMOVE the server from port 502\n";
    tcp_base.remove_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}

假设 CTcpBase::add_server()CTcpBase::remove_server() 将由不同线程的外部客户端调用。 asio 上下文在它自己的线程中处理它。 让我们考虑两种情况:

  1. 启动应用程序并等待半分钟。 崩溃发生在 CAsioServer::do_accept() 中,请参阅下面的输出。 Debug Console Output
  2. 开始申请。由任何外部客户端连接到端口 502,并等待不到 20 秒。 崩溃发生在 CAsioConnection::do_read() 中,请参阅下面的输出。 Debug Console Output

当其类的实体已被销毁时,asio 框架似乎会调用推迟的 asio::async_read()acceptor_.async_accept() 处理程序。

我已经通过错误检查修复了处理程序,但该解决方案似乎并不可靠。谁知道可能还会出现什么其他错误和场景...有时,当客户端断开连接时,我需要清理 asio::async_read() 设置的 connection_,我该怎么办确定服务器或连接对象仍然存在?...

有什么方法可以要求 boost::asio 框架阻止为已经被销毁的对象调用推迟的处理程序吗?或者如何通过错误代码识别(100%确定)对象已被销毁?或者我在 asio 范围内还有其他解决方案或设计模式 - 如何在一个运行线程中处理动态创建/销毁的服务器及其连接,而无需互斥体和其他东西......

最佳答案

首先检查您的 io_service 严格单线程运行。这从代码中看不出来。如果不是,则共享状态(如 connections_ )需要同步访问。

In fact you can have a logical strand in the form of the accept loop, but to take advantage of this you should make all accesses to connections_ happen there, see e.g.

更新

  • buff是一个局部变量,这会导致未定义的行为,因为它在 async_read 操作的整个时间内都无效。

  • 一般来说,shared_from_this 并没有什么意义。习语还保留一个共享指针的容器,它已经决定了生命周期。

    您的问题似乎是有时CAsioServer被简单地破坏了,这意味着 connections_ 的所有元素被释放,当时他们的 CAsioConnection物体可能会被破坏。它还会破坏CAsioServer .

    每当 Asio 对象被破坏时,任何挂起的异步操作都会失败,并显示 asio::error:operation_aborted ,这确实意味着你已经回应了。然而,当调用完成处理程序时,该对象已经变得无效。

    my comment我刚刚注意到缺少一个关键要素:你永远不会捕获/绑定(bind)共享指针到 CAsioConnection在任何完成处理程序中

    这非常不习惯。

    相反,您使用共享指针来控制生命周期。如果您还需要一个连接列表,则将其设为弱指针列表,以便它仅观察生命周期。

变化点:

  • 无需让服务器启用_shared_from_this

  • connections_应该保存弱指针甚至非拥有指针。弱指针在这里显然要安全得多。事实上,您可以选择删除该容器,因为似乎没有任何东西在使用它。在下面的示例中,我选择保留它,以便您可以看到它的实际效果。

  • 捕获shared_from_this在完成处理程序中,以确保对象在触发时仍然有效:

     asio::async_read(socket_, asio::buffer(buff,3),
         [this, self=shared_from_this()](error_code ec, std::size_t  /*length*/) {
    

简化

注意我选择了std::list因为它消除了对相等/排序的需要(参见 std::owner_less<> ),由于在 CAsioConnection 内存储对容器的引用的方式,这变得很难看。类 - 使其循环依赖(在实例化 CAsioConnection 类之前,owner_less<> 类型尚未完成)。我只是选择退出(不需要的?)复杂性。

<强> Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <list>
#include <memory>

namespace asio = boost::asio;
using error_code = boost::system::error_code; // compat

class CAsioConnection : public std::enable_shared_from_this<CAsioConnection> {
  public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {
        log(__FUNCTION__);
    }

    ~CAsioConnection() { log(__FUNCTION__); }

    void read() { do_read(); }

  private:
    void log(std::string_view msg) const {
        error_code ec;
        std::clog << msg << ", socket: " << socket_.remote_endpoint(ec) << "\n";
    }

    uint8_t buff[256];
    void do_read() {
        asio::async_read(socket_, asio::buffer(buff),
             [this, self = shared_from_this()](error_code ec, std::size_t length) {
                 if (!ec) {
                     log(__FUNCTION__ + (" length: " + std::to_string(length)));
                     do_read();
                 } else {
                     log(__FUNCTION__ + (" error: " + ec.message()));
                 }
             });
    }

    asio::ip::tcp::socket socket_;
};

class CAsioServer {
  public:
    CAsioServer(asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
            : acceptor_(io, endpoint) { log(__FUNCTION__); }

    ~CAsioServer() { log(__FUNCTION__); }
    int port() const { return acceptor_.local_endpoint().port(); }
    void accept() { do_accept(); }

  private:
    void do_accept() {
        acceptor_.async_accept([this](error_code ec,
                                      asio::ip::tcp::socket socket) {
            if (!ec) {
                auto c = std::make_shared<CAsioConnection>(std::move(socket));
                connections_.push_back(c);
                c->read();
            } else {
                log(__FUNCTION__ + (" error: " + ec.message()));
            }

            connections_.remove_if(std::mem_fn(&WeakPtr::expired));

            if (acceptor_.is_open())
                do_accept();
        });
    }

    void log(std::string_view msg) const {
        std::clog << msg << ", port: " << port() << "\n";
    }

    asio::ip::tcp::acceptor acceptor_;
    using WeakPtr = std::weak_ptr<CAsioConnection>;
    std::list<WeakPtr> connections_;
};

int main() {
    boost::asio::io_context io;

    CAsioServer server(io, { {}, 7878 });
    server.accept();

    io.run_for(std::chrono::seconds(10));
}

输出:

./a.out& sleep 1; nc -w 1 127.0.0.1 7878 < main.cpp
CAsioServer, port: 7878
CAsioConnection, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() error: End of file, socket: 127.0.0.1:50628
~CAsioConnection, socket: 127.0.0.1:50628
~CAsioServer, port: 7878

关于c++ - 如何消除动态销毁 boost::asio 实体时的崩溃?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63711285/

相关文章:

c++ - 异步写入套接字和用户值(boost::asio 问题)

c++ - ConnectNamedPipe 和 asio overlapped ptr

udp - 与 char 数组相比,在 boost ASIO 中使用可变缓冲区对象有什么优势?

c++ - 如果不为空,则为指针赋值

c++ - 跳过迭代器

c++ - 如何理解C++ primer 3rd中的句子

c++ - 作为参数给在 boost::thread_group 中调用的成员函数的指针为 null

windows - FindBoost.cmake : Use different Compiler-version (vc90 instead of vc100)

c++ - 为什么要使用 'extern "C+ +"' ?

python - 如何在C++中展平颜色直方图?