所以,我有一个 BlockingQueue,我正在用数据(来自许多线程)填充它。我想将这些数据聚合到 1000 个桶中,然后将它们传递到其他地方。所以我编写了一个线程类,它轮询队列的末尾,当它有足够的元素时,它会发送聚合数据。
我希望在 java.util.concurrent 中找到一些东西来帮助更多人解决这个问题。我能看到通过 java.util.concurrent 做到这一点的唯一方法是让队列中的每个插入都添加一个可运行的任务,然后将其添加到聚合集中,但这对我来说似乎效率很低。
通过线程轮询队列策略,假设我有5个线程,每个线程可以在本地内存中聚合(顺序并不重要),然后传递。队列和目标是唯一的争用接触点——1 个线程一次可以轮询阻塞队列。目的地可能永远不会出现争议。
使用基于任务的方法,使用执行器,所有线程将共享一个聚合点,因此会不断发生争用,更不用说集合的同步/并发变体速度较慢。
似乎很明显只有几个线程总是轮询 BlockingQueue。缺点是现在我需要写下它们所有的开始、停止,我需要处理线程死亡的情况,等等。这一切看起来像是我希望在 java.util.concurrent 或 apache 中找到的样板文件图书馆。
我真的离预订还有那么远吗?一个始终让 x 个线程运行并在失败时重新启动它们的类?是否还有另一种我没有看到的明显(高性能)方法?
最佳答案
试试这个。
public class Consumer<DATA> {
private List<DATA> dataList = new ArrayList<DATA>();
private ExecutorService threadPool = Executors.newFixedThreadPool(5);
public synchronized void consume(DATA data) {
dataList.add(data);
if(dataList.size() >= 1000) {
threadPool.submit(new ConsumerWorker(data));
}
}
}
我们本质上是在生产者的线程上下文中累积数据,直到达到所需的限制。然后,我们将这批数据提交到线程池,该线程池将根据 ConsumerWorker 的可用性对其进行排队或执行。您也可以调整线程池的行为。例如,使用 newCachedThreadPool() 将删除不活动的线程。
关于java - 编写我自己的消费者线程——我没有看到什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9695185/