java - Java生产者使用者ArrayBlockingQueue在take()上发生死锁

标签 java multithreading blockingqueue

在我的应用程序中,有两个阶段,一个阶段下载一些大数据,另一个阶段进行操作。
所以我创建了2个实现可运行的类:ImageDownloader和ImageManipulator,它们共享一个downloadedBlockingQueue:

        public class ImageDownloader implements Runnable {

        private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
        private ArrayBlockingQueue<String> imgUrlsBlockingQueue;

        public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {

            this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
            this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;

        }

        @Override
        public void run() {
            while (!this.imgUrlsBlockingQueue.isEmpty()) {
                try {
                    String imgUrl = this.imgUrlsBlockingQueue.take();
                    ImageBean imageBean = doYourThing(imgUrl);
                    this.downloadedImagesBlockingQueue.add(imageBean);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

        public class ImageManipulator implements Runnable {

        private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
        private AtomicInteger capacity;

        public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
                                AtomicInteger capacity) {
            this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            while (capacity.get() > 0) {
                try {
                    ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
                    capacity.decrementAndGet();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // ....
            }
        }
    }




    public class Main {
        public static void main(String[] args) {
            String[] imageUrls = new String[]{"url1", "url2"};
            int capacity = imageUrls.length;

            ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
            ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);

            ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 3; i++) {
                Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
                downloaderExecutor.execute(worker);
            }
            downloaderExecutor.shutdown();

            ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3);
            AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);

            for (int i = 0; i < 3; i++) {
                Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
                manipulatorExecutor.execute(worker);
            }
            manipulatorExecutor.shutdown();
            while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) {
            }
        }
    }

发生死锁的原因是这种情况:
t1检查其容量1。

t2检查其1。

t3检查其1。

t2占用,将容量设置为0,继续流动并最终退出。
现在,t1和t3处于死锁状态,因为将不会添加任何内容到downloadedImagesBlockingQueue。

最终,我想要这样的事情:当容量达到&&时,队列为空=打破“while”循环,并优雅地终止。

将“设置为空”设置为唯一条件将不起作用,这是因为开始时它是空的,直到某些ImageDownloader将imageBean放入队列中为止。

最佳答案

您可以采取一些措施来防止死锁:

  • 使用容量为
  • LinkedBlockingQueue
  • 使用offer添加到不阻止
  • 的队列中
  • 使用drainTopoll从队列中取出不会阻止
  • 的项目

    您可能还需要考虑一些技巧:
  • 使用ThreadPool:final ExecutorService executorService = Executors.newFixedThreadPool(4);
  • 如果您使用固定大小的ThreadPool,则可以在将数据添加到与ThreadPool的大小相对应的队列中完成添加后添加"poison pill",并在poll时将其检查

  • 使用ThreadPool就像这样简单:
        final ExecutorService executorService = Executors.newFixedThreadPool(4);
    
        final Future<?> result = executorService.submit(new Runnable() {
            @Override
            public void run() {
    
            }
        });
    

    还有一个鲜为人知的ExecutorCompletionService抽象了整个过程。更多信息here

    关于java - Java生产者使用者ArrayBlockingQueue在take()上发生死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37737832/

    相关文章:

    c++ - 具有自定义阻塞队列实现的未解析外部符号

    java - 如何在 Android Studio 中从 Activity 类外部调用/使用文件?

    java - Java 中的 Monitor.TryEnter 等价物

    java - Spring 注入(inject)的 bean 线程安全

    java - 可以从内存中类的反射创建对象吗?

    java - Queue的并发版本需要异常

    java - 在 JPane 上正确显示多行

    java - 在 Java Hashmap 中显示多个键的多个值

    java - 如何使用流将 Map<String, List<String>> 转换为 Set<String>?