我想创建一个包含某些对象的池(无限)的应用程序。当池的大小达到某个最小阈值时,我想启动我的池填充服务并生成一定数量的新对象(直到达到最大阈值)。
这是 producer-consumer
问题的略微修改版本,但不知何故我卡住了。由于创建大小有限的 BlockingQueue
并保持其填充很容易,我不知道如何解决我的问题。
我尝试使用 ReentrantLock
并且它是 Condition
对象,但是 Condition#signal()
方法要求我在锁内,这完全是对我来说是不必要的。
在我看来,最好的解决方案是 CountDownLatch
。消费者会减少计数器并最终触发池填充服务。这里的问题是 CountDownLatch
无法自行重启。
有什么想法吗?
换句话说:我有一堆消费者线程和一个生产者线程。生产者应该等到达到最小阈值,生产一些对象,然后再次等待。
最佳答案
Semaphore
可以作为生产者的屏障并且可以重复使用。当 Semaphore
与 AtomicBoolean
组合时,生产者可以在不影响消费者的情况下工作。它确实需要池来处理填充逻辑。
在下面的实现中,生产者立即开始填充池,然后等待池达到其最小大小。
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
// http://stackoverflow.com/q/32358084/3080094
public class RandomWell {
public static void main(String[] args) {
try {
final FilledPool<Integer> pool = new FilledPool<Integer>(100, 1000);
final CountDownLatch syncStart = new CountDownLatch(3);
Thread consumer = new Thread() {
@Override public void run() {
// just to do something, keep track of amount of positive ints from pool
int positiveInt = 0;
int totalInts = 0;
try {
syncStart.countDown();
syncStart.await();
for(;;) {
int i = pool.take();
if (i > 0) {
positiveInt++;
}
totalInts++;
Thread.yield();
}
} catch (InterruptedException e) {
System.out.println("Consumer stopped: " + positiveInt + " / " + (totalInts - positiveInt));
} catch (Exception e) {
e.printStackTrace();
}
}
};
consumer.start();
Thread producer = new Thread() {
@Override public void run() {
try {
Random r = new Random();
syncStart.countDown();
syncStart.await();
for(;;) {
int fillTotal = 0;
while (!pool.isMinFilled()) {
int fill = pool.getFillSize();
for (int i = 0; i < fill; i++) {
pool.offer(r.nextInt());
}
fillTotal += fill;
// System.out.println("Pool size: " + pool.sizeFast());
}
System.out.println("Filled " + fillTotal);
pool.awaitNewFilling();
}
} catch (InterruptedException e) {
System.out.println("Producer stopped.");
} catch (Exception e) {
e.printStackTrace();
}
}
};
producer.start();
syncStart.countDown();
syncStart.await();
Thread.sleep(100);
producer.interrupt();
consumer.interrupt();
} catch (Exception e) {
e.printStackTrace();
}
}
static class FilledPool<E> {
private final LinkedBlockingQueue<E> pool;
private final int minSize;
private final int maxSize;
private final Semaphore needFilling = new Semaphore(0);
// producer starts filling initially
private final AtomicBoolean filling = new AtomicBoolean(true);
public FilledPool(int minSize, int maxSize) {
super();
this.minSize = minSize;
this.maxSize = maxSize;
pool = new LinkedBlockingQueue<E>();
}
public E take() throws InterruptedException {
triggerFilling();
E e = pool.take();
return e;
}
private void triggerFilling() {
if (!isFilling() && !isMinFilled() && filling.compareAndSet(false, true)) {
needFilling.release();
System.out.println("Filling triggered.");
}
}
public void offer(E e) { pool.offer(e); }
public void awaitNewFilling() throws InterruptedException {
// must check on minimum in case consumers outpace producer
if (isMinFilled()) {
filling.set(false);
needFilling.acquire();
}
}
public int size() { return pool.size(); }
public boolean isMinFilled() { return minSize < size(); }
public int getFillSize() { return maxSize - size(); }
public boolean isFilling() { return filling.get(); }
}
}
更新:我还设法使用 ConcurrentLinkedQueue
而不是 LinkedBlockingQueue
让它工作,这大约使吞吐量增加了一倍,但同时也使代码的复杂性增加了一倍。
关于java - 具有最小和最大阈值设置的池填充服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32358084/