在线程中运行的 C++ UDP 服务器 io_context 在工作开始之前退出

标签 c++ udp boost-asio

我是 C++ 新手,但到目前为止,大多数 asio 的东西都有意义。然而,我正在努力让我的 UDPServer 正常工作。

我的问题可能类似于:Trying to write UDP server class, io_context doesn't block

我认为我的 UDPServer 在将工作分配给它的 io_context 之前就停止了。但是,我在调用 io_context.run() 之前向上下文发出工作,所以我不明白为什么。

当然,我并不完全确定我的上述说法是否正确,并且希望得到一些指导。这是我的类(class):

template<typename message_T>
    class UDPServer
    {
    public:
        UDPServer(uint16_t port)
            : m_socket(m_asioContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), port))
        {
            m_port = port;
        }

        virtual ~UDPServer()
        {
            Stop();
        }



    public:

        // Starts the server!
        bool Start()
        {
            try
            {
                // Issue a task to the asio context
                WaitForMessages();

                m_threadContext = std::thread([this]() { m_asioContext.run(); });
            }
            catch (std::exception& e)
            {
                // Something prohibited the server from listening
                std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << e.what() << "\n";
                return false;
            }

            std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
            return true;
        }

        // Stops the server!
        void Stop()
        {
            // Request the context to close
            m_asioContext.stop();

            // Tidy up the context thread
            if (m_threadContext.joinable()) m_threadContext.join();

            // Inform someone, anybody, if they care...
            std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
        }


        void WaitForMessages()
        {

            m_socket.async_receive_from(asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
                [this](std::error_code ec, std::size_t length)
                {
                    if (!ec)
                    {
                        
                        std::cout << "[SERVER @ PORT " << m_port << "] Got " << length << " bytes \n Data: " << vBuffer.data() << "\n" << "Address: " << m_endpoint.address() << " Port: " << m_endpoint.port() << "\n" << "Data: " << m_endpoint.data() << "\n";

                    }
                    else
                    {
                        std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << ec.message() << "\n";
                        return;
                    }

                    WaitForMessages();
                }
            );
        }

        void Send(message_T& msg, const asio::ip::udp::endpoint& ep)
        {
            asio::post(m_asioContext,
                [this, msg, ep]()
                {
                    // If the queue has a message in it, then we must
                    // assume that it is in the process of asynchronously being written.
                    
                    bool bWritingMessage = !m_messagesOut.empty();
                    m_messagesOut.push_back(msg);
                    if (!bWritingMessage)
                    {
                        WriteMessage(ep);
                    }
                }
            );
        }

    private:

        void WriteMessage(const asio::ip::udp::endpoint& ep)
        {

            m_socket.async_send_to(asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
                [this, ep](std::error_code ec, std::size_t length)
                {

                    if (!ec)
                    {

                        m_messagesOut.pop_front();

                        // If the queue is not empty, there are more messages to send, so
                        // make this happen by issuing the task to send the next header.
                        if (!m_messagesOut.empty())
                        {
                            WriteMessage(ep);
                        }
                        
                    }
                    else
                    {
                        std::cout << "[SERVER @ PORT " << m_port << "] Write Header Fail.\n";
                        m_socket.close();
                    }
                });
        }

        void ReadMessage()
        {
        }

    private:
        
        uint16_t m_port = 0;
        asio::ip::udp::endpoint m_endpoint;
        std::vector<char> vBuffer = std::vector<char>(21);


    protected:
        TSQueue<message_T> m_messagesIn;
        TSQueue<message_T> m_messagesOut;
        Message<message_T> m_tempMessageBuf;
        asio::io_context m_asioContext;
        std::thread m_threadContext;
        asio::ip::udp::socket m_socket;
    };
}

现在在主函数中调用代码:

enum class TestMsg {
    Ping,
    Join,
    Leave
};

int main() {
    Message<TestMsg> msg; // Message is a pretty basic struct that I'm not using yet. When I was, I was only receiving the first 4 bytes - which led me down this path of investigation
    msg.id = TestMsg::Join;
    msg << "hello";

    UDPServer<Message<TestMsg>> server(60000);
}

调用时,服务器在有机会打印“[SERVER] Started”之前立即退出

enter image description here

我将尝试按照链接帖子的描述添加工作防护,但我仍然想了解为什么 io_context 没有足够快地启动工作。

最佳答案

更新(现在我也阅读了问题而不仅仅是代码) 在WaitForMessages时您可以通过调用 m_socket.async_receive_from 开始收听函数,因为它是异步的,一旦设置监听,该函数就会返回/解除阻塞。因此,只要您实际上没有客户端向您发送某些内容,您的服务器就没有任何作用。仅当它收到某些内容时,才会通过调用 io_context::run 的线程来调用回调。 。所以你需要工作守卫,以便你的线程运行 run启动后不会立即解锁,但只要工作守卫在,就会阻塞。 通常,如果处理程序中抛出异常并且您仍想继续处理服务器,它还会与 try/while 模式结合使用。

此外,在您发布的代码中,您从未真正调用 UDPServer::Start !


这是我对答案的第一个想法:

This is normal behavior of ASIO. io_context::run一旦函数没有工作要做,它就会返回。

因此要更改 run 的行为函数来阻止你必须使用 boost::asio::executor_work_guard<boost::asio::io_context::executor_type>即所谓的工作 guard 。引用您的 io_context 来构造该对象并保留它,即只要您想让服务器运行就不要让它破坏,即不想让io_context::run没工作就回来。

既然如此

    boost::asio::io_context io_context_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;

然后你可以打电话

      work_guard_{boost::asio::make_work_guard(io_context_)},

const auto thread_count{std::max<unsigned>(std::thread::hardware_concurrency(), 1)};

std::generate_n(std::back_inserter(this->io_run_threads_),
                thread_count,
                [this]() {
                    return std::thread{io_run_loop,
                                       std::ref(this->io_context_), std::ref(this->error_handler_)};
                });

void io_run_loop(boost::asio::io_context &context,
                                    const std::function<void(std::exception &)> &error_handler) {
    while (true) {
        try {
            context.run();
            break;
        } catch (std::exception &e) {
            error_handler(e);
        }
    }

}

然后关闭服务器:

work_guard_.reset();
io_context_.stop();
std::for_each(this->io_run_threads_.begin(), this->io_run_threads_.end(), [](auto &thread) {
    if (thread.joinable()) thread.join();
});

为了更正常地关闭,您可以省略 stop调用并关闭所有套接字之前。

关于在线程中运行的 C++ UDP 服务器 io_context 在工作开始之前退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69457434/

相关文章:

c++ - 确定套接字中的可用字节数

c++ - Boost套接字读取功能不起作用

c++ - native-app 和 chrome-extension 之间的通信

c++ - 与 double 混合时使用 int 和 unsigned int 之间的速度差异

c++ - 访问嵌套类中变量的值

C# 套接字 : Why does socket. LocalEndPoint 更改?

c++ - GCC 内联 ASM 保证

c - 通过 UDP c 发送结构

tcp - YouTube 是否通过 TCP 流式传输视频?

c++ - 在前一个完成之前再次调用 boost ASIO async_receive()