java - 分布式任务队列中的并发(生产者/消费者)

标签 java multithreading concurrency distributed task-queue

我的应用程序(Java)随机生成一些任务并由分布式后台线程异步消耗。

我没有分布式锁解决方案,例如 ZooKeeper现在。 我没有任何第三方消息队列。

我使用数据库作为任务队列,消费的结果也保存在数据库中,所有消费者/生产者共享访问。

我有一些这样的代码:

消费者:

while(true) {
  // block the thread and wait from producer's notify
  // my producers would produce MANY work items but only notify each consumer ONCE.
  waitProducer();

  // consume the queue
  while(database.queueNotEmpty()) {
    // consume each work item and remove from database queue
    consumeAll();
  }
}

制作人:

for(...) {
  database.enqueue(work[i]);
}
// notify all consumers
notifyAllConsumer();

显然上面的代码存在并发错误。我有 3 个问题:

1.如何避免分布式消费者消费同一个任务? (关于这一行:“consumeAll()”) 或者减少重复计算。多次使用一个任务不会是一个错误,但在我的情况下效率较低。

2.如何避免队列不为空但没有消费者 Activity ?顺序是: 一个消费者和一个生产者样本:

  • Consumer: while(database.queueNotEmpty())//队列为空,中断 while 循环
  • 生产者:database.enqueue(work[i]);//生成一个任务
  • 生产者:notifyAllConsumer();//通知消费者,但是 它已经处于 Activity 状态
  • 消费者:waitProducer();//挂起线程,但仍有工作要做
<小时/>

3.这个问题有什么最佳实践吗?特别是在纯java中。 第三方消息队列或者zookeeper之类的东西是必须的吗? 少锁或无锁优先;就我而言,效率比正确性更重要。

谢谢!

最佳答案

我建议您使用LinkedBlockingQueue在这种情况下。

LinkedBlockingQueue tutorial

您可以使用 take()/put() 方法,如果您想在有时间限制的情况下等待,您可以使用 Offer()、poll() 和 peek()。

我也在类似的问题中使用过它。

关于java - 分布式任务队列中的并发(生产者/消费者),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23889764/

相关文章:

c++ - 如何读锁多线程C++程序

尝试中断并加入 main 时 Java 线程挂起

java - 在php中将字符串转换为utf-8

java - 为什么模拟的类不会进入未模拟的函数?

java - NumberFormat setMaximumFractionDigits 方法

java - 简单、便宜的并发列表或集合?

Java 8 : How can I convert a for loop to run in parallel?

java - 如何创建平衡的团体

c# - 从 C# 中的不同线程启动计时器

Java Swing 线程问题