TCP 和服务器套接字中的 Java 线程

标签 java multithreading sockets serversocket

<分区>

我写了下面两行

ServerSocket mobCom = new ServerSocket(9846);
Socket server = mobCom.accept();

我想创建一个新的 TCP 连接,并由一个新线程处理该连接。例如上面的代码创建了一个服务器套接字。并且有多个客户端。每当客户端连接到服务器时,可能会创建一个新线程来满足来自该特定客户端的请求。我该如何实现。

编辑

我还想将一个线程池限制为 10 个用户。如果出现更多用户,我想向他们发送一条错误消息,而不处理他们的进一步请求。

最佳答案

您可以使用 java util 并发的 SynchronousQueue 获得预期的结果. 创建固定数量的 worker 。使用 take 调用启动对 SynchronousQueue 的 block 读取。因此,如果所有工作人员都各自完成一项工作并忙于处理它们(与套接字通信),则不会从 SynchronousQueue 读取任何内容,因此向同步队列的offer 将失败。检查此失败(这意味着所有固定数量的工作人员都在忙,现在没有人锁定队列),拒绝下一个请求。

以下行中的示例代码 [未经测试 - 为简洁起见避免异常,请根据您的需要进行修改]。

public class BoundedServer 
{
    public static void main(String[] args) 
    {
        /**
         * Port to serve
         */
        final int port = 2013;

        /**
         * Max Workers
         */
        final int maxworkers = 10; 

        /**
         * The server socket.
         */
        ServerSocket mServerSocket = null;

        /**
         * Queue of work units to process if there is a worker available.
         */
        final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>();

        /**
         * Queue of work units to reject if there is no current worker available.
         */
        final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>(); 

        /**
         * A thread pool to handle the work.
         */
        final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers);

        /**
         * Let a single thread take care of rejecting the requests when needed to do so.
         */
        final ExecutorService rejectionservice = Executors.newSingleThreadExecutor();

        try 
        {
            Runnable communicationlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        /**
                         * Set of workers to handle the work.
                         */
                        final CommunicationWorker[] workers = new CommunicationWorker[maxworkers];

                        communicationservice.invokeAll(Arrays.asList(workers));
                    }
                    finally
                    {
                        communicationservice.shutdown();
                    }
                }
            };

            new Thread(communicationlauncher).start();

            Runnable rejectionlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject);

                        rejectionservice.submit(rejectionworker);
                    }
                    finally
                    {
                        rejectionservice.shutdown();
                    }
                }
            };
            new Thread(rejectionlauncher).start();

            mServerSocket = new ServerSocket(port);

            while(true)
            {
                WorkUnit work = new WorkUnit(mServerSocket.accept());

                if(!mQueueToProcess.offer(work))
                {
                    mQueueToReject.add(work);
                }
            }
        } 
        finally
        {
            try
            {
                mServerSocket.close();
            }
        }
    }
}


public class WorkUnit 
{
    private Socket mSocket = null;

    public WorkUnit(Socket socket) 
    {
        super();
        this.setSocket(socket);
    }

    public Socket getSocket() {
        return mSocket;
    }

    public void setSocket(Socket mSocket) {
        this.mSocket = mSocket;
    }
}

public class CommunicationWorker 
implements Callable<Boolean> 
{
    private SynchronousQueue<WorkUnit> mQueueToProcess;

    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess) 
    {
        super();
        this.mQueueToProcess = queueToProcess;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToProcess.take();

            Socket socket = work.getSocket();

            // Code to handle socket communication and closure.
            // Once the communication is finished, this thread will get blocked to mQueueToProcess.
        }
    }
}


public class RejectionWorker 
implements Callable<Boolean> 
{
    private LinkedBlockingQueue<WorkUnit> mQueueToReject;

    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject) 
    {
        super();
        this.mQueueToReject = queueToReject;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToReject.take();

            Socket socket = work.getSocket();

            // Code to reject the request.
        }
    }
}

关于TCP 和服务器套接字中的 Java 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15666287/

相关文章:

Java - 4 线程在两个同步方法中操作相同的对象数据

c# - 这个非锁定的 TryGetValue() 字典访问线程安全吗?

java 如何使用socket获得更快的速度

java - 如何通过 byte[] 流读取多个文件的 Socket InputStream?

delphi - 从Windows应用程序向服务应用程序发送大量数据

java - LIBGDX "parsing error emitter"具有 2 个或更多发射器

java - JTable 单元格内的 JDateChooser Enter 键并不总是有效

java - 与 Jackcess 匹配的列数据子字符串

java - 比较 Hibernate Criteria 中的两列

c# - 试图理解 C# 中的多线程