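I have a huge dataset that I need to load into a database. I'm writing code based on the Java concurrency library (a producer-consumer model with a BlockingQueue and an ExecutorService) that keeps adding data to the queue as it arrives. The consumer keeps retrieving data until it encounters a "poison pill" (and then dies).
The main class, with dummy data to publish. The queue size is intentionally kept small: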
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MessageProcessor {
    // Capacity 5; fair = threads blocked on put/take are served in FIFO order
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
            5, true);
    // Pool that runs the producer tasks
    private static final ExecutorService executor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // Pool that runs the consumer task
    private static final ExecutorService consumerExecutor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final String POISON = "THE_END";

    public void processMessages() throws InterruptedException {
        // Create and start the consumer
        Runnable consumer = new MessageConsumer(queue);
        consumerExecutor.execute(consumer);

        for (String payload : getPayload()) {
            // Create and start one producer task per payload (POISON included)
            Runnable producer = new MessageProducer(queue, payload);
            executor.execute(producer);
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        consumerExecutor.shutdown();
        consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }

    private List<String> getPayload() {
        List<String> payloads = new ArrayList<>();
        payloads.add("data1");
        payloads.add("data2");
        payloads.add("data3");
        payloads.add("data4");
        payloads.add("data5");
        payloads.add("data6");
        payloads.add("data7");
        payloads.add("data8");
        payloads.add("data9");
        payloads.add("data10");
        payloads.add(POISON);
        return payloads;
    }
}
The producer Runnable:
import java.util.concurrent.BlockingQueue;

public class MessageProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String payload;

    public MessageProducer(BlockingQueue<String> queue, String payload) {
        this.queue = queue;
        this.payload = payload;
    }

    @Override
    public void run() {
        try {
            // Blocks while the queue is full
            queue.put(payload);
            System.out.println("Put : " + payload);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before logging
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
The consumer Runnable:
import java.util.concurrent.BlockingQueue;

public class MessageConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private static final String POISON = "THE_END";

    public MessageConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String payload = "";
        do {
            try {
                // Blocks while the queue is empty
                payload = queue.take();
                System.out.println("Got : " + payload);
            } catch (InterruptedException ie) {
                // Interrupted while waiting: restore the flag and stop consuming
                Thread.currentThread().interrupt();
                break;
            }
        } while (!payload.equals(POISON));
    }
}
Output:
Put : data1
Put : data2
Put : data3
Put : data7
Put : data6
Put : data5
Got : data1
Got : data2
Got : data3
Got : data5
Put : data10
Put : data8
Put : data9
Got : data6
Got : data7
Put : data4
Put : THE_END
Got : data8
Got : data9
Got : data10
Got : THE_END
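When I run new MessageProcessor().processMessages(), I observe two anomalies:
- The consumer never gets the item data4 (I assume because it takes the poison pill ("THE_END") before retrieving "data4"). But why doesn't it take the data in the order it was inserted into the queue? Isn't a queue FIFO?
- The insertions into the queue (put) don't happen in the order of the items in the list (e.g. "data7" is inserted right after "data3").
Thanks!!
Best answer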
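Both of your problems have the same cause.
Since you have multiple producers running in parallel, there is no guarantee that the first producer puts its element into the queue before the second one does. The queue itself is FIFO, but it returns items in the order in which the put() calls actually completed, not in the order in which the producer tasks were submitted. So the items in the queue are out of order, the poison pill lands before data4, and the consumer stops before it ever consumes data4.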
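A minimal sketch of one common fix, assuming that keeping the original list order matters: use a single producer that puts the payloads in order and only then puts the poison pill, so every real item is already in the queue ahead of it. The class name OrderedPoisonPillDemo and the method run are hypothetical, chosen for illustration. If you need parallel producers for throughput, the alternative is to keep them but put POISON yourself only after all producer tasks have completed (for example, after executor.awaitTermination returns true); that guarantees no item is lost, but the consumption order is still not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one producer, one consumer, poison pill put last.
public class OrderedPoisonPillDemo {
    private static final String POISON = "THE_END";

    /** Returns the items the consumer took, in the order it took them. */
    public static List<String> run(List<String> payloads) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true);
        // Two threads: the producer and the consumer must run concurrently,
        // otherwise the producer would block forever on a full queue.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Single producer: puts happen in list order, and POISON is put
            // only after every real payload is already in the queue.
            Future<?> producer = pool.submit(() -> {
                try {
                    for (String payload : payloads) {
                        queue.put(payload);
                    }
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Consumer: takes items until it sees POISON, then stops.
            Future<List<String>> consumer = pool.submit(() -> {
                List<String> received = new ArrayList<>();
                String item;
                while (!(item = queue.take()).equals(POISON)) {
                    received.add(item);
                }
                return received;
            });
            producer.get();
            return consumer.get();
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            payloads.add("data" + i);
        }
        // Prints [data1, data2, data3, data4, data5, data6, data7, data8, data9, data10]
        System.out.println(run(payloads));
    }
}
```

Because only one thread calls put(), the FIFO order of the queue matches the list order, and data4 can no longer end up behind the poison pill.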
Source: Stack Overflow question "java - Missing items when taking from BlockingQueue": https://stackoverflow.com/questions/22365312/