Java BlockingQueue 在 take() 上阻塞,略有不同

标签 java concurrency blockingqueue

我遇到的情况是有 2 个阻塞队列。首先,我插入一些我执行的任务。当每个任务完成时,它会将一个任务添加到第二个队列,并在其中执行它们。

所以我的第一个队列很简单:我只需检查以确保它不为空并执行,否则我中断():

public void run() {
    try {
        if (taskQueue1.isEmpty()) {
            SomeTask task = taskQueue1.poll();
            doTask(task);
            taskQueue2.add(task);
        }
        else {
            Thread.currentThread().interrupt();
        }
    }

    catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

第二个我执行以下操作,正如您所知,这不起作用:

public void run() {
    try {
        SomeTask2 task2 = taskQueue2.take();
        doTask(task2);
    }

    catch (InterruptedException ex) {

    }
    Thread.currentThread().interrupt();

}

您将如何解决这个问题,以便第二个 BlockingQueue 不会在 take() 上阻塞,但仅在它知道没有更多项目要添加时才完成。如果第二个线程可以看到第一个阻塞队列,并检查它是否为空并且第二个队列也为空,那么它会中断,那就太好了。

我也可以使用 Poison 对象,但更喜欢其他东西。

注意:这不是确切的代码,只是我在这里写的一些内容:

最佳答案

听起来好像处理第一个队列的线程知道,一旦队列耗尽,就不会再有任何任务了。这听起来很可疑,但无论如何我都会相信你的话并提出解决方案。

定义 AtomicInteger对两个线程都可见。将其初始化为正数

按如下方式定义第一个线程的操作:

  • 循环播放Queue#poll() .
  • 如果Queue#poll()返回null,调用AtomicInteger#decrementAndGet()在共享整数上。
    • 如果AtomicInteger#decrementAndGet()返回零,通过 Thread#interrupt() 中断第二个线程。 (这处理没有元素到达的情况。)
    • 无论哪种情况,都退出循环。
  • 否则,处理提取的项目,调用AtomicInteger#incrementAndGet()在共享整数上,将提取的项目添加到第二个线程的队列中,然后继续循环。

按如下方式定义第二个线程的操作:

  • BlockingQueue#take() 上的循环阻塞.
  • 如果BlockingQueue#take()抛出InterruptedException ,捕获异常,调用Thread.currentThread().interrupt() ,然后退出循环。
  • 否则,处理提取的项目。
  • 调用 AtomicInteger#decrementAndGet()在共享整数上。
    • 如果AtomicInteger#decrementAndGet()返回零,退出循环。
    • 否则,继续循环。

在尝试编写实际代码之前,请确保您理解这个想法。约定是第二个线程继续等待队列中的更多项目,直到预期任务的计数达到零。此时,生产线程(第一个线程)将不再将任何新项目推送到第二个线程的队列中,因此第二个线程知道可以安全地停止为其队列提供服务。

没有任务到达第一个线程的队列时,就会出现奇怪的情况。由于第二个线程仅在处理项目后递减并测试计数,因此如果它永远没有机会处理任何项目,则它永远不会考虑停止。我们使用线程中断来处理这种情况,但代价是第一个线程的循环终止步骤中的另一个条件分支。幸运的是,该分支只会执行一次。

有很多设计可以在这里发挥作用。我只是描述了一种仅引入了一个附加实体(共享原子整数)的方法,但即便如此,它仍然很繁琐。我认为使用毒丸会更干净,尽管我承认这两者都不是 Queue#add()也不BlockingQueue#put()接受 null 作为有效元素(由于 Queue#poll() 的返回值约定)。否则很容易将 null 用作毒丸

关于Java BlockingQueue 在 take() 上阻塞,略有不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8720766/

相关文章:

java - 指令重新排序如何导致并发问题

java - 如何让 ThreadPoolExecutor 在排队之前将线程增加到最大值?

Java,阻塞队列

java.util.concurrent.LinkedBlockingQueue 不是 FIFO 吗?

java - Android 单线程并发

java - 嵌套异步任务(在java中使用CompletionStage)

java - QueryDSL - 涉及关节的总和

java - 我的实体加载速度超慢有什么问题?

java - 在命中 '/login' url 之前未检测到用户已登录

java - 友好数字函数给出了错误的结果