JAVA NIO服务器: how to reset all connections

标签 java sockets nio

我必须在 JBoss 中构建一个 JAVA Nio 服务器应用程序来从 10-200 个传感器盒读取数据。他们打开一个流并一直向我发送数据。通信是双向的。现在,有时可能会发生这些盒子(或服务器)有一些内部错误。为了检测此类问题,观察者线程每 5 秒检查一次,自上次检查以来是否有数据 block 进入。如果在那之前我的盒子都没有发送数据,那么就发生了一些不好的事情,我想重新启动整个套接字通信。

现在,如何与 NIO 建立套接字连接已有详细记录,但很难找到如何清理重置它们的复杂示例。这是我的问题:当我的看门狗检测到过去 5 秒内没有数据传入时,它会调用 close(),然后调用 startEngine()。但之后仍然没有数据到达。有些东西似乎被阻止了,某些资源仍然关联或类似的。如果我重新启动 JBoss,数据就会再次到达。有人可以给我提示吗?

感谢您的宝贵时间! 斯特凡

public class TestServer 
{
  private NIOServer server;
  private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>();

  class NIOServer extends Thread 
  {
        class MessageBuffer
        {
              int [] msgAsByte = new int[msgSize];
              int pos = 0;
              int lastSign = 0;                                    
              int bytesRead = 0;
        }
        private ByteBuffer readBuffer = ByteBuffer.allocate(256);
        private Selector selector;
        private boolean stop = false;
        private int[] ports;
        private int msgSize = 48;
        private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>();

        private List<ServerSocketChannel> channels;
        // Maps a SocketChannel to a list of ByteBuffer instances
        private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>();

        public NIOServer(int[] ports) {
              this.ports = ports;
        }

        private void stopAll()
        {
              stop = true;

              try 
              {
                    server.interrupt();
                    server.join(3000);
              } 
              catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
              }
              closeConnections();
        }

        public void sendData(SocketChannel socket, byte[] data) 
        { 
              // And queue the data we want written
              synchronized (this.pendingDataToWrite) {
                    List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket);
                    if (queue == null) {
                          queue = new ArrayList<ByteBuffer>();
                          this.pendingDataToWrite.put(socket, queue);
                    }
                    queue.add(ByteBuffer.wrap(data));
              }

