Java NIO 问题/对 isReadable 工作方式的误解

标签 java sockets nio socketchannel

我发现,除了简单的案例之外,NIO 的记录充其量也很少。即便如此,我已经完成了教程和几次重构,并最终回到了最简单的情况,我仍然偶尔会遇到 isReadable 因 0 字节 SocketChannel 读取而触发。并不是每次执行都会发生这种情况。

我曾经在一个单独的线程中调用从附加对象的读取,并认为这可能是竞争条件,但我已经让读取发生在选择器的线程中,但问题仍然存在。我想这可能是我的测试客户端,但我不确定什么会不一致地触发它,因为客户端套接字在收到服务器的响应之前不应关闭。

因此,在包含的代码中,此代码段发送的“hello”消息每次都能顺利通过,正如我所期望的那样

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();

在此之后,我偶尔会得到一个 0 长度的套接字 channel 。有时可以从此片段中得到正确的响应:

        out.write(dataServerCredentials.getBytes());
        out.write(EOT);
        out.flush();

对此有任何见解将不胜感激,它正在慢慢地杀死我。我已经尝试在这里寻找答案,但似乎相关的一个问题并没有真正阐明我的问题。

提前致谢!

代码片段如下:

选择方法:

public void execute()
{
    initializeServerSocket();

    for (;;)
    {
        try
        {
            System.out.println("Waiting for socket activity");

            selector.select();

            Iterator<SelectionKey> selectedKeys = 
                this.selector.selectedKeys().iterator();
            while(selectedKeys.hasNext())
            {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) 
                {
                    continue;
                }

                if (key.isAcceptable())
                {   // New connection
                    // TODO: Create helper method for this that configures user info?
                    System.out.println("Accepting connection");

                    ServerSocketChannel serverSocketChannel =
                        (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel =
                        serverSocketChannel.accept();

                    socketChannel.socket().setSoTimeout(0);
                    socketChannel.configureBlocking(false);
                    SelectionKey newKey = 
                        socketChannel.register(selector, SelectionKey.OP_READ);

                    // Create and attach an AuthProcessor to drive the states of this
                    // new Authentication request
                    newKey.attach(new AuthenticationRequestProcessor(newKey));

                }
                else if (key.isReadable())
                {   // Socket has incoming communication
                    AuthenticationRequestProcessor authProcessor =
                        (AuthenticationRequestProcessor)key.attachment();

                    if (authProcessor == null)
                    {   // Cancel Key
                        key.channel().close();
                        key.cancel();
                        System.err.print("Cancelling Key - No Attachment");
                    }
                    else
                    {   
                        if (authProcessor.getState() ==
                            AuthenticationRequestProcessor.TERMINATE_STATE)
                        {   // Cancel Key
                            key.channel().close();
                            key.cancel();
                        }
                        else
                        {   // Process new socket data
                            authProcessor.process(readStringFromKey(key));
                        }
                    }
                }                    
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

读取方法(忽略这里的一些愚蠢之处,这是从另一个线程中拉出来的)

protected String readStringFromKey(SelectionKey key)
{
    SocketChannel socketChannel = (SocketChannel)key.channel();

    readBuffer.clear();

    String message = null;

    try
    {
        final int bytesRead = socketChannel.read(readBuffer);

        if (-1 == bytesRead)
        {   // Empty/Closed Channel
            System.err.println("Error - No bytes to read on selected channel");
        }
        else
        {   // Convert ByteBuffer into a String
            System.out.println("Bytes Read: " + bytesRead);
            readBuffer.flip();
            message = byteBufferToString(readBuffer, bytesRead);
            readBuffer.clear();
        }
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block

        e.printStackTrace();
    }

    // Trim EOT off the end of the message
    return message.trim();
}

客户端片段:

    public void connect()
{
    boolean connectionStatus = false;
    String connectionHost = null;
    int connectionPort = 0;
    String connectionAuthKey = null;

    try
    {   // Login
        authenticationSocket = new Socket(AUTH_HOST, AUTH_PORT);
        out = authenticationSocket.getOutputStream();
        in = new BufferedInputStream(authenticationSocket.getInputStream());

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();


        StringBuilder helloResponse = new StringBuilder();

        // Read response off socket
        int currentByte = in.read();

        while (currentByte > -1 && currentByte != EOT)
        {
            helloResponse.append((char)currentByte);
            currentByte = in.read();
        }

        outgoingResponses.offer(Plist.fromXml(helloResponse.toString()));
        System.out.println("\n" + helloResponse.toString());

        out.write(credentials.getBytes());
        out.write(EOT);
        out.flush();

        // Read status
        int byteRead;

        StringBuilder command = new StringBuilder();

        do 
        {
            byteRead = in.read();
            if (0 < byteRead) 
            {
                if (EOT == byteRead)
                {
                    Logger.logData(command.toString());

                    Map<String, Object> plist = Plist.fromXml(command.toString());
                    outgoingResponses.offer(plist);

                    // Connection info for Data Port
                    connectionStatus = (Boolean)plist.get(STATUS_KEY);
                    connectionHost = (String)plist.get(SERVER_KEY);
                    connectionPort = (Integer)plist.get(PORT_KEY);
                    connectionAuthKey = (String)plist.get(AUTH_KEY);

                    Logger.logData("Server =>" + plist.get("server"));

                    command = new StringBuilder();

                }
                else
                {
                    command.append((char)byteRead);
                }
            }
        } 
        while (EOT != byteRead);
    }
    catch (UnknownHostException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (XmlParseException e)
    {
        Logger.logData("Invalid Plist format");
        e.printStackTrace();
    }
    finally
    {   // Clean up handles
        try
        {
            authenticationSocket.close();
            out.close();
            in.close();
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    System.out.println("Connection status =>" + connectionStatus);
    System.out.println("Connection host =>" + connectionHost);
    System.out.println("Connection port =>" + connectionPort);

    if (connectionStatus)
    {
        dataServerHost = connectionHost;
        dataServerPort = connectionPort;
        dataServerAuthKey = connectionAuthKey;
        System.out.println("Connecting to data server @: " + dataServerHost + ":" + dataServerPort);
        connectToDataServer();
    }
}

最佳答案

我记得虚假选择器唤醒是可能的。

虽然很有趣的是,当您被告知有东西可读时却没有任何东西可读,但这对于程序来说通常不是问题。程序在读取 TCP 流时通常应该期望任意数量的字节;而0字节的情况通常不需要特殊处理。

你的程序理论上是错误的。您不能指望可以立即阅读整条消息。一次读取可能只返回其中的一部分。可能只有 1 个字节。没有任何保证。

“正确”的方法是累积缓冲区中读取的所有字节。在缓冲区中查找 EOT。如果消息是碎片化的,则在整个消息到达之前可能需要多次读取。

loop 
  select();
  if readable
     bytes = read()
     buffer.append(bytes)
     while( buffer has EOT at position i)
       msg = buffer[0-i]
       left shift buffer by i

在此流程中您可以看到,read() 是否读取 0 字节并不重要。这确实与蔚来无关。即使在传统的阻塞 TCP IO 中,也必须做到这一策略理论上是正确的。

但是,实际上,如果您确实观察到整个消息始终是一个整体,那么您就不需要这么复杂;您的原始代码在您的环境中实际上是正确的。

现在您观察到有时会读取 0 字节。那么你之前的实际假设就必须修改。您可以添加一些特殊分支来忽略 0 字节 block 。

关于Java NIO 问题/对 isReadable 工作方式的误解,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6273146/

相关文章:

Java - 在不关闭的情况下中断 I/O

java - Java Socket 是否缓存 DNS?

java - 如何在 Java 8 中设置最小元空间

java - 在 javaFX 中切换场景

java - spring-boot:数据库中断后jdbc重新连接

JAVA:http post 请求

C 套接字 : handle variable length packets

mysql - 无法连接到 docker mysql unix 套接字

java - 无法摆脱使用套接字java发送文件中的接收文件循环

java - 使用 java.nio 时抛出 IllegalArgumentException