我尝试在 Spring Boot 中构建一个 API,每次调用 saveRecord
时,它将在 ThreadPool 的线程中运行。我打算做的是在BlockingQueue
中缓冲记录,当大小大于400时,从BlockingQueue
中取出400条记录并放入文件(仅用于测试) .
private int putRecordBatchSize = 400;
private AtomicInteger counter = new AtomicInteger(0);
private BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2000, true);
@Async
public void saveRecord() throws UnsupportedEncodingException {
int cur = counter.incrementAndGet();
if(buffer.add(cur+"\n") == false) {
logger.error(format("buffer add failed. buffer_remaining_cap=[%d]", buffer.remainingCapacity()));
}
if(buffer.size() >= putRecordBatchSize) {
logger.info("[PutRecordBatch] start.");
List<String> drainedRecords = new ArrayList<>(putRecordBatchSize);
buffer.drainTo(drainedRecords, putRecordBatchSize);
//TODO test code
drainedRecords.stream().forEach(k -> {
try {
Files.write(Paths.get("text.txt"), k.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
e.printStackTrace();
}
});
try {
Thread.currentThread().sleep(3000); // mocking time-consuming operation
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("[PutRecordBatch] end.");
buffer.clear();
}
}
但是从text.txt来看,在drainTo操作开始和结束之间添加到队列中的数据似乎不知何故丢失了,如下所示。有人可以帮我吗?非常感谢!
389
390
391
392
393
394
395
396
397
398
399
400 // This is the border for every drainTo operation
430
431
432
433
434
435
436
437
438
439
最佳答案
对我来说,丢失记录并不奇怪:
buffer.drainTo(drainedRecords, putRecordBatchSize);
从缓冲区
中删除putRecordBatchSize
元素。
写出这些记录后,您可以执行
buffer.clear();
这会删除在处理先前元素时添加的那些元素。
由于 buffer.drainTo(drainedRecords, putRecordBatchSize);
已经删除了您正在处理的元素,因此不需要 buffer.clear();
!
关于java - 使用BlockingQueue在Java中实现生产者-消费者似乎在消费后丢失数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43038974/