我遇到的情况是有 2 个阻塞队列。首先,我插入一些我执行的任务。当每个任务完成时,它会将一个任务添加到第二个队列,并在其中执行它们。
所以我的第一个队列很简单:我只需检查以确保它不为空并执行,否则我中断():
public void run() {
try {
if (taskQueue1.isEmpty()) {
SomeTask task = taskQueue1.poll();
doTask(task);
taskQueue2.add(task);
}
else {
Thread.currentThread().interrupt();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
第二个我执行以下操作,正如您所知,这不起作用:
public void run() {
try {
SomeTask2 task2 = taskQueue2.take();
doTask(task2);
}
catch (InterruptedException ex) {
}
Thread.currentThread().interrupt();
}
您将如何解决这个问题,以便第二个 BlockingQueue 不会在 take() 上阻塞,但仅在它知道没有更多项目要添加时才完成。如果第二个线程可以看到第一个阻塞队列,并检查它是否为空并且第二个队列也为空,那么它会中断,那就太好了。
我也可以使用 Poison 对象,但更喜欢其他东西。
注意:这不是确切的代码,只是我在这里写的一些内容:
最佳答案
听起来好像处理第一个队列的线程知道,一旦队列耗尽,就不会再有任何任务了。这听起来很可疑,但无论如何我都会相信你的话并提出解决方案。
定义 AtomicInteger
对两个线程都可见。将其初始化为正数。
按如下方式定义第一个线程的操作:
- 循环播放
Queue#poll()
. - 如果
Queue#poll()
返回null,调用AtomicInteger#decrementAndGet()
在共享整数上。- 如果
AtomicInteger#decrementAndGet()
返回零,通过Thread#interrupt()
中断第二个线程。 (这处理没有元素到达的情况。) - 无论哪种情况,都退出循环。
- 如果
- 否则,处理提取的项目,调用
AtomicInteger#incrementAndGet()
在共享整数上,将提取的项目添加到第二个线程的队列中,然后继续循环。
按如下方式定义第二个线程的操作:
BlockingQueue#take()
上的循环阻塞.- 如果
BlockingQueue#take()
抛出InterruptedException
,捕获异常,调用Thread.currentThread().interrupt()
,然后退出循环。 - 否则,处理提取的项目。
- 调用
AtomicInteger#decrementAndGet()
在共享整数上。- 如果
AtomicInteger#decrementAndGet()
返回零,退出循环。 - 否则,继续循环。
- 如果
在尝试编写实际代码之前,请确保您理解这个想法。约定是第二个线程继续等待队列中的更多项目,直到预期任务的计数达到零。此时,生产线程(第一个线程)将不再将任何新项目推送到第二个线程的队列中,因此第二个线程知道可以安全地停止为其队列提供服务。
当没有任务到达第一个线程的队列时,就会出现奇怪的情况。由于第二个线程仅在处理项目后递减并测试计数,因此如果它永远没有机会处理任何项目,则它永远不会考虑停止。我们使用线程中断来处理这种情况,但代价是第一个线程的循环终止步骤中的另一个条件分支。幸运的是,该分支只会执行一次。
有很多设计可以在这里发挥作用。我只是描述了一种仅引入了一个附加实体(共享原子整数)的方法,但即便如此,它仍然很繁琐。我认为使用毒丸会更干净,尽管我承认这两者都不是 Queue#add()
也不BlockingQueue#put()
接受 null 作为有效元素(由于 Queue#poll()
的返回值约定)。否则很容易将 null 用作毒丸。
关于Java BlockingQueue 在 take() 上阻塞,略有不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8720766/