java并发: multi-producer one-consumer

标签 java concurrency consumer producer

我有这样一种情况,不同的线程填充一个队列(生产者),一个消费者从这个队列中检索元素。我的问题是,当从队列中检索这些元素之一时,一些元素会丢失(丢失信号?)。生产者代码是:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费者代码是:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

当代码运行时,我有时会添加 20 个元素并检索 20 个元素,但在其他情况下,检索到的元素少于 20 个。知道如何解决这个问题吗?

最佳答案

我建议您使用 BlockingQueue 而不是 Queue。 LinkedBlockingDeque 可能是您的理想选择。

您的代码如下所示:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

然后你只需要

queue.take()

在你的消费者线程上

想法是 .take() 将阻塞,直到队列中有一个项目可用,然后恰好返回一个(这是我认为您的实现受到影响的地方:轮询时缺少通知)。 .put() 负责为你做所有的通知。无需等待/通知。

关于java并发: multi-producer one-consumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10006647/

相关文章:

java - 根据条件结束java线程

java - 指定构造函数,稍后初始化(Java)

java - 让 Java Axis 在 Windows 上运行?没有发现类定义错误

java - 线程一个接一个地运行,但不同时运行,我该如何解决这个问题?

php - 插入表时的竞争条件

java - 使用 Spring Hateoas 可分页

java - 很难理解 Java 消费者示例

c - 使用信号量的生产者和消费者中的段错误(核心转储)

java - Jackson POJO 映射 ArrayList<Class> 无法识别的字段

java - 使用 IP 地址确定移动运营商名称