java - 优雅地结束在阻塞队列中等待的线程

标签 java multithreading blockingqueue

我在作为学术练习而构建的多线程服务器时遇到问题,更具体地说,是让连接正常关闭。

每个连接都由一个 Session 类管理。此类为连接维护 2 个线程,一个 DownstreamThread 和一个 UpstreamThread。

UpstreamThread 在客户端套接字上阻塞并将所有传入的字符串编码为消息以传递给另一层进行处理。 DownstreamThread 阻塞在 BlockingQueue 上,客户端的消息插入到该队列中。当队列中有消息时,Downstream 线程将消息从队列中取出,将其转换为字符串并发送给客户端。在最终的系统中,应用程序层将对传入消息进行操作并将传出消息推送到服务器以发送到适当的客户端,但现在我只有一个简单的应用程序,它在传入消息上 hibernate 一秒钟,然后将其回显作为附加时间戳的传出消息。

我遇到的问题是当客户端断开连接时让整个事情正常关闭。我要解决的第一个问题是正常断开连接,客户端让服务器知道它正在使用 QUIT 命令结束连接。基本伪代码是:

while (!quitting) {
    inputString = socket.readLine () // blocks
    if (inputString != "QUIT") {
        // forward the message upstream
        server.acceptMessage (inputString);
    } else {
        // Do cleanup
        quitting = true;
        socket.close ();
    }
}

上游线程的主循环查看输入字符串。如果它是 QUIT,则线程设置一个标志,表示客户端已结束通信并退出循环。这导致上游线程很好地关闭。

只要未设置连接关闭标志,下游线程的主循环就会等待 BlockingQueue 中的消息。当它是,下游线程也应该终止。然而,它没有,它只是坐在那里等待。它的伪代码如下所示:
while (!quitting) {
    outputMessage = messageQueue.take (); // blocks
    sendMessageToClient (outputMessage);
}

当我对此进行测试时,我注意到当客户端退出时,上游线程关闭,但下游线程没有。

经过一番摸索,我意识到下游线程仍在 BlockingQueue 上阻塞,等待永远不会到来的传入消息。上游线程不会在链上进一步转发 QUIT 消息。

如何让下游线程优雅地关闭?想到的第一个想法是在 take() 调用上设置超时。不过我不太热衷于这个想法,因为无论你选择什么值,它都一定不会完全令人满意。要么它太长并且僵尸线程在关闭之前在那里停留了很长时间,要么它太短并且已经空闲了几分钟但仍然有效的连接将被杀死。我确实想过将 QUIT 消息发送到链上,但这需要它进行完整的往返到服务器,然后是应用程序,然后再次返回到服务器,最后到 session 。这似乎也不是一个优雅的解决方案。

我确实查看了 Thread.stop() 的文档,但这显然已被弃用,因为它无论如何都不能正常工作,所以看起来它也不是一个真正的选择。我的另一个想法是强制以某种方式在下游线程中触发异常,并让它在 finally block 中进行清理,但这让我觉得这是一个可怕而笨拙的想法。

我觉得两个线程都应该能够自己优雅地关闭,但我也怀疑如果一个线程结束,它还必须以更主动的方式通知另一个线程结束,而不是简单地设置一个标志让另一个线程检查。由于我对 Java 的经验还不是很丰富,所以在这一点上我很没有想法。如果有人有任何建议,将不胜感激。

为了完整起见,我在下面包含了 Session 类的真实代码,尽管我相信上面的伪代码片段涵盖了问题的相关部分。全类约250行。
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;

/**
 * Session class
 * 
 * A session manages the individual connection between a client and the server. 
 * It accepts input from the client and sends output to the client over the 
 * provided socket.
 * 
 */
public class Session {
    private Socket              clientSocket    = null;
    private Server              server          = null;
    private Integer             sessionId       = 0;
    private DownstreamThread    downstream      = null;
    private UpstreamThread      upstream        = null;
    private boolean             sessionEnding   = false;

    /**
     * This thread handles waiting for messages from the server and sending
     * them to the client
     */
    private class DownstreamThread implements Runnable {
        private BlockingQueue<DownstreamMessage>    incomingMessages    = null;
        private OutputStreamWriter                  streamWriter        = null;
        private Session                             outer               = null;

        @Override
        public void run () {
            DownstreamMessage message;
            Thread.currentThread ().setName ("DownstreamThread_" + outer.getId ());

            try {
                // Send connect message
                this.sendMessageToClient ("Hello, you are client " + outer.getId ());

                while (!outer.sessionEnding) {
                    message = this.incomingMessages.take ();
                    this.sendMessageToClient (message.getPayload ());
                }

                // Send disconnect message
                this.sendMessageToClient ("Goodbye, client " + getId ());

            } catch (InterruptedException | IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            } finally {
                this.terminate ();
            }
        }

        /**
         * Add a message to the downstream queue
         * 
         * @param message
         * @return
         * @throws InterruptedException 
         */
        public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
            if (!outer.sessionEnding) {
                this.incomingMessages.put (message);
            }

            return this;
        }

