java - 为什么这个非阻塞IO调用会失败呢?

标签 java sockets nio serversocket

背景

  • 我想使用 Java 的非阻塞 SocketChannel 发送大量数据(30MB,但将来可能会更大)
    • 为什么是非阻塞?这样下一个要发送的字节的计算就不会因等待网络而被阻塞
  • 当我在阻塞模式下使用 SocketChannel 时,传输顺利完成
  • 当我将 SocketChannel 设置为非阻塞时,它的完成速度明显更快,但服务器并未收到所有数据
    • 不过,服务器确实收到了一些数据

问题

  • 使用非阻塞 Java NIO SocketChannel 时,为什么我的大文件 (30MB) 传输失败?如何解决该问题?

文件

  • 我删除了程序并编写了示例,以便它可以通过 javac *.java && java Main 一次性运行。

    • 它为服务器创建一个 Future,为客户端创建一个 Future,让客户端向服务器发送 30MB 的随机字节,然后阻塞主线程,直到两个 Future 都完成(尽管服务器永远不会完成)
  • 注意:这主要是关于 TheClient.java

    • 如果注释之间的行<CommentOutToMakeWork></CommentOutToMakeWork>被注释掉,SocketChannel 将被阻塞并且传输将完成

Main.java :

import java.io.IOException;
import java.lang.InterruptedException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
  public static void main(String[] args) throws ExecutionException, IOException, InterruptedException {
    final SocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
    final int size              = 30 * 1000 * 1000;

    ExecutorService executor = Executors.newFixedThreadPool(2);
    TheServer theServer      = new TheServer(address, size);
    TheClient theClient      = new TheClient(address, size);

    Future<String> serverFuture = executor.submit(theServer);
    Thread.sleep(2000);
    Future<String> clientFuture = executor.submit(theClient);

    System.out.println("MAIN: Received from client: " + clientFuture.get());
    System.out.println("MAIN: Received from server: " + serverFuture.get());
    executor.shutdown();
  }
}

TheClient.java :

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;

class TheClient implements Callable<String> {
  private TheClient() {}
  public TheClient(SocketAddress address, int size) {
    this.size = size;
    this.from = new byte[size];
    this.serverAddress = address;
    new Random().nextBytes(from);
  }

  private int           size;
  private byte[]        from;
  private SocketAddress serverAddress;

  public String call() throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    System.out.println("CLIENT: Attempting to connect to server...");
    socketChannel.connect(serverAddress);
    // <CommentOutToMakeWork>
    socketChannel.configureBlocking(false);
    // </CommentOutToMakeWork>
    System.out.println("CLIENT: Connection established. Sending " + size + " bytes.");

    // For this example, this is one large write, but even my actual
    // program, which uses a loop and puts smaller chunks onto the channel,
    // is too fast for the SocketChannel.

    socketChannel.write(ByteBuffer.wrap(from));

    System.out.println("CLIENT: Write completed.");
    return "CLIENT: Success!";
  }
}

TheServer.java :

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;

class TheServer implements Callable<String> {
  private TheServer() {}
  public TheServer(SocketAddress address, int size) {
    this.size = size;
    this.to = new byte[size];
    this.serverAddress = address;
  }
  private int           size;
  private byte[]        to;
  private SocketAddress serverAddress;

  public String call() throws IOException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open().bind(serverAddress);
    System.out.println("SERVER: Awaiting connection...");
    InputStream clientSocketInputStream = serverChannel.accept().socket().getInputStream();
    System.out.println("SERVER: Connection established. Attempting to read " + size + " bytes.");
    for (int i = 0; i < size; ++i) {
      to[i] = (byte) clientSocketInputStream.read();
    }
    System.out.println("SERVER: Read completed.");
    return "SERVER: Success!";
  }
}

最佳答案

我相信答案就在 WritableByteChannel.write文档:

Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.

所以看起来你需要使用write的返回值来找出已经写入了多少,并在未全部写入时处理这种情况。描述中不清楚的是如何处理这种情况 - 例如,您可能会发现需要进行一些调度才能在套接字输出缓冲区耗尽时继续写入。

关于java - 为什么这个非阻塞IO调用会失败呢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41935902/

相关文章:

java - 如何通过聊天应用程序的套接字将字符串安全地编码为字节

java - 无法使用 Eclipse 从 java 控制台读取值

java - java中什么是阻塞模式

Java 套接字 : can I write a TCP server with one thread?

java - "contains()"方法如何在 List<string> 上真正起作用?

sockets - 使用命令行停止 mosquitto (MQTT) 代理监听端口

java - 如何告诉 gradle run 使用自定义工作目录?

c - Dart RawSockets 有什么用?

java - OutOfMemoryError 和 ObservableList 进入 ComboBox 的问题

java - JAXB 抛出异常,指出属性 "retainReferenceToInfo"无效