- 生产者是有限的,消费者也应该是有限的。
- 问题是何时停止,而不是如何运行。
- 可以通过任何 类型的 BlockingQueue 进行通信。
- 不能依赖污染队列(PriorityBlockingQueue)
- 不能依赖锁定队列(SynchronousQueue)
- 不能完全依赖报价/轮询(SynchronousQueue)
- 可能存在更奇特的队列。
Creates a queued seq on another (presumably lazy) seq s. The queued seq will produce a concrete seq in the background, and can get up to n items ahead of the consumer. n-or-q can be an integer n buffer size, or an instance of java.util.concurrent BlockingQueue. Note that reading from a seque can block if the reader gets ahead of the producer.
http://clojure.github.com/clojure/clojure.core-api.html#clojure.core/seque
到目前为止我的尝试+一些测试:https://gist.github.com/934781
感谢使用 Java 或 Clojure 的解决方案。
最佳答案
class Reader {
private final ExecutorService ex = Executors.newSingleThreadExecutor();
private final List<Object> completed = new ArrayList<Object>();
private final BlockingQueue<Object> doneQueue = new LinkedBlockingQueue<Object>();
private int pending = 0;
public synchronized Object take() {
removeDone();
queue();
Object rVal;
if(completed.isEmpty()) {
try {
rVal = doneQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pending--;
} else {
rVal = completed.remove(0);
}
queue();
return rVal;
}
private void removeDone() {
Object current = doneQueue.poll();
while(current != null) {
completed.add(current);
pending--;
current = doneQueue.poll();
}
}
private void queue() {
while(pending < 10) {
pending++;
ex.submit(new Runnable() {
@Override
public void run() {
doneQueue.add(compute());
}
private Object compute() {
//do actual computation here
return new Object();
}
});
}
}
}
关于java - 生产者-消费者问题的一个转折点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5842639/