我创建了一个扩展 ArrayBlockingQueue 的 CloseableBlockingQueue:
private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;
public CloseableBlockingQueue(int queueLength) {
super(queueLength);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public boolean offer(E e) {
return closed ? true : super.offer(e);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public void put(E e) throws InterruptedException {
if (!closed) {
super.put(e);
}
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public E poll() {
return closed ? null : super.poll();
}
/***
* Shortcut to do nothing if closed.
* @throws InterruptedException
*/
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
return closed ? null : super.poll(l, tu);
}
/***
* Mark me as closed and clear the queue.
*/
void close() {
closed = true;
// There could be more threads blocking on my queue than there are slots
// in the queue. We therefore MUST loop.
do {
// so keep clearing
clear();
/* Let others in ... more specifically, any collectors blocked on the
* queue should now unblock and finish their put.
*
* Subsequent puts should shortcut but it is the waiting ones I need
* to clear.
*/
Thread.yield();
/* Debug code.
// Did yield achieve?
if (!super.isEmpty()) {
* This DOES report success.
log("! Thread.yield() successful");
}
*
*/
// Until all are done.
} while (!super.isEmpty());
}
/***
* isClosed
*
* @return true if closed.
*/
boolean isClosed() {
return closed;
}
}
我关心的是 close 方法,它试图恢复队列中阻塞的任何线程。我使用 Thread.yield() 来尝试这样做,但我看到的引用资料表明这种技术可能并不总是有效,因为不能保证任何其他被阻塞的线程会在 yield 期间被唤醒。
队列用于将多个线程的输出集中到单个流中。喂它的线程很容易比队列中的槽多得多,因此队列很可能已满,并且在关闭时有几个线程阻塞在队列上。
我欢迎您的想法。
已添加
感谢下面 Tom 的建议,我已重构为:
- 保存所有可能阻塞的线程的集合。
- 关闭时,中断所有这些。
顺便说一句:由于线程集合主要用于添加对象并几乎立即删除同一对象,因此我从 http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm 获取了 Doug Lea 令人印象深刻的 ConcurrentDoublyLinkedList 的副本。并添加了一些方法来允许我保留添加的节点。删除操作的复杂度应该是 O(1) 而不是 O(n)。
保罗
最佳答案
我认为yield()根本不会影响队列中阻塞的线程。
如果您可以跟踪等待线程(考虑到您正在包装阻塞方法,应该很简单)。当你关闭时,你可以调用它们的interrupt()。
查看这个问题:ArrayBlockingQueue - How to "interrupt" a thread that is wating on .take() method
关于java - Thread.yield() 有更好的解决方案吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7901114/