我有一个文件系统目录,我想轮询文件然后并发处理每个文件,每个文件一个线程。我的印象是,在幕后,InboundFileAdapter
将每个文件放入一个队列中,这样我就可以使用下游的执行程序 channel 来同时处理稍后的调用。我在 Java Config 中实现如下:
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
.scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
.filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
e -> e.poller(Pollers
.fixedDelay(fileSystemPollDelay)
.get()))
.channel(MessageChannels.executor(executor).get())
.transform(File::toPath)
.enrichHeaders(cleanUpConfigurer)
.get()
执行者 channel 下游的每个 channel 本身都是直接 channel 。
但是,我看到下游服务的并发性很差。使用缓存线程池,我看到同一个线程基本上串行执行下游代码,而如果我使用固定池执行器,我看到不同的线程权衡仍然串行执行。
我也试过在轮询器和执行器 channel 之间架起一座桥梁,但无济于事。
最佳答案
这只是因为 SourcePollingChannelAdapterFactoryBean
的作用:
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE){
// the default is 1 since a source might return
// a non-null and non-interruptible value every time it is invoked
this.pollerMetadata.setMaxMessagesPerPoll(1);
}
因此,每个 .fixedDelay(fileSystemPollDelay)
只有一个 File
从队列中轮询以进行处理。
因此,只需将 .maxMessagesPerPoll()
增加到适合您的系统值的值,并享受并发性带来的乐趣!
顺便说一句,没有理由在轮询适配器之后引入ExecutorChannel
。出于相同的并发原因,您可以将 .taskExecutor()
用于 .poller()
。
关于java - 使用 spring 入站文件适配器进行并发处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33444304/