c++ - ASIO - 如何停止简单的基于协程的服务器?

标签 c++ boost-asio boost-coroutine

我有以下简单的基于协程的服务器:

// Small TCP server whose accept loop and per-connection loop are written as
// Boost.Asio stackful coroutines (functions taking a yield_context).
class Server
{
public:
  // Binds the acceptor and the socket to this server's own io_service.
  Server()
    : Acceptor(Service)
    , Socket(Service)
  {
  }

  // Opens the acceptor, binds it to PortNum and starts listening.
  void Open(unsigned short PortNum);

  // Spawns the Accept coroutine and runs the io_service on the calling thread.
  void Run();

  // Calls stop() on the io_service.
  void Stop();

private:
  // Accept loop; executed as a coroutine.
  void Accept(boost::asio::yield_context Yield);

  // Read/reply loop for the connection held in Socket; executed as a coroutine.
  void Write(boost::asio::yield_context Yield);

  // Keep this order: Acceptor and Socket are constructed from Service, so
  // Service has to be declared (and therefore initialized) first.
  boost::asio::io_service        Service;
  boost::asio::ip::tcp::acceptor Acceptor;
  boost::asio::ip::tcp::socket   Socket;
};

// Accept loop, run as a coroutine.
//
// Waits for an incoming connection on Acceptor and spawns a Write coroutine
// to serve it. Returns as soon as async_accept reports an error, so a closed
// or cancelled acceptor ends the loop instead of spawning sessions on a
// socket that was never connected.
//
// Yield: yield context of this coroutine; async operations suspend on it.
void Server::Accept(boost::asio::yield_context Yield)
{
boost::system::error_code ec;

for (;;)
  {
  // Socket is the single member socket shared with Write, so it has to be
  // closed before it can take the next connection.
  // NOTE(review): this also aborts any session that is still using Socket.
  Socket.close();
  Acceptor.async_accept(Socket,Yield[ec]);
  if (ec)
    return;
  // The child coroutine must receive its OWN yield context (placeholder _1,
  // as in Run). Binding this coroutine's Yield[ec] into the child would make
  // the child suspend and resume on the parent's context.
  spawn(Yield,std::bind(&Server::Write,this,std::placeholders::_1));
  }
}

// Per-connection loop, run as a coroutine.
//
// Each time some bytes arrive on Socket, replies with the raw bytes of a
// double (sizeof(double) bytes, host byte order). The loop ends on the first
// read or write error, e.g. when the peer disconnects or Socket is closed.
//
// Yield: yield context of this coroutine; async operations suspend on it.
void Server::Write(boost::asio::yield_context Yield)
{
char InBuffer[1024]= {};
boost::system::error_code ec;
const double Data= 6.66;

for (;;)
  {
  // The request content is ignored; any received data triggers a reply.
  Socket.async_read_some(boost::asio::buffer(InBuffer),Yield[ec]);
  if (ec)
    break;
  // async_write keeps writing until the whole buffer is sent (or an error
  // occurs); async_write_some is allowed to send only part of it.
  boost::asio::async_write(Socket,boost::asio::buffer(&Data,sizeof(Data)),Yield[ec]);
  if (ec)
    break;
  }
}

void Server::Open(unsigned short PortNum)
{
Acceptor.open(boost::asio::ip::tcp::v4());
Acceptor.bind({{},PortNum});
Acceptor.listen();
}

void Server::Run()
{
spawn(Service,std::bind(&Server::Accept,this,std::placeholders::_1));
Service.run();
}

void Server::Stop()
{
Service.stop();
}

我想在一个线程上运行这个服务器,并在主程序即将完成时干净地停止它:

int main()
{
Server s;

s.Open(1024);

std::thread Thread(&Server::Run,&s);

Sleep(10'000);
s.Stop();
Thread.join();
}

不幸的是,如果有连接的套接字,当我调用 Stop 时会抛出异常 boost::coroutines::detail::forced_unwind

我也曾尝试创建一个显式的 strand 并在停止之前调度一个 Socket.close(),结果相同。

这种做法有什么问题吗?

最佳答案

I’m having trouble trying to stop gracefully a similar server ( stackoverflow.com/questions/50833730/…). – metalfox 4 hours ago

下面是一个改动最小的示例,展示如何处理:

  • 关闭 session 的 Exit 命令
  • 关闭服务器的 Shutdown 命令(因此它停止接受连接并在最后一个 session 退出后终止)

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;
using boost::system::error_code;
using boost::asio::streambuf;

int main() {
    boost::asio::io_service svc;

    tcp::acceptor a(svc);
    a.open(tcp::v4());
    a.set_option(tcp::acceptor::reuse_address(true));
    a.bind({{}, 6767}); // bind to port 6767 on localhost
    a.listen(5);

    using session = std::shared_ptr<tcp::socket>;

    std::function<void()>        do_accept;
    std::function<void(session)> do_session;

    do_session = [&](session s) {
        // do a read
        auto buf = std::make_shared<boost::asio::streambuf>();
        async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
            if (ec)
                std::cerr << "read failed: " << ec.message() << "\n";
            else {
                std::istream is(buf.get());
                std::string line;
                while (getline(is, line)) // FIXME being sloppy with partially read lines
                {
                    async_write(*s, boost::asio::buffer("Ack\n", 4), [&,s,buf](error_code ec, size_t) {
                        if (ec) std::cerr << "write failed: " << ec.message() << "\n";
                    });
                    if (line == "Exit") {
                        std::cout << "Exit received\n";
                        return;
                    }
                    if (line == "Shutdown") {
                        std::cout << "Server shutdown requested\n";
                        a.close();
                        return;
                    }
                }

                do_session(s); // full duplex, can read while writing, using a second buffer
            }

        });
    };

    do_accept = [&] {
        auto s = std::make_shared<session::element_type>(svc);

        a.async_accept(*s, [&,s](error_code ec) {
            if (ec)
                std::cerr << "accept failed: " << ec.message() << "\n";
            else {
                do_session(s);
                do_accept(); // accept the next
            }
        });
    };

    do_accept(); // kick-off
    svc.run();   // wait for shutdown (Ctrl-C or failure)
}

