在我的应用程序中,有两个阶段,一个阶段下载一些大数据,另一个阶段进行操作。
所以我创建了2个实现可运行的类:ImageDownloader和ImageManipulator,它们共享一个downloadedBlockingQueue:
public class ImageDownloader implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private ArrayBlockingQueue<String> imgUrlsBlockingQueue;
public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;
}
@Override
public void run() {
while (!this.imgUrlsBlockingQueue.isEmpty()) {
try {
String imgUrl = this.imgUrlsBlockingQueue.take();
ImageBean imageBean = doYourThing(imgUrl);
this.downloadedImagesBlockingQueue.add(imageBean);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ImageManipulator implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private AtomicInteger capacity;
public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
AtomicInteger capacity) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.capacity = capacity;
}
@Override
public void run() {
while (capacity.get() > 0) {
try {
ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
capacity.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
// ....
}
}
}
public class Main {
public static void main(String[] args) {
String[] imageUrls = new String[]{"url1", "url2"};
int capacity = imageUrls.length;
ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);
ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
downloaderExecutor.execute(worker);
}
downloaderExecutor.shutdown();
ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3);
AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
manipulatorExecutor.execute(worker);
}
manipulatorExecutor.shutdown();
while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) {
}
}
}
发生死锁的原因是这种情况:
t1检查其容量1。
t2检查其1。
t3检查其1。
t2占用,将容量设置为0,继续流动并最终退出。
现在,t1和t3处于死锁状态,因为将不会添加任何内容到downloadedImagesBlockingQueue。
最终,我想要这样的事情:当容量达到&&时,队列为空=打破“while”循环,并优雅地终止。
将“设置为空”设置为唯一条件将不起作用,这是因为开始时它是空的,直到某些ImageDownloader将imageBean放入队列中为止。
最佳答案
您可以采取一些措施来防止死锁:
LinkedBlockingQueue
offer
添加到不阻止drainTo
或poll
从队列中取出不会阻止您可能还需要考虑一些技巧:
ThreadPool
:final ExecutorService executorService = Executors.newFixedThreadPool(4);
ThreadPool
,则可以在将数据添加到与ThreadPool
的大小相对应的队列中完成添加后添加"poison pill",并在poll
时将其检查使用
ThreadPool
就像这样简单: final ExecutorService executorService = Executors.newFixedThreadPool(4);
final Future<?> result = executorService.submit(new Runnable() {
@Override
public void run() {
}
});
还有一个鲜为人知的
ExecutorCompletionService
抽象了整个过程。更多信息here。
关于java - Java生产者使用者ArrayBlockingQueue在take()上发生死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37737832/