我发现,除了简单的案例之外,NIO 的记录充其量也很少。即便如此,我已经完成了教程和几次重构,并最终回到了最简单的情况,我仍然偶尔会遇到 isReadable 因 0 字节 SocketChannel 读取而触发。并不是每次执行都会发生这种情况。
我曾经在一个单独的线程中调用从附加对象的读取,并认为这可能是竞争条件,但我已经让读取发生在选择器的线程中,但问题仍然存在。我想这可能是我的测试客户端,但我不确定什么会不一致地触发它,因为客户端套接字在收到服务器的响应之前不应关闭。
因此,在包含的代码中,此代码段发送的“hello”消息每次都能顺利通过,正如我所期望的那样
out.write("hello".getBytes()); out.write(EOT); out.flush();
在此之后,我偶尔会得到一个 0 长度的套接字 channel 。有时可以从此片段中得到正确的响应:
out.write(dataServerCredentials.getBytes()); out.write(EOT); out.flush();
对此有任何见解将不胜感激,它正在慢慢地杀死我。我已经尝试在这里寻找答案,但似乎相关的一个问题并没有真正阐明我的问题。
提前致谢!
代码片段如下:
选择方法:
public void execute()
{
initializeServerSocket();
for (;;)
{
try
{
System.out.println("Waiting for socket activity");
selector.select();
Iterator<SelectionKey> selectedKeys =
this.selector.selectedKeys().iterator();
while(selectedKeys.hasNext())
{
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid())
{
continue;
}
if (key.isAcceptable())
{ // New connection
// TODO: Create helper method for this that configures user info?
System.out.println("Accepting connection");
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel)key.channel();
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.socket().setSoTimeout(0);
socketChannel.configureBlocking(false);
SelectionKey newKey =
socketChannel.register(selector, SelectionKey.OP_READ);
// Create and attach an AuthProcessor to drive the states of this
// new Authentication request
newKey.attach(new AuthenticationRequestProcessor(newKey));
}
else if (key.isReadable())
{ // Socket has incoming communication
AuthenticationRequestProcessor authProcessor =
(AuthenticationRequestProcessor)key.attachment();
if (authProcessor == null)
{ // Cancel Key
key.channel().close();
key.cancel();
System.err.print("Cancelling Key - No Attachment");
}
else
{
if (authProcessor.getState() ==
AuthenticationRequestProcessor.TERMINATE_STATE)
{ // Cancel Key
key.channel().close();
key.cancel();
}
else
{ // Process new socket data
authProcessor.process(readStringFromKey(key));
}
}
}
}
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
读取方法(忽略这里的一些愚蠢之处,这是从另一个线程中拉出来的)
protected String readStringFromKey(SelectionKey key)
{
SocketChannel socketChannel = (SocketChannel)key.channel();
readBuffer.clear();
String message = null;
try
{
final int bytesRead = socketChannel.read(readBuffer);
if (-1 == bytesRead)
{ // Empty/Closed Channel
System.err.println("Error - No bytes to read on selected channel");
}
else
{ // Convert ByteBuffer into a String
System.out.println("Bytes Read: " + bytesRead);
readBuffer.flip();
message = byteBufferToString(readBuffer, bytesRead);
readBuffer.clear();
}
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
// Trim EOT off the end of the message
return message.trim();
}
客户端片段:
public void connect()
{
boolean connectionStatus = false;
String connectionHost = null;
int connectionPort = 0;
String connectionAuthKey = null;
try
{ // Login
authenticationSocket = new Socket(AUTH_HOST, AUTH_PORT);
out = authenticationSocket.getOutputStream();
in = new BufferedInputStream(authenticationSocket.getInputStream());
out.write("hello".getBytes());
out.write(EOT);
out.flush();
StringBuilder helloResponse = new StringBuilder();
// Read response off socket
int currentByte = in.read();
while (currentByte > -1 && currentByte != EOT)
{
helloResponse.append((char)currentByte);
currentByte = in.read();
}
outgoingResponses.offer(Plist.fromXml(helloResponse.toString()));
System.out.println("\n" + helloResponse.toString());
out.write(credentials.getBytes());
out.write(EOT);
out.flush();
// Read status
int byteRead;
StringBuilder command = new StringBuilder();
do
{
byteRead = in.read();
if (0 < byteRead)
{
if (EOT == byteRead)
{
Logger.logData(command.toString());
Map<String, Object> plist = Plist.fromXml(command.toString());
outgoingResponses.offer(plist);
// Connection info for Data Port
connectionStatus = (Boolean)plist.get(STATUS_KEY);
connectionHost = (String)plist.get(SERVER_KEY);
connectionPort = (Integer)plist.get(PORT_KEY);
connectionAuthKey = (String)plist.get(AUTH_KEY);
Logger.logData("Server =>" + plist.get("server"));
command = new StringBuilder();
}
else
{
command.append((char)byteRead);
}
}
}
while (EOT != byteRead);
}
catch (UnknownHostException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (XmlParseException e)
{
Logger.logData("Invalid Plist format");
e.printStackTrace();
}
finally
{ // Clean up handles
try
{
authenticationSocket.close();
out.close();
in.close();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Connection status =>" + connectionStatus);
System.out.println("Connection host =>" + connectionHost);
System.out.println("Connection port =>" + connectionPort);
if (connectionStatus)
{
dataServerHost = connectionHost;
dataServerPort = connectionPort;
dataServerAuthKey = connectionAuthKey;
System.out.println("Connecting to data server @: " + dataServerHost + ":" + dataServerPort);
connectToDataServer();
}
}
最佳答案
我记得虚假选择器唤醒是可能的。
虽然很有趣的是,当您被告知有东西可读时却没有任何东西可读,但这对于程序来说通常不是问题。程序在读取 TCP 流时通常应该期望任意数量的字节;而0字节的情况通常不需要特殊处理。
你的程序理论上是错误的。您不能指望可以立即阅读整条消息。一次读取可能只返回其中的一部分。可能只有 1 个字节。没有任何保证。
“正确”的方法是累积缓冲区中读取的所有字节。在缓冲区中查找 EOT。如果消息是碎片化的,则在整个消息到达之前可能需要多次读取。
loop
select();
if readable
bytes = read()
buffer.append(bytes)
while( buffer has EOT at position i)
msg = buffer[0-i]
left shift buffer by i
在此流程中您可以看到,read() 是否读取 0 字节并不重要。这确实与蔚来无关。即使在传统的阻塞 TCP IO 中,也必须做到这一策略理论上是正确的。
但是,实际上,如果您确实观察到整个消息始终是一个整体,那么您就不需要这么复杂;您的原始代码在您的环境中实际上是正确的。
现在您观察到有时会读取 0 字节。那么你之前的实际假设就必须修改。您可以添加一些特殊分支来忽略 0 字节 block 。
关于Java NIO 问题/对 isReadable 工作方式的误解,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6273146/