请看下面的示例 session:

echo -en "hello world\nExit\n"     | netcat 127.0.0.1 6767
echo -en "hello world\nShutdown\n" | netcat 127.0.0.1 6767

打印

Ack
Ack
Ack
Ack
Exit received
Server shutdown requested
accept failed: Operation canceled

终止命令

如果您想要一个“终止”命令,在关闭服务器的同时主动关闭所有打开的 session,您必须:

  • 保留 session 列表
  • 或使用信号

您可以在此处查看这两种方法的代码:Boost ASIO: Send message to all connected clients

与当前示例集成的最简单方法:

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <list>

using boost::asio::ip::tcp;
using boost::system::error_code;
using boost::asio::streambuf;

int main() {
    boost::asio::io_service svc;

    tcp::acceptor a(svc);
    a.open(tcp::v4());
    a.set_option(tcp::acceptor::reuse_address(true));
    a.bind({{}, 6767}); // bind to port 6767 on localhost
    a.listen(5);

    using session = std::shared_ptr<tcp::socket>;
    using sessref = std::weak_ptr<tcp::socket>;

    std::function<void()>        do_accept;
    std::function<void(session)> do_session;
    std::list<sessref> session_list;

    auto garbage_collect_sessions = [&session_list] {
        session_list.remove_if(std::mem_fn(&sessref::expired));
    };

    do_session = [&](session s) {
        // do a read
        auto buf = std::make_shared<boost::asio::streambuf>();
        async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
            if (ec)
                std::cerr << "read failed: " << ec.message() << "\n";
            else {
                std::istream is(buf.get());
                std::string line;
                while (getline(is, line)) // FIXME being sloppy with partially read lines
                {
                    async_write(*s, boost::asio::buffer("Ack\n", 4), [&,s,buf](error_code ec, size_t) {
                        if (ec) std::cerr << "write failed: " << ec.message() << "\n";
                    });
                    if (line == "Exit") {
                        std::cout << "Exit received\n";
                        return;
                    }
                    if (line == "Shutdown") {
                        std::cout << "Server shutdown requested\n";
                        a.close();
                        return;
                    }
                    if (line == "Terminate") {
                        std::cout << "Server termination requested\n";
                        a.close();
                        for (auto wp : session_list) {
                            if (auto session = wp.lock())
                                session->close();
                        }
                        return;
                    }
                }

                do_session(s); // full duplex, can read while writing, using a second buffer
            }

        });
    };

    do_accept = [&] {
        auto s = std::make_shared<session::element_type>(svc);

        a.async_accept(*s, [&,s](error_code ec) {
            if (ec)
                std::cerr << "accept failed: " << ec.message() << "\n";
            else {
                garbage_collect_sessions();

                session_list.push_back(s);
                do_session(s);
                do_accept(); // accept the next
            }
        });
    };

    do_accept(); // kick-off
    svc.run();   // wait for shutdown (Ctrl-C or failure)
}

显然使用 session_list 来实现 “Terminate” 命令:

// "Terminate": stop accepting and actively close every session still open.
if (line == "Terminate") {
    std::cout << "Server termination requested\n";
    a.close(); // no new connections
    // Iterate by reference (no weak_ptr copy per element) and use a name that
    // does not shadow the `session` alias.
    for (const auto& wp : session_list) {
        if (auto peer = wp.lock()) // skip sessions that are already gone
            peer->close();
    }
    return;
}

关于c++ - ASIO - 如何停止简单的基于协程的服务器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50833730/

相关文章:

c++ - 如何在控制台窗口中找到光标的坐标?

c++ - Boost 的 UDP 异步客户端接收自己的数据报

c++ - Boost协程断言失败

c++ - boost 协程 2 的意外输出

c++ - 如何在大型项目中使用 -fsplit-stack

c++ - Botan C++哈希函数generate_bcrypt()

c++ - 将可修改的参数传递给 C++ 函数

c++ - 对 `uuid_generate@UUID_1.0' 的 undefined reference

c++ - 为什么在使用 boost::asio 为 STDIN/STDOUT 管道传输到程序时,read() 会因 EAGAIN 失败?

c++ - boost 中的协程局部变量