java - 从 BlockingQueue 获取时缺少项目

标签 java multithreading concurrency queue executorservice

我有一个庞大的数据集,我需要将其填充到数据库中。我正在编写基于Java并发库(带有BlockingQueue和executorService的生产者-消费者模型)的代码,它可以在数据到达时不断将数据添加到队列中。消费者不断检索数据,除非遇到“毒药”(然后死亡)。

主类,带有要发布的虚拟数据。队列大小有意保持较小:

public class MessageProcessor {
private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
        5, true);
private static final ExecutorService executor = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ExecutorService consumerExecutor = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final String POISON = "THE_END";


public void processMessages() throws InterruptedException {

//Create and start consumer 
    Runnable consumer = new MessageConsumer(queue);
    consumerExecutor.execute(consumer);

    for (String payload : getPayload()) {
        //create and start producer with given payload
        Runnable producer = new MessageProducer(queue, payload);
        executor.execute(producer);
    }

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);

    consumerExecutor.shutdown();
    consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);

}

private List<String> getPayload() {
    List<String> payloads = new ArrayList<>();
    payloads.add("data1");
    payloads.add("data2");
    payloads.add("data3");
    payloads.add("data4");
    payloads.add("data5");
    payloads.add("data6");
    payloads.add("data7");
    payloads.add("data8");
    payloads.add("data9");
    payloads.add("data10");
    payloads.add(POISON);

    return payloads;
}}

生产者可运行:

public class MessageProducer implements Runnable {
private BlockingQueue<String> queue;
private String payload;

public MessageProducer(BlockingQueue<String> queue, String payload) {
    this();
    this.queue = queue;
    this.payload = payload;
}

private MessageProducer() {
}

public void run() {
    try {
            queue.put(payload);
            System.out.println("Put : " + payload );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}}

消费者可运行:

public class MessageConsumer implements Runnable {

private BlockingQueue<String> queue;
private static final String POISON = "THE_END";

public MessageConsumer(BlockingQueue<String> queue) {
    this();
    this.queue = queue;
}

private MessageConsumer() {
}

public void run() {

    String payload = "";
    do {
        try {
            payload = queue.take();
            System.out.println("Got : " + payload );
        } catch (InterruptedException ie) {
            // handle
            break;
        }
    } while (!payload.equals(POISON));
}}

输出:

Put : data1
Put : data2
Put : data3
Put : data7
Put : data6
Put : data5
Got : data1
Got : data2
Got : data3
Got : data5
Put : data10
Put : data8
Put : data9
Got : data6
Got : data7
Put : data4
Put : THE_END
Got : data8
Got : data9
Got : data10
Got : THE_END

当我执行 new MessageProcessor.processMessages() 时,我观察到两个异常:

  1. 消费者未能获取项目:data4(我假设是因为它在检索“data4”之前获取了有毒数据(“THE_END”)) - 但为什么它不按照从队列中插入的顺序获取数据什么是先进先出?
  2. 插入队列(put)不会按照列表中项目的顺序发生(例如,在“data3”之后插入“data7”)

谢谢!!

最佳答案

你的两个问题是一样的。

由于您有多个并行运行的生产者,因此您无法保证第一个生产者会在第二个生产者之前将其元素放入队列。因此,队列中的项目不是按顺序排列的,并且毒物出现在 data4 之前,因此消费者不会消耗该数据。

关于java - 从 BlockingQueue 获取时缺少项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22365312/

相关文章:

java - 通过 Java 提交多个 hadoop 作业

java - 我的文字游戏战利品系统遇到了问题

java - 无法从 openapi 客户端中的字符串反序列化 `java.time.OffsetDateTime` 类型的值

java - 如何编写initbinder进行json反序列化?

python - 用线程 boost python

java - Concurrent HashMap Null key 和并发级别

java - java --classpath 是覆盖 CLASSPATH 还是追加到它?

java - 重新创建 ServerSocket 后我的 GUI 卡住

c++ - std::set<std::future> 不可能存在吗

C 静态变量和 fork 进程