java - 使用BlockingQueue在Java中实现生产者-消费者似乎在消费后丢失数据

标签 java spring multithreading spring-boot

我尝试在 Spring Boot 中构建一个 API,每次调用 saveRecord 时,它将在 ThreadPool 的线程中运行。我打算做的是在BlockingQueue中缓冲记录,当大小大于400时,从BlockingQueue中取出400条记录并放入文件(仅用于测试) .

private int putRecordBatchSize = 400;
private AtomicInteger counter = new AtomicInteger(0);
private BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2000, true);

@Async
public void saveRecord() throws UnsupportedEncodingException {
    int cur = counter.incrementAndGet();
    if(buffer.add(cur+"\n") == false)   {
        logger.error(format("buffer add failed. buffer_remaining_cap=[%d]", buffer.remainingCapacity()));
    }

    if(buffer.size() >= putRecordBatchSize) {
        logger.info("[PutRecordBatch] start.");
        List<String> drainedRecords = new ArrayList<>(putRecordBatchSize);
        buffer.drainTo(drainedRecords, putRecordBatchSize);

        //TODO test code
        drainedRecords.stream().forEach(k -> {
            try {
                Files.write(Paths.get("text.txt"), k.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        try {
            Thread.currentThread().sleep(3000); // mocking time-consuming operation
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        logger.info("[PutRecordBatch] end.");
        buffer.clear();
    }
}

但是从text.txt来看,在drainTo操作开始和结束之间添加到队列中的数据似乎不知何故丢失了,如下所示。有人可以帮我吗?非常感谢!

389
390
391
392
393
394
395
396
397
398
399
400  // This is the border for every drainTo operation
430
431
432
433
434
435
436
437
438
439

最佳答案

对我来说,丢失记录并不奇怪:

buffer.drainTo(drainedRecords, putRecordBatchSize);

缓冲区中删除putRecordBatchSize元素。

写出这些记录后,您可以执行

buffer.clear();

这会删除在处理先前元素时添加的那些元素。

由于 buffer.drainTo(drainedRecords, putRecordBatchSize); 已经删除了您正在处理的元素,因此不需要 buffer.clear();!

关于java - 使用BlockingQueue在Java中实现生产者-消费者似乎在消费后丢失数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43038974/

相关文章:

java - Spring Async 方法隐藏异常

java - hibernate -> ArrayList 无法转换为 Set

java - Spring Boot Controller 不返回 html 页面

spring - 使用 Spock 在 Spring 中测试 Mock Bean

python - 如何使用 PRAW (Reddit) 和多线程更新数据帧

c++ - 我可以使用指针分配以安全的方式检测线程的开始吗?

c++ - pthreads 中的内存模型规范

java - Java Swing JList 的键值对

java - 安卓工作室 : Execution failed for task ':app:transformClassesWithDexBuilderForDebug'

Spring MVC 3. 绑定(bind)后验证前如何修改表单