              SelectionKey key = socket.keyFor(this.selector);
              if(key != null)
                    key.interestOps(SelectionKey.OP_WRITE);
              // Finally, wake up our selecting thread so it can make the required changes
              this.selector.wakeup();
        }

        public void run() 
        {
              try
              {
                    stop = false;
                    selector = Selector.open();
                    channels = new ArrayList<ServerSocketChannel>();
                    ServerSocketChannel serverchannel;
                    for (int port : ports) 
                    {
                          try
                          {
                                serverchannel = ServerSocketChannel.open();
                                serverchannel.configureBlocking(false);
                                try
                                {
                                      serverchannel.socket().setReuseAddress(true);
                                }
                                catch(SocketException se)
                                {
                                      //
                                }
                                serverchannel.socket().bind(new InetSocketAddress(port));
                                serverchannel.register(selector, SelectionKey.OP_ACCEPT);
                                channels.add(serverchannel);
                          }
                          catch(Exception e)
                          {
                                //
                          }
                    }
                    while (!stop) 
                    {

                          SelectionKey key = null;
                          try 
                          {
                                selector.select();
                                Iterator<SelectionKey> keysIterator = selector.selectedKeys()
                                            .iterator();
                                while (keysIterator.hasNext()) 
                                {
                                      key = keysIterator.next();

                                      if(key.isValid())
                                      {
                                            if (key.isAcceptable()) 
                                            {
                                                  accept(key);
                                            } 
                                            else if (key.isReadable()) 
                                            {
                                                  readData(key);
                                            } 
                                            else if (key.isWritable()) 
                                            {
                                                  writeData(key);
                                            }
                                      }
                                      else
                                      {
                                            SocketChannel sc = (SocketChannel) key.channel(); 
                                      }
                                      keysIterator.remove();
                                }
                          }
                          catch ( Exception e) 
                          {
                                if(e instanceof IOException || e instanceof ClosedSelectorException)
                                {
                                      try
                                      {
                                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                                            channels.remove(ssc);
                                            ssc.close();
                                            key.cancel();
                                      }
                                      catch(Exception ex)
                                      {
                                            //
                                      }

                                }
                                else
                                {
                                      //
                                }
                          }
                    } 
              }
              catch(Exception e1)
              {
                    //
              }

              closeConnections();

        }

        private void closeConnections()
        {
              //if thread is stopped, close all 
              try
              {
                    try 
                    {
                          if(this.selector == null || this.selector.keys() == null)
                          {
                                log.debug("No selectors or keys found to close");
                          }
                          else
                          {
                                Iterator<SelectionKey> keys = this.selector.keys().iterator();
                                while(keys.hasNext()) 
                                {
                                      SelectionKey key = keys.next();
                                      key.cancel();
                                }
                          }
                    }
                    catch(Exception ex) {
                          //
                    }
                    if(selector != null)
                          selector.close();
                    if(channels != null)
                    {
                          for(ServerSocketChannel channel:channels)
                          {
                                channel.socket().close();
                                channel.close();
                          }
                    }

                    if(clientsList != null)
                    {
                          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator();
                          while(hfm.hasNext()) 
                          {
                                Map.Entry<String, SocketChannel> s = hfm.next();
                                s.getValue().close();
                          }
                    }
                    clientsList=null;

                    selector = null;
                    channels = null;
                    pendingDataToWrite = null;
              }
              catch(Exception e)
              {
                    //
              }

        }

        private void accept(SelectionKey key) throws IOException 
        {

              ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
              SocketChannel sc = ssc.accept();
              sc.configureBlocking(false);
              sc.register(selector, SelectionKey.OP_READ);

              String ip = sc.socket().getRemoteSocketAddress().toString();
              if(!buffer.containsKey(ip))
                    buffer.put(ip, new MessageBuffer());
        }

        private void readData(SelectionKey key) throws Exception
        {

              SocketChannel sc = (SocketChannel) key.channel();      

              MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString());
              try
              {
                    buf.bytesRead = sc.read(readBuffer); //read into buffer.
              }
              catch(Exception e2)
              {
                    sc.close();
                    buffer.remove(sc);
              }

              //close connection
              if (buf.bytesRead == -1)
              {
                    sc.close();
                    key.cancel();
                    return;
              }

              readBuffer.flip();      //make buffer ready for read

              while(readBuffer.hasRemaining())
              {
                    //Read the data and forward it to another Process...
              }

              readBuffer.compact(); //make buffer ready for writing

        }

        private void writeData(SelectionKey key) throws Exception
        {
              SocketChannel socketChannel = (SocketChannel) key.channel();
              synchronized (this.pendingDataToWrite) {
                    List queue = (List) this.pendingDataToWrite.get(socketChannel);

                    // Write until there's not more data ...
                    while (!queue.isEmpty()) {
                          ByteBuffer buf = (ByteBuffer) queue.get(0);
                          try
                          {
                                socketChannel.write(buf);
                          }
                          catch(Exception e)
                          {
                                //
                          }
                          finally
                          {
                                queue.remove(0);
                          }
                          if (buf.remaining() > 0) {
                                // ... or the socket's buffer fills up
                                break;
                          }
                    }

                    key.interestOps(SelectionKey.OP_READ);
              }
        }
  }



  public void close() {

        if (server != null && server.isAlive()) 
        {      
                    server.stopAll(); 
        }
        if(clientsList != null)
        {
              clientsList.clear();
        }
        server = null;

  }

  public void startEngine(int[] ports) {
        if (ports != null) {
              for (int port : ports)
                    log.info("Listening on port " + port);
              server= new NIOServer(ports);
              server.start();
        }
  }

}

最佳答案

使用 select() 超时。

如果超时,关闭所有已注册的SocketChannel

如果您想要更细粒度,请跟踪每个 channel 上的最后 I/O 时间,并关闭在每个 select() 循环底部过期的那些时间。

NB 您的 OP_WRITE 技术不正确。这里有很多答案展示了如何正确使用它。

关于JAVA NIO服务器: how to reset all connections,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45187090/

相关文章:

java nio 在符号链接(symbolic link)中迭代文件

java - E/AndroidRuntime : FATAL EXCEPTION: AsyncTask #1 java. lang.RuntimeException:执行 doInBackground() 时发生错误

java - Spring @ExceptionHandler 和 HttpMediaTypeNotAcceptableException

java - 无法从文件名中读取特殊字符

c# - 使用 TCP 时,我是否需要使用校验和来保护我的消息?

windows - 如何检测Windows中打开的套接字?

java - 使用java选择器唯一标识客户端

java - ViewModel 问题显示 MV VM 架构中的数据库更改

c - 获取传入套接字连接的源地址

Java WatchService 在观看映射驱动器时不生成事件