java - Spring Integration Listener 中 FileReadingMessageSource 中的 scanEachPoll

标签 java duplicates queue spring-integration listener

在Spring Integration Listener中,有一个类文件调用FileReadingMessageSource,有一个 boolean 变量,允许编码器通过其方法setScanEachPoll(boolean scanEachPoll)对其进行修改.

我设法做到了,监听器将在每个周期扫描目录。但是,我发现了一些错误,即 toBeReceived 队列每次扫描目录时都会将附加文件保留在其中。

例如文件a、b、c、d、e。

第一个周期,扫描,队列将有a,b,c,d,e --> 处理a,队列剩下b,c,d,e

第二个周期,扫描,队列将有b,c,d,e,b,c,d,e --> 处理b,队列剩下c,d,e,b,c,d,e 队列中将会有重复的文件名。

此处粘贴 FileReadingMessageSource 类中的 scanInputDirectory() 方法。

private void scanInputDirectory() {
    List<File> filteredFiles = scanner.listFiles(directory);
    Set<File> freshFiles = new LinkedHashSet<File>(filteredFiles);
    if (!freshFiles.isEmpty()) {
        toBeReceived.addAll(freshFiles);
        if (logger.isDebugEnabled()) {
            logger.debug("Added to queue: " + freshFiles);
        }
    }
}

有什么办法可以防止这种情况发生吗?

最佳答案

我仍然相信 AcceptOnceFileListFilter 不会让相同的文件从目标目录中提取:请参阅 DefaultDirectoryScanner.listFiles() 以及它如何使用 this.filter

OTOH OOM 问题可以通过 FileReadingMessageSourceinternalQueueCapacity ctor arg 修复:

/**
 * Creates a FileReadingMessageSource with a bounded queue of the given
 * capacity. This can be used to reduce the memory footprint of this
 * component when reading from a large directory.
 *
 * @param internalQueueCapacity
 *            the size of the queue used to cache files to be received
 *            internally. This queue can be made larger to optimize the
 *            directory scanning. With scanEachPoll set to false and the
 *            queue to a large size, it will be filled once and then
 *            completely emptied before a new directory listing is done.
 *            This is particularly useful to reduce scans of large numbers
 *            of files in a directory.
 */
public FileReadingMessageSource(int internalQueueCapacity) {
    this(null);
    Assert.isTrue(internalQueueCapacity > 0,
            "Cannot create a queue with non positive capacity");
    this.scanner = new HeadDirectoryScanner(internalQueueCapacity);
}

然后查看该类的 JavaDocs:

 * A custom scanner that only returns the first <code>maxNumberOfFiles</code>
 * elements from a directory listing. This is useful to limit the number of File
 * objects in memory and therefore mutually exclusive with {@code AcceptOnceFileListFilter}.
 * It should not be used in conjunction with an {@code AcceptOnceFileListFilter}.

但是它不能与 AcceptOnceFileListFilter 一起使用,并且最终必须删除已处理的文件,以避免在下次轮询中出现它们。

另请参阅 Reference Manual 中的更多信息.

关于java - Spring Integration Listener 中 FileReadingMessageSource 中的 scanEachPoll,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48940071/

相关文章:

java - 如何检查 Java 8 Streams 中是否存在任何重复项?

c++ - 如何使存储在 vector 中的对象中的数据唯一?

java - 行列迷宫递归错误

java - 2D ArrayList 在 Java 中查找列中的对象

asp.net - 一种在 ASP.NET 中生成图像签名或哈希值以进行重复检测的方法?

python 多处理问题

javascript - 如何在使用 jQuery 进行动画处理时运行函数?

java - 嵌套的 JSONObject

java - 如何在JAVA或Android中加解密UTF-8?

用于 SQL 更新语句的 Java 单工作线程