java - 具有最小和最大阈值设置的池填充服务

标签 java multithreading concurrency producer-consumer

我想创建一个包含某些对象的池(无限)的应用程序。当池的大小达到某个最小阈值时,我想启动我的池填充服务并生成一定数量的新对象(直到达到最大阈值)。

这是 producer-consumer 问题的略微修改版本,但不知何故我卡住了。由于创建大小有限的 BlockingQueue 并保持其填充很容易,我不知道如何解决我的问题。

我尝试使用 ReentrantLock 并且它是 Condition 对象,但是 Condition#signal() 方法要求我在锁内,这完全是对我来说是不必要的。

在我看来,最好的解决方案是 CountDownLatch。消费者会减少计数器并最终触发池填充服务。这里的问题是 CountDownLatch 无法自行重启。

有什么想法吗?

换句话说:我有一堆消费者线程和一个生产者线程。生产者应该等到达到最小阈值,生产一些对象,然后再次等待。

最佳答案

Semaphore 可以作为生产者的屏障并且可以重复使用。当 SemaphoreAtomicBoolean 组合时,生产者可以在不影响消费者的情况下工作。它确实需要池来处理填充逻辑。
在下面的实现中,生产者立即开始填充池,然后等待池达到其最小大小。

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// http://stackoverflow.com/q/32358084/3080094
public class RandomWell {

    public static void main(String[] args) {

        try {

            final FilledPool<Integer> pool = new FilledPool<Integer>(100, 1000);
            final CountDownLatch syncStart = new CountDownLatch(3);

            Thread consumer = new Thread() {
                @Override public void run() {
                    // just to do something, keep track of amount of positive ints from pool
                    int positiveInt = 0;
                    int totalInts = 0;
                    try {
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int i = pool.take();
                            if (i > 0) {
                                positiveInt++;
                            }
                            totalInts++;
                            Thread.yield();
                        }
                    } catch (InterruptedException e) {
                        System.out.println("Consumer stopped: " + positiveInt + " / " + (totalInts - positiveInt));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            consumer.start();

            Thread producer = new Thread() {
                @Override public void run() {
                    try {
                        Random r = new Random();
                        syncStart.countDown();
                        syncStart.await();
                        for(;;) {
                            int fillTotal = 0;
                            while (!pool.isMinFilled()) {
                                int fill = pool.getFillSize();
                                for (int i = 0; i < fill; i++) {
                                    pool.offer(r.nextInt());
                                }
                                fillTotal += fill;
                                // System.out.println("Pool size: " + pool.sizeFast());
                            }
                            System.out.println("Filled " + fillTotal);
                            pool.awaitNewFilling();
                        }
                    } catch (InterruptedException e) {
                        System.out.println("Producer stopped.");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            producer.start();
            syncStart.countDown();
            syncStart.await();

            Thread.sleep(100);

            producer.interrupt();
            consumer.interrupt();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class FilledPool<E> {

        private final LinkedBlockingQueue<E> pool;
        private final int minSize;
        private final int maxSize;
        private final Semaphore needFilling = new Semaphore(0);

        // producer starts filling initially
        private final AtomicBoolean filling = new AtomicBoolean(true);

        public FilledPool(int minSize, int maxSize) {
            super();
            this.minSize = minSize;
            this.maxSize = maxSize;
            pool = new LinkedBlockingQueue<E>();
        }

        public E take() throws InterruptedException {

            triggerFilling();
            E e = pool.take();
            return e;
        }

        private void triggerFilling() {

            if (!isFilling() && !isMinFilled() && filling.compareAndSet(false, true)) {
                needFilling.release();
                System.out.println("Filling triggered.");
            }
        }

        public void offer(E e) { pool.offer(e); }

        public void awaitNewFilling() throws InterruptedException {

            // must check on minimum in case consumers outpace producer
            if (isMinFilled()) {
                filling.set(false);
                needFilling.acquire();
            }
        }

        public int size() { return pool.size(); }
        public boolean isMinFilled() { return minSize < size(); }
        public int getFillSize() { return maxSize - size(); } 
        public boolean isFilling() { return filling.get(); }
    }
}

更新:我还设法使用 ConcurrentLinkedQueue 而不是 LinkedBlockingQueue 让它工作,这大约使吞吐量增加了一倍,但同时也使代码的复杂性增加了一倍。

关于java - 具有最小和最大阈值设置的池填充服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32358084/

相关文章:

java jdom xml 复制一段xml

java - 使用 Hibernate 和 spring 从不同表形成对象列表的最佳方法是什么

java - 创建对象而不导致堆栈溢出错误?

java - JSON getString() 从字符串中删除空格?

objective-c - block 在异步线程上执行两次

multithreading - goroutines(以及运行它们的操作系统线程)在被 IO 绑定(bind)操作阻塞时如何表现?

c++ - 计算机如何将操作从一个线程切换到另一个线程

python - 线程和条件 : debugging the acquirement

concurrency - 为什么条件变量有时会错误唤醒?

android - Executors.newFixedThreadPool(size) 是要考虑 CPU 内核,还是我应该考虑。值得吗?