Java异步客户端,意外行为

标签 java sockets asynchronous nio

我最近打开了一个异步套接字片段,我对以下客户端代码的错误行为感到惊讶,它是使用 AsynchronousSocketChannel 的基本 PoC 客户端。

理想情况下,这个片段永远不应该达到“我吓坏了”,但它确实如此。 基本上问题是我使用了 ByteBuffer,在循环结束时我将其位置设置为 0,在循环开始时我希望它为 0,但有时它不是。

显示该错误的视频是:
https://www.youtube.com/watch?v=bV08SjYutRw&feature=youtu.be
我可以解决在 .nextLine() 之后调用 .clear() 的问题,但是,我很好奇,这个无辜的代码片段中发生了什么?

package com.melardev.sockets.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;

public class AsyncTcpClientCallbacks {

    // Horrible demo, I would never write asynchronous sockets this way, I would use attachments
    // which allows our code to be cleaner and isolate everything into their own classes
    // This is only here to show you how you could do it without attachments, but you have
    // to expose the socketChannel so it can be accessible from everywhere, my recommendation is not to bother
    // learning, this, go to the demo where I use attachments, it is a lot more readable


    static CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer bytesReaded, ByteBuffer buffer) {
            buffer.flip();
            byte[] receivedBytes = new byte[buffer.limit()];
            // Get into receivedBytes
            buffer.get(receivedBytes);
            String message = new String(receivedBytes);
            System.out.println(message);

            buffer.clear();
            socketChannel.read(buffer, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.err.println("Error reading message");
            System.exit(1);
        }

    };

    static private CompletionHandler<Integer, Void> writeHandler = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer bytesWritten, Void attachment) {

        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.err.println("Something went wrong");
            System.exit(-1);
        }
    };

    private static AsynchronousSocketChannel socketChannel;

    public static void main(String[] args) {

        try {
            socketChannel = AsynchronousSocketChannel.open();


            //try to connect to the server side
            socketChannel.connect(new InetSocketAddress("localhost", 3002), null
                    , new CompletionHandler<Void, Void>() {
                        @Override
                        public void completed(Void result, Void attachment) {


                            ByteBuffer receivedBuffer = ByteBuffer.allocate(1024);
                            socketChannel.read(receivedBuffer, receivedBuffer, readHandler);


                            Scanner scanner = new Scanner(System.in);
                            System.out.println("Write messages to send to server");
                            ByteBuffer bufferToSend = ByteBuffer.allocate(1024);
                            String line = "";
                            while (!line.equals("exit")) {
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 1");
                                }
                                line = scanner.nextLine();
                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 2");
                                }
                                byte[] bytesToWrite = line.getBytes();
                                // bufferToSend.clear();
                                bufferToSend.put(bytesToWrite);
                                System.out.println(bufferToSend.limit());
                                bufferToSend.flip();
                                System.out.println(bufferToSend.limit());

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 3");
                                }
                                if (bufferToSend.limit() != line.length()) {
                                    System.err.println("I am freaking out 4");
                                }

                                socketChannel.write(bufferToSend, null, writeHandler);


                                bufferToSend.limit(bufferToSend.capacity());
                                bufferToSend.position(0);
                                // The two lines above are the same as
                                // bufferToSend.clear(); // Reuse the same buffer, so set pos=0
                                // limit to the capacity which is 1024

                                if (bufferToSend.position() != 0) {
                                    System.err.println("I am freaking out 5");
                                }


                            }
                        }

                        @Override
                        public void failed(Throwable exc, Void nothing) {
                            System.out.println("Error connection to host");
                        }

                    });

            while (true) {
                try {
                    Thread.sleep(60 * 1000);
                    // Sleep 1 min ... who cares ?
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

“我吓坏了”这句话到底是怎么执行的?我认为不可能,条件永远不应该被评估为 true,我制作了清楚显示错误的视频,有时在 while() 循环开始时缓冲区的位置不同于 0,但它不应该,因为在循环结束时我将其设置为 0。 令人惊讶的是,当我在启动应用程序之前设置断点并逐行跟踪时,这种行为不会发生……怎么可能?
我记录了它,我开始通过调试器进行跟踪,一切工作正常,但是一旦我删除了中断并让调试器运行,之前工作的相同代码现在就不行了。我错过了什么?
显示其与追踪配合使用的视频如下: https://www.youtube.com/watch?v=0H1OJdZO6AY&feature=youtu.be
如果您想要一台服务器来玩,那么这就是视频中使用的服务器

package com.melardev.sockets.servers;

import com.melardev.sockets.Constants;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class AsyncTcpEchoServerKey {

    public static void main(String[] args) {

        try {
            // Create new selector
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().setReuseAddress(true);
            // By default this is true, so set it to false for nio sockets
            serverSocketChannel.configureBlocking(false);
            InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
            // Bind to localhost and specified port
            serverSocketChannel.socket().bind(new InetSocketAddress(loopbackAddress, Constants.SOCKET_PORT));


            // ServerSocketChannel only supports OP_ACCEPT (see ServerSocketChannel::validOps())
            // it makes sense, server can only accept sockets
            int operations = SelectionKey.OP_ACCEPT;

            serverSocketChannel.register(selector, operations);
            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }

                try {
                    processReadySet(selector.selectedKeys());
                } catch (IOException e) {
                    continue;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void processReadySet(Set readySet) throws IOException {
        Iterator iterator = readySet.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = (SelectionKey) iterator.next();
            // After processing a key, it still persists in the Set, we wanna remove it
            // otherwise we will get it back the next time processReadySet is called
            // We would end up processing the same "event" as many times this method is called
            iterator.remove();
            System.out.printf("isAcceptable %b isConnectable %b isReadable %b isWritable %b\n"
                    , key.isAcceptable(), key.isConnectable(), key.isReadable(), key.isWritable());
            if (key.isAcceptable()) {
                ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();

                // Get the client socket channel
                SocketChannel clientSocketChannel = (SocketChannel) ssChannel.accept();
                // Configure it as non-blocking socket
                clientSocketChannel.configureBlocking(false);
                // Register the socket with the key selector, we want to get notified when we have
                // something to read from socket(OP_READ)
                clientSocketChannel.register(key.selector(), SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                // A Remote client has send us a message
                String message = "nothing";
                // Get the socket who sent the message
                SocketChannel sender = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesCount = 0;
                try {
                    bytesCount = sender.read(buffer);

                    if (bytesCount > 0) {

                        // 1. Get manually
                        message = new String(buffer.array(), 0, bytesCount);

                        // 2. Or, use flip
                        // set buffer.position =0 and buffer.limit = bytesCount
                        buffer.flip();
                        byte[] receivedMessageBytes = new byte[bytesCount];
                        buffer.get(receivedMessageBytes);
                        message = new String(receivedMessageBytes);
                        System.out.println("Receive " + message);
                        // Writing
                        // 1. Easy approach, create a new ByteBuffer and send it
                        // ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());
                        // sender.write(outputBuffer);

                        // 2. Or to reuse the same buffer we could
                        // buffer.limit(buffer.position());
                        // buffer.position(0);

                        // 3. Or the same as point 2, but one line
                        buffer.flip();

                        sender.write(buffer);


                    } else {
                        SocketChannel ssChannel = (SocketChannel) key.channel();
                        ssChannel.close();
                        System.out.println("Client disconnected");
                        break;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    try {
                        sender.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

}

PD:第一个视频显示的内容甚至比我之前所说的还要多,请注意,在设置中断之前,控制台显示我吓坏了 2 和 4,当设置中断时,我还触发了我吓坏了 1,一开始并非如此,不仅如此,当我恢复该过程时,这一次,我吓坏了 1 没有被触发!!!

最佳答案

AsynchronousSocketChannel.write() 的文档说:

Buffers are not safe for use by multiple concurrent threads so care should be taken to not access the buffer until the operation has completed.

你没有这么小心,而是设置了 bufferToSend 的限制和位置,而不管写入是否完成,因此意外的行为可能是由于这种粗心造成的。

关于Java异步客户端,意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55997355/

相关文章:

c++ - 如何在 C++ 中使用 snmpwalk

c - Ubuntu 上的多播

javascript - js中的并行进程

c# - 没有await的using语句中的async,这安全吗?

java - ResponseListener 在 snmp4j 中的工作原理

java - 如何在android中使用volley获取Json对象中的Json数组并在listview代码和Xml Avilavble中设置此json数据

java - 为什么子类转换为父类型默认为私有(private)实例方法的父版本,而不是其他实例方法?

java - java编译器如何在类路径未设置为jdk路径的情况下找到类文件?

android - 如何正确关闭套接字及其InputStream?

java - servlet api 3.0 jar 中缺少异步方法?