        /**
         * Send the given message to the client
         * 
         * @param message
         * @throws IOException 
         */
        private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
            OutputStreamWriter osw;
            // Output to client
            if (null != (osw = this.getStreamWriter ())) {
                osw.write ((String) message);
                osw.write ("\r\n");
                osw.flush ();
            }

            return this;
        }

        /**
         * Perform session cleanup
         * 
         * @return 
         */
        private DownstreamThread terminate () {
            try {
                this.streamWriter.close ();
            } catch (IOException ex) {
                Logger.getLogger (DownstreamThread.class.getName ()).log (Level.SEVERE, ex.getMessage (), ex);
            }
            this.streamWriter   = null;

            return this;
        }

        /**
         * Get an output stream writer, initialize it if it's not active
         * 
         * @return A configured OutputStreamWriter object
         * @throws IOException 
         */
        private OutputStreamWriter getStreamWriter () throws IOException {
            if ((null == this.streamWriter) 
            && (!outer.sessionEnding)) {
                BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream ());
                this.streamWriter       = new OutputStreamWriter (os, "UTF8");
            }

            return this.streamWriter;
        }

        /**
         * 
         * @param outer 
         */
        public DownstreamThread (Session outer) {
            this.outer              = outer;
            this.incomingMessages   = new LinkedBlockingQueue ();
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * This thread handles waiting for client input and sending it upstream
     */
    private class UpstreamThread implements Runnable {
        private Session outer   = null;

        @Override
        public void run () {
            StringBuffer    inputBuffer = new StringBuffer ();
            BufferedReader  inReader;

            Thread.currentThread ().setName ("UpstreamThread_" + outer.getId ());

            try {
                inReader    = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream (), "UTF8"));

                while (!outer.sessionEnding) {
                    // Read whatever was in the input buffer
                    inputBuffer.delete (0, inputBuffer.length ());
                    inputBuffer.append (inReader.readLine ());
                    System.out.println ("Input message was: " + inputBuffer);

                    if (!inputBuffer.toString ().equals ("QUIT")) {
                        // Forward the message up the chain to the Server
                        outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString ()));
                    } else {
                        // End the session
                        outer.sessionEnding = true;
                    }
                }

            } catch (IOException | InterruptedException e) {
                Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
            } finally {
                outer.terminate ();
                outer.server.deleteSession (outer.getId ());
            }
        }

        /**
         * Class constructor
         * 
         * The Core Java volume 1 book said that a constructor such as this 
         * should be implicitly created, but that doesn't seem to be the case!
         * 
         * @param outer 
         */
        public UpstreamThread (Session outer) {
            this.outer  = outer;
            System.out.println ("Class " + this.getClass () + " created");
        }
    }

    /**
     * Start the session threads
     */
    public void run () //throws InterruptedException 
    {
        Thread upThread     = new Thread (this.upstream);
        Thread downThread   = new Thread (this.downstream);

        upThread.start ();
        downThread.start ();
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
        this.downstream.acceptMessage (message);
        return this;
    }

    /**
     * Accept a message to send to the client
     * 
     * @param message
     * @return
     * @throws InterruptedException 
     */
    public Session acceptMessage (String message) throws InterruptedException {
        return this.acceptMessage (new DownstreamMessage (this.getId (), message));
    }

    /**
     * Terminate the client connection
     */
    private void terminate () {
        try {
            this.clientSocket.close ();
        } catch (IOException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, e.getMessage (), e);
        }
    }

    /**
     * Get this Session's ID
     * 
     * @return The ID of this session
     */
    public Integer getId () {
        return this.sessionId;
    }

    /**
     * Session constructor
     * 
     * @param owner The Server object that owns this session
     * @param sessionId The unique ID this session will be given
     * @throws IOException 
     */
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {

        this.server         = owner;
        this.clientSocket   = clientSocket;
        this.sessionId      = sessionId;
        this.upstream       = new UpstreamThread (this);
        this.downstream     = new DownstreamThread (this);

        System.out.println ("Class " + this.getClass () + " created");
        System.out.println ("Session ID is " + this.sessionId);
    }
}

最佳答案

而不是调用 Thread.stop使用 Thread.interrupt .这将导致 take抛出 InterruptedException 的方法你可以用它来知道你应该关闭。

关于java - 优雅地结束在阻塞队列中等待的线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16325886/

相关文章:

java - Java Mobile 中的蓝牙 : Handling connections that go out of range

java - 努力从 Firebase 实时数据库中获取值

ios - 如何在 iOS 编程中保持方法执行的顺序?

java - 具有唯一 ID 的 ArrayBlockingQueue

java - Maven 从公司仓库中提取而不是公开可用

java - JDOQL 在搜索成员列表变量中是否存在一个或多个元素时,有什么方法可以避免查询中的多个 .contains() 调用吗?

c++ - 未收到 ZMQ 消息

Java:在创建 ConcurrentHashMap 对象时使用 synchronized(this) 是一种可取的做法吗?

java - 为什么线程池独立地而不是并发地执行任务?

java - BlockingQueue 中阻塞的线程