java - 限制java套接字服务器中的连接数

标签 java multithreading sockets serversocket threadpoolexecutor

我创建了一个Java套接字服务器,它在指定端口上创建一个套接字服务器,然后生成一个RecordWriter对象来对从每个连接获得的数据流执行一些操作。

我使用端口 61000 和 numthreads 2 启动程序。 我还启动了 3 个客户端来连接它。 在客户端,我可以看到所有 3 个都连接到接收器,但是接收器日志表明只有其中两个连接。

netstat -an|grep 61000|grep -i established

结果显示,当客户端和服务器在同一台计算机上运行时,总共有 6 个已建立的连接(3 个连接,每个连接的客户端一端和服务器一端各显示一条)。 我的疑问是:

  1. 为什么第三个客户端的日志显示它可以连接到 61000 端口上的程序,而我设置的 backlog(积压队列)大小是 2?另外,Executors.newFixedThreadPool(numThreads); 应该只允许 2 个线程来处理客户端连接。
  2. 既然 serverSocket.accept() 是在 MyWriter.java 中调用的,并且服务器日志中没有任何迹象表明第三个客户端已经连接,为什么 netstat 仍将其显示为已建立(ESTABLISHED)的连接?

这是我的代码:

MyReceiver.java

package com.vikas;

import java.net.ServerSocket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Socket server entry point: opens a {@link ServerSocket} on the configured port and
 * runs a fixed number of {@link MyWriter} workers, each of which accepts and serves
 * one client connection at a time.
 *
 * <p>The number of connections being served concurrently is therefore bounded by the
 * number of worker threads. The {@code backlog} argument passed to the
 * {@code ServerSocket} constructor only sizes the queue of pending connections; it does
 * not cap how many clients can connect, so additional clients may still complete the
 * TCP handshake and wait in that queue until a worker calls {@code accept()} again.
 *
 * <p>{@link #serverSocket} and {@link #mapConnections} are static because
 * {@code MyWriter} accesses them directly.
 */
public class MyReceiver{

    protected int serverPort = -1;
    protected int numThreads = -1;

    protected boolean isStopped = false;
    protected Thread runningThread = null;
    protected ExecutorService threadPool = null;

    protected static Logger logger = LogManager.getLogger(MyReceiver.class);
    protected static ServerSocket serverSocket = null;
    protected static Map<String, String> mapConnections = new ConcurrentHashMap<String, String>();

    /**
     * Creates a receiver that will listen on the given port once {@link #run(int)} is called.
     *
     * @param port TCP port to listen on
     */
    public MyReceiver(int port){
        this.serverPort = port;
    }

    /**
     * Opens the server socket, starts exactly {@code numThreads} workers and blocks until
     * all of them have finished (which happens after {@link #stop()} closes the socket and
     * the connections currently being served have ended).
     *
     * <p>If the port cannot be opened the error is logged and the method returns without
     * starting any worker.
     *
     * @param numThreads number of worker threads, i.e. the maximum number of client
     *                   connections served at the same time; also used as the accept backlog
     * @throws IllegalArgumentException if {@code numThreads} is not positive
     *                                  (thrown by {@link Executors#newFixedThreadPool(int)})
     */
    public void run(int numThreads){
        this.numThreads = numThreads;
        this.threadPool = Executors.newFixedThreadPool(numThreads);

        try {
            logger.info("Starting server on port " + this.serverPort);
            MyReceiver.serverSocket = new ServerSocket(this.serverPort, numThreads);
        } catch (IOException e) {
            logger.error("Cannot open port " + this.serverPort, e);
            // Without a listening socket the workers would fail immediately, so give up here.
            this.threadPool.shutdown();
            return;
        }

        // One long-lived worker per pool thread. Each worker loops on accept() itself, so
        // submitting more tasks than threads would only pile them up in the executor queue.
        for (int i = 0; i < numThreads; i++) {
            this.threadPool.execute(new MyWriter());
        }

        // No further tasks will be submitted; already-submitted workers keep running.
        this.threadPool.shutdown();

        try {
            while (!this.threadPool.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
                // Workers are still accepting or serving connections; keep waiting.
            }
            logger.info("Server Stopped after shutdown.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for workers to finish", e);
        }
    }


    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public synchronized boolean isStopped() {
        return this.isStopped;
    }

    /**
     * Marks the server as stopped and closes the listening socket, which makes workers
     * blocked in {@code accept()} fail and leave their accept loop. Safe to call even if
     * the socket was never opened.
     */
    public synchronized void stop(){
        this.isStopped = true;
        if (MyReceiver.serverSocket == null) {
            return;
        }
        try {
            MyReceiver.serverSocket.close();
        } catch (IOException e) {
            logger.error("Error closing server", e);
        }
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <port> <number of threads>}
     */
    public static void main(String[] args) {
        if(args.length != 2){
            System.out.println("Number of input arguments is not equal to 2.");
            printUsage();
            return;
        }

        final int port;
        final int numThreads;
        try {
            port = Integer.parseInt(args[0].trim());
            numThreads = Integer.parseInt(args[1].trim());
        } catch (NumberFormatException e) {
            System.out.println("Port and number of threads must be integers.");
            printUsage();
            return;
        }

        final MyReceiver myConnection = new MyReceiver(port);

        // Register the hook before run(), because run() blocks until the server is stopped.
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                logger.info("SocketServer - Receive SIGINT!!!");
                logger.info("Stopping Server");

                if(!myConnection.isStopped()){
                    myConnection.stop();
                }
                logger.info("Server Stopped successfully");

                try
                {
                    // Short grace period before the JVM exits (kept from the original code;
                    // presumably to let workers and logging finish).
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        myConnection.run(numThreads);
    }

    /** Prints the command-line usage and an example invocation to standard output. */
    private static void printUsage() {
        System.out.println("Usage:  java -cp YOUR_CLASSPATH -Dlog4j.configurationFile=/path/to/log4j2.xml com.vikas.MyReceiver  <port>  <number of threads>");
        System.out.println("java -cp \"$CLASSPATH:./MyReceiver.jar:./log4j-api-2.6.2.jar:./log4j-core-2.6.2.jar\" -Dlog4j.configurationFile=log4j2.xml com.vikas.MyReceiver  61000 2");
    }
}

MyWriter.java

package com.vikas;

import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;



/**
 * Worker that repeatedly accepts a client connection from {@code MyReceiver.serverSocket}
 * and reads newline-delimited messages from it until the client disconnects.
 *
 * <p>Each worker serves one client at a time, so it does not call {@code accept()} again
 * until the current client's stream has ended. The loop ends once the server socket has
 * been closed.
 */
public class MyWriter implements Runnable{

    protected String topic = null;
    protected String brokers = null;

    protected static Logger logger = LogManager.getLogger(MyWriter.class);

    public MyWriter () {

    }


    /**
     * Accept loop: blocks in {@code accept()}, serves the accepted client to completion,
     * then accepts the next one. Exits when the server socket is closed.
     */
    @Override
    public void run() {
        while(!MyReceiver.serverSocket.isClosed()){
            Socket client;
            try {
                client = MyReceiver.serverSocket.accept();
            } catch (IOException e) {
                if(MyReceiver.serverSocket.isClosed()) {
                    logger.error("Server was found to be Stopped.");
                    logger.error("Error accepting client connection", e);
                    break;
                }
                // Server is still open: report the failure instead of hiding it, then retry.
                logger.error("Error accepting client connection", e);
                continue;
            }
            handleClient(client);
        }
    }

    /**
     * Reads characters from the client until end of stream, treating each line feed
     * (0x0a) as a message terminator. The connection is registered in
     * {@code MyReceiver.mapConnections} while it is being served and is always
     * unregistered and closed afterwards, including when reading fails.
     *
     * @param client the accepted client socket; closed by this method
     */
    private void handleClient(Socket client) {
        // Use the same String key for put and remove so the entry is actually removed.
        String clientKey = client.getRemoteSocketAddress().toString().trim();
        logger.info("Just connected to " + clientKey);
        MyReceiver.mapConnections.put(clientKey, "");

        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
            int value;
            while((value = in.read()) != -1){
                char ch = (char) value;
                if(ch != 0x0a){
                    buffer.append(ch);
                    continue;
                }

                // End of line reached: take the buffered text as one message and reset.
                String message = buffer.toString().trim();
                buffer.setLength(0);

                if(message.length() == 0){
                    logger.error("Blank String received");
                    continue;
                }
                //do something with message
            }
            logger.info("Closing connection for client :" + clientKey);
        } catch (IOException e) {
            logger.error("Something went wrong!!", e);
        } finally {
            MyReceiver.mapConnections.remove(clientKey);
            try {
                client.close();
            } catch (IOException e) {
                logger.error("Error closing connection for client :" + clientKey, e);
            }
        }
    }
}

最佳答案

ServerSocket 构造函数的 backlog 参数限制的是传入连接队列(等待被 accept 的连接)的大小,而不是允许成功调用 accept() 的总次数。如果您想限制活动连接的数量,则需要跟踪已接受的连接数量;当达到阈值时,不要再次调用 accept(),直到至少有一个活动连接被关闭。

while(!MyReceiver.serverSocket.isClosed()){
        Socket server = null;
        try {
            server = MyReceiver.serverSocket.accept();
            //System.out.println("Just connected to " + server.getRemoteSocketAddress());
            logger.info("Just connected to " + server.getRemoteSocketAddress());
            MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), "");

            if (activeConnections == maxConnections) break; // exit accept loop

关于java - 限制java套接字服务器中的连接数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39032639/

相关文章:

java - 使用 MongoDB 更新数组

java - 用于 Java Swing 应用程序的 UI 自动化工具,具有记录和回放以及屏幕捕获功能

c# - 异步/同步和 TCP 或 UDP

node.js 为机器人聊天创建 'server side socket' (聊天脚本)

java - 正则表达式是否必须是 "pattern/calculation"而不是完全匹配?

java - Spring MVC中无法通过ajax调用绑定(bind)ModelMap

java - 我如何在Android中停止线程

multithreading - 在多个 qt 线程中使用单个 QSqlDatabase 连接

java - 线程在运行完成时结束?

java - Android socket原理及异常