我的设置和环境:
- Win10
- TCP 客户端:带有
asio
库的 C++(无提升),在后台线程中运行。 - TCP 服务器:带有
SocketServer
模块的 Python3,在后台线程中运行。 - 目前所有人都使用阻塞 I/O
要求:
- 客户端偶尔向服务器发送有关用户交互的字符串命令。
- 服务器接收命令并执行操作。
问题:
- 客户端可能会在
read()
处挂起。 - 在客户端挂起后,服务器可能会在
recv()
处挂起。
“净”结果是服务器总是得到我最初的“Hello”握手,但它的响应字符串 ACK
从未被客户端接收到。看起来客户端和服务器之间需要某种读/写协调。
来自类似 https://stackoverflow.com/a/1480246/987846 的问题和 Does the TCPServer + BaseRequestHandler in Python's SocketServer close the socket after each call to handle()?了解到可能涉及两个问题
- Python TCP 服务器总是在接收时关闭连接,这样就不会连续调用处理程序。
- TCP 连接在因超时而关闭之前需要一个保持事件机制。
我想知道如何解决这个问题以及什么是满足我的要求的最佳策略
- 在服务器和客户端之间设计一个周期性的“完整性检查”乒乓数据传输。
- 求助于我不熟悉的非阻塞 I/O。
或者这只是我的一个错误?
服务器代码:
import socketserver
import sys
import threading
_dostuff = True
class CmdHandler(socketserver.StreamRequestHandler):
def handle(self):
while True:
data = self.request.recv(1024)
s = data.decode('utf-8')
if s == 'Hello':
print('HANDSHAKE: ACK', flush=True)
self.request.send('ACK\x00'.encode())
if s == 'Stop':
print('Cmd: Mute', flush=True)
with threading.Lock():
_dostuff = False
if s == 'Start':
with threading.Lock():
_dostuff = True
return
if __name__ == '__main__':
import socket
import time
# Command server
address = ('localhost', 1234) # let the kernel assign a port
cmd_server = socketserver.TCPServer(address, CmdHandler)
cmd_ip, cmd_port = cmd_server.server_address # what port was assigned?
t1 = threading.Thread(target=cmd_server.serve_forever)
t1.setDaemon(True) # don't hang on exit
t1.start()
while True:
time.sleep(1)
客户端代码(部分)
virtual bool Connect() override {
bool isInitialized = false;
try {
asio::io_context io_context;
asio::ip::tcp::resolver resolver(io_context);
asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
asio::ip::tcp::socket socket(io_context);
asio::connect(socket, endpoint_iterator);
while (true) {
std::array<char, 128> buf;
asio::error_code error;
// Handshaking
// - on connection, say hello to cmd-server; wait for ACK
if ( ! isInitialized ) {
debug("CmdClient {}: handshaking ...", m_id.c_str());
std::string handshake("Hello");
asio::write(socket, asio::buffer(handshake.c_str(), handshake.length()));
if (error == asio::error::eof)
continue; // Connection closed cleanly by peer; keep trying.
else if (error)
throw asio::system_error(error); // Some other error.
// ***PROBLEM: THIS MAY BLOCK FOREVER***
size_t len = asio::read(socket, asio::buffer(buf), error);
// ***PROBLEM END***
if (len <= 0) {
debug("CmdClient {}: No response", m_id.c_str());
}
std::string received = std::string(buf.data());
if (received == std::string("ACK")) {
debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
isInitialized = true;
Notify("ACK");
}
else {
debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
}
continue;
}
SendCommand(socket);
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
isInitialized = false;
}
return true;
}
void SendCommand(asio::ip::tcp::socket& socket) {
std::string cmd("");
switch (m_cmd) {
case NoOp:
break;
case Stop:
cmd = "Stop";
break;
case Start:
cmd = "Start";
break;
default:
break;
}
if (cmd.size() > 0) {
debug("CmdClient {}: Send command: {}", m_id.c_str(), cmd.c_str());
size_t len = asio::write(socket, asio::buffer(cmd.c_str(), cmd.length()));
debug("CmdClient {}: {} bytes written.", m_id.c_str(), len);
m_cmd = NoOp; // Avoid resend in next frame;
}
}
如果我删除服务器端的 while 循环,那么它看起来像
class CmdHandler(socketserver.StreamRequestHandler):
# timeout = 5
def handle(self):
data = self.request.recv(1024)
s = data.decode('utf-8')
if s == 'Hello':
self.request.send('ACK\x00'.encode())
if s == 'Stop':
with threading.Lock():
_dostuff = False
if s == 'Start':
with threading.Lock():
_dostuff = True
return
然后
- 客户端收到服务器的
ACK
。 - 但是Client发送的后续消息不会被Server接收。
最佳答案
所以我终于解决了这个问题。非常感谢@bigdataolddriver 的离线帮助。我学到了很多关于 ncat
调试的知识。
我基本上
- 在服务器端:放弃了使用 Python 的
socketserver
模块的想法。首先,我发现 it's synchronous only . - 在客户端:使用
asio::ip::tcp::socket::read_some
/asio::ip::tcp::socket::write_some
asio::read
/asio::write
的。
这是仅基于 socket
模块的新服务器代码。
import socket
import sys
import threading
_dostuff = True
def run_cmd_server():
global _dostuff
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to the port
server_address = ('localhost', 1234)
print('CmdServer: starting up on {} port {}'.format(*server_address))
sock.bind(server_address)
# Listen for incoming connections
sock.listen(1)
while True:
# Wait for a connection
print('CmdServer: waiting for a connection')
connection, client_address = sock.accept()
try:
print('CmdServer: connection from client:', client_address)
# Receive the data in small chunks and retransmit it
while True:
data = connection.recv(1024)
print('received {!r}'.format(data))
if not data:
print('no data from', client_address)
break
cmd = data.decode('utf-8').strip('\x00')
if cmd == 'Hello':
print('Cmd: {}'.format(cmd))
connection.sendall('ACK\x00'.encode('utf-8'))
elif cmd == 'Stop':
print('Cmd: {}'.format(cmd))
_dostuff = False
print('_dostuff : {}'.format(_dostuff ))
elif cmd == 'Start':
_dostuff = True
print('_dostuff : {}'.format(_dostuff ))
else:
print('Misc: {}'.format(cmd))
connection.sendall('ack\x00'.encode('utf-8'))
except:
continue;
finally:
# Clean up the connection
connection.close()
def main():
t1 = threading.Thread(target=run_cmd_server, name='t_cmd', daemon=True)
t1.start()
t1.join()
if __name__ == '__main__':
main()
这是新的客户端代码:
virtual bool Connect() override {
bool isInitialized = false;
try {
asio::io_context io_context;
asio::ip::tcp::resolver resolver(io_context);
asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
asio::ip::tcp::socket socket(io_context);
asio::connect(socket, endpoint_iterator);
while (true) {
std::array<char, 1024> readBuf{'\0'};
asio::error_code error;
// Handshaking
// - on connection, say hello to cmd-server; wait for ACK
if ( ! isInitialized ) {
debug("CmdClient {}: handshaking ...", m_id.c_str());
std::string handshake("Hello");
size_t len = socket.write_some(asio::buffer(handshake.c_str(), handshake.length()), error);
if (error == asio::error::eof) {
asio::connect(socket, endpoint_iterator);
continue; // Connection closed cleanly by peer; keep trying.
}
else if (error)
throw asio::system_error(error); // Some other error.
len = socket.read_some(asio::buffer(readBuf), error);
if (len <= 0) {
debug("CmdClient {}: No response", m_id.c_str());
}
std::string received = std::string(readBuf.data());
if (received == std::string("ACK")) {
debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
isInitialized = true;
Notify("ACK");
}
else {
debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
}
continue;
}
SendCommand(socket);
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
isInitialized = false;
}
return true;
}
void SendCommand(asio::ip::tcp::socket& socket) {
std::string cmd("");
switch (m_cmd) {
case NoOp:
break;
case Hello:
cmd = "Hello";
break;
case Stop:
cmd = "Stop";
break;
case Start:
cmd = "Start";
break;
default:
break;
}
if (cmd.size() > 0) {
size_t len = socket.write_some(asio::buffer(cmd.c_str(), cmd.length()));
m_cmd = NoOp; // Avoid resend in next frame;
}
}
我还没有使用 ASIO 的异步功能(非常害怕在这次调试 session 后立即这样做)。但现在至少这段代码按我的预期工作:服务器可以正常接收来自客户端的命令。
附带说明一下,由于只有一个线程写入全局变量 _dostuff
,因此我删除了线程锁定。
如果有人知道我最初的实现到底哪里出了问题,我将不胜感激。
关于python - 保持 TCP 套接字连接事件和读/写协调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57882042/