java - 使用 Java NIO 的非阻塞服务器

标签 java tcp nio

<分区>

我正在使用 this构建没有可写部分的 java nio 服务器的教程。

一切正常,除了一件有趣的事情:

  • 当客户端发送数据包太快时,服务器不会收到所有消息,服务器总是收到第一个和第二个数据包,但不会多于此。
  • 如果客户端发送数据包的速度很慢,服务器会收到所有数据包。

有什么想法吗?

我正在添加服务器类代码,如果您需要下面代码中提到的另一个类,我在这里 :)。

NIOServer 类:

package server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

import javax.xml.parsers.ParserConfigurationException;

import org.xml.sax.SAXException;

public class NioServer implements Runnable {



// The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  //the cach will hundle the messages that came
  private Cache cache;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException {
    this.cache = cache;
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }


  private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
      }

  private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        socketChannel.register(this.selector, SelectionKey.OP_READ);
      }

  private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
          numRead = socketChannel.read(this.readBuffer);
          String test = new String(this.readBuffer.array());
          System.out.print(test);

        } catch (IOException e) {
          // The remote forcibly closed the connection, cancel
          // the selection key and close the channel.
        //  key.cancel();
        //  socketChannel.close();
          return;
        }

        if (numRead == -1) {
          // Remote entity shut the socket down cleanly. Do the
          // same from our end and cancel the channel.
          key.channel().close();
          key.cancel();
          return;
        }

        // Hand the data off to our worker thread
        this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
      }

  public void run() {
        while (true) {
          try {
            // Wait for an event one of the registered channels

            this.selector.select();



            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
              SelectionKey key = (SelectionKey) selectedKeys.next();
              selectedKeys.remove();

              if (!key.isValid()) {
                continue;
              }

              // Check what event is available and deal with it
              if (key.isAcceptable()) {
                this.accept(key);
              } else if (key.isReadable()) {
                this.read(key);
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

  public static void main(String[] args) throws ParserConfigurationException, SAXException {
    try {
        Cache cache = new Cache();
        new Thread(cache).start();
      new Thread(new NioServer(null, 9090,cache)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

最佳答案

如果您正在阅读 UDP,我希望如此。请注意您在 read 方法上处理数据包的速度有多慢。您正在将它们打印到 system.out,这非常慢,而且不确定您能够以多快的速度将数据处理到 processData 方法上的另一个线程。 This library如果这是您滞后的根源,我写的可以帮助您进行线程间非阻塞通信。您还应该检查底层读取套接字缓冲区的大小。它越大,在数据包开始丢失之前,您必须快速 catch 的空间就越大。对于 TCP,如果底层套接字缓冲区已满,您可能会在 channel 上收到 IOException。对于 UDP,数据包会被静默丢弃。

要访问底层读取套接字缓冲区大小,您可以执行以下操作:

final Socket socket = channel.socket();
System.out.println(socket.getReceiveBufferSize());
socket.setReceiveBufferSize(newSize);

注意:据我所知,Linux 可能需要一些操作系统配置才能让您更改底层缓冲区大小。如果 setReceiveBufferSize 没有效果(再次阅读以查看它是否已更改),请搜索它。 :)

关于java - 使用 Java NIO 的非阻塞服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10848742/

相关文章:

java - getCanonicalPath 和 toRealPath 之间的区别

java - 客户端在 websocket 中一段时间​​后自动断开连接

java - JSP - 列表输出

java - 如何在 onNavigationItemSelected 中退出 Google+

java - 日期之间的差异返回奇怪的结果

c++ - 如何在不同的线程中执行 QTcpSocket?

sockets - java.net.ConnectException : Connection refused: connect 异常

linux - 双向套接字到 TCP 通信

java - 向多个客户端发送文件数据?

java - netty分配的direct buffer的内存在哪里,内核空间还是用户空间?