python - 保持 TCP 套接字连接事件和读/写协调

标签 python c++ sockets networking

我的设置和环境:

  • Win10
  • TCP 客户端:带有 asio 库的 C++(无提升),在后台线程中运行。
  • TCP 服务器:带有 SocketServer 模块的 Python3,在后台线程中运行。
  • 目前所有人都使用阻塞 I/O

要求:

  • 客户端偶尔向服务器发送有关用户交互的字符串命令。
  • 服务器接收命令并执行操作。

问题:

  • 客户端可能会在 read() 处挂起。
  • 在客户端挂起后,服务器可能会在 recv() 处挂起。

“净”结果是服务器总是得到我最初的“Hello”握手,但它的响应字符串 ACK 从未被客户端接收到。看起来客户端和服务器之间需要某种读/写协调。

来自类似 https://stackoverflow.com/a/1480246/987846 的问题和 Does the TCPServer + BaseRequestHandler in Python's SocketServer close the socket after each call to handle()?了解到可能涉及两个问题

  • Python TCP 服务器总是在接收时关闭连接,这样就不会连续调用处理程序。
  • TCP 连接在因超时而关闭之前需要一个保持事件机制。

我想知道如何解决这个问题以及什么是满足我的要求的最佳策略

  • 在服务器和客户端之间设计一个周期性的“完整性检查”乒乓数据传输。
  • 求助于我不熟悉的非阻塞 I/O。

或者这只是我的一个错误?

服务器代码:

import socketserver
import sys
import threading

_dostuff = True

class CmdHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:                        
            data = self.request.recv(1024)
            s = data.decode('utf-8')
            if s == 'Hello':
                print('HANDSHAKE: ACK', flush=True)
                self.request.send('ACK\x00'.encode())
            if s == 'Stop':            
                print('Cmd: Mute', flush=True)
                with threading.Lock():
                    _dostuff = False
            if s == 'Start':
                with threading.Lock():
                    _dostuff = True
        return

if __name__ == '__main__':
    import socket    
    import time

    # Command server
    address = ('localhost', 1234)  # let the kernel assign a port
    cmd_server = socketserver.TCPServer(address, CmdHandler)
    cmd_ip, cmd_port = cmd_server.server_address  # what port was assigned?

    t1 = threading.Thread(target=cmd_server.serve_forever)
    t1.setDaemon(True)  # don't hang on exit
    t1.start()

    while True:            
        time.sleep(1)

客户端代码(部分)

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 128> buf;
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    asio::write(socket, asio::buffer(handshake.c_str(), handshake.length()));
                    if (error == asio::error::eof)
                        continue; // Connection closed cleanly by peer; keep trying.
                    else if (error)
                        throw asio::system_error(error); // Some other error.

                    // ***PROBLEM: THIS MAY BLOCK FOREVER***
                    size_t len = asio::read(socket, asio::buffer(buf), error);
                    // ***PROBLEM END***


                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }                   
                    std::string received = std::string(buf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);
            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            debug("CmdClient {}: Send command: {}", m_id.c_str(), cmd.c_str());
            size_t len = asio::write(socket, asio::buffer(cmd.c_str(), cmd.length()));
            debug("CmdClient {}: {} bytes written.", m_id.c_str(), len);
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

如果我删除服务器端的 while 循环,那么它看起来像

class CmdHandler(socketserver.StreamRequestHandler):
    # timeout = 5
    def handle(self):
        data = self.request.recv(1024)
        s = data.decode('utf-8')
        if s == 'Hello':
            self.request.send('ACK\x00'.encode())
        if s == 'Stop':            
            with threading.Lock():
                _dostuff = False
        if s == 'Start':
            with threading.Lock():
                _dostuff = True
        return

然后

  • 客户端收到服务器的ACK
  • 但是Client发送的后续消息不会被Server接收。

最佳答案

所以我终于解决了这个问题。非常感谢@bigdataolddriver 的离线帮助。我学到了很多关于 ncat 调试的知识。

我基本上

  • 在服务器端:放弃了使用 Python 的 socketserver 模块的想法。首先,我发现 it's synchronous only .
  • 在客户端:使用 asio::ip::tcp::socket::read_some/asio::ip::tcp::socket::write_some asio::read/asio::write 的。

这是仅基于 socket 模块的新服务器代码。

import socket
import sys
import threading

_dostuff = True

def run_cmd_server():
    global _dostuff
    # Create a TCP/IP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the port
    server_address = ('localhost', 1234)
    print('CmdServer: starting up on {} port {}'.format(*server_address))
    sock.bind(server_address)

    # Listen for incoming connections
    sock.listen(1)

    while True:
        # Wait for a connection
        print('CmdServer: waiting for a connection')
        connection, client_address = sock.accept()
        try:
            print('CmdServer: connection from client:', client_address)

            # Receive the data in small chunks and retransmit it
            while True:
                data = connection.recv(1024)
                print('received {!r}'.format(data))
                if not data:
                    print('no data from', client_address)
                    break
                cmd = data.decode('utf-8').strip('\x00')
                if cmd == 'Hello':
                    print('Cmd: {}'.format(cmd))
                    connection.sendall('ACK\x00'.encode('utf-8'))
                elif cmd == 'Stop':
                    print('Cmd: {}'.format(cmd))                    
                    _dostuff = False
                    print('_dostuff : {}'.format(_dostuff ))
                elif cmd == 'Start':
                    _dostuff = True
                    print('_dostuff : {}'.format(_dostuff ))
                else:
                    print('Misc: {}'.format(cmd))
                connection.sendall('ack\x00'.encode('utf-8'))
        except:
            continue;
        finally:
            # Clean up the connection
            connection.close()


def main():
    t1 = threading.Thread(target=run_cmd_server, name='t_cmd', daemon=True)
    t1.start()
    t1.join()


if __name__ == '__main__':
    main()

这是新的客户端代码:

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 1024> readBuf{'\0'};
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    size_t len = socket.write_some(asio::buffer(handshake.c_str(), handshake.length()), error);
                    if (error == asio::error::eof) {
                        asio::connect(socket, endpoint_iterator);
                        continue; // Connection closed cleanly by peer; keep trying.
                    }
                    else if (error)
                        throw asio::system_error(error); // Some other error.
                    len = socket.read_some(asio::buffer(readBuf), error);
                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }
                    std::string received = std::string(readBuf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);

            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Hello:
            cmd = "Hello";
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            size_t len = socket.write_some(asio::buffer(cmd.c_str(), cmd.length()));
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

我还没有使用 ASIO 的异步功能(非常害怕在这次调试 session 后立即这样做)。但现在至少这段代码按我的预期工作:服务器可以正常接收来自客户端的命令。

附带说明一下,由于只有一个线程写入全局变量 _dostuff,因此我删除了线程锁定。

如果有人知道我最初的实现到底哪里出了问题,我将不胜感激。

关于python - 保持 TCP 套接字连接事件和读/写协调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57882042/

相关文章:

python - 如何将 Array 类型的参数指定到 Django 命令中?

c++ - 我可以将 Boost.Geometry.index.rtree 与线程一起使用吗?

c++ - 在具有变体数据结构的 C++ 中使用 msgpack

java - Controller 和远程工作机器之间的连接

c++ system()挂起使用netcat连接到不同线程中的套接字

python - 没有线条和误差线的 Pandas 线图(来自带有剪切的 groupby)

Python - 一个简单的套接字脚本出错

c++ - OpenGL 3.3 - 2 个三角形的不同旋转

java - socket.connect(endpoint) 和 new Socket(ip,port) 有什么区别

python - 从 MEDIA_ROOT 下载不工作