java - 使用 spring 入站文件适配器进行并发处理

标签 java spring concurrency spring-integration

我有一个文件系统目录,我想轮询文件然后并发处理每个文件,每个文件一个线程。我的印象是,在幕后,InboundFileAdapter 将每个文件放入一个队列中,这样我就可以使用下游的执行程序 channel 来同时处理稍后的调用。我在 Java Config 中实现如下:

return IntegrationFlows
            .from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
                            .scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
                            .filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
                    e -> e.poller(Pollers
                            .fixedDelay(fileSystemPollDelay)
                            .get()))
            .channel(MessageChannels.executor(executor).get())
            .transform(File::toPath)
            .enrichHeaders(cleanUpConfigurer)
            .get()

执行者 channel 下游的每个 channel 本身都是直接 channel 。

但是,我看到下游服务的并发性很差。使用缓存线程池,我看到同一个线程基本上串行执行下游代码,而如果我使用固定池执行器,我看到不同的线程权衡仍然串行执行。

我也试过在轮询器和执行器 channel 之间架起一座桥梁,但无济于事。

最佳答案

这只是因为 SourcePollingChannelAdapterFactoryBean 的作用:

if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE){
    // the default is 1 since a source might return
    // a non-null and non-interruptible value every time it is invoked
    this.pollerMetadata.setMaxMessagesPerPoll(1);
}

因此,每个 .fixedDelay(fileSystemPollDelay) 只有一个 File 从队列中轮询以进行处理。

因此,只需将 .maxMessagesPerPoll() 增加到适合您的系统值的值,并享受并发性带来的乐趣!

顺便说一句,没有理由在轮询适配器之后引入ExecutorChannel。出于相同的并发原因,您可以将 .taskExecutor() 用于 .poller()

关于java - 使用 spring 入站文件适配器进行并发处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33444304/

相关文章:

java - 连接池 C3P0 日志记录

java - Spring Boot 单元测试配置

java - 示例 SyncService 中 sSyncAdapterLock 的用途是什么?

java - 在 HashMap 值对象上同步

java - 强制停止后关闭残留的 Selenium WebDriver 实例

java - spring中通过Service访问Join Tables的元素

java - 根据显示的文本更改按钮的操作

java - 使用 jsoup 获取链接并将其重定向到最后带有 ?token= 的链接

java - Spring mvc 测试+mockito。 @ModelAttribute 空指针

java - N+1 HTTP 调用通过同时队列进行批处理