java - TaskExecutor 不工作 Spring Integration

标签 java spring-integration spring-integration-dsl

我已经用任务执行器设置了文件轮询器

ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))


                    //move file to processing first processing                    
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();

如上所示,我已将固定的 threadpool 设置为 10,每个轮询的最大消息数为 10。如果我放 10 个文件,它仍然会一个一个地处理。这里可能有什么问题?

* 更新 *

虽然我现在有其他问题,但在 Gary 的回答后它工作得很好。

我的轮询器是这样设置的

setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);

使用 AcceptAll 的原因是因为同一个文件可能会再次出现,这就是我首先移动文件的原因。但是当我启用线程执行器时,多个线程正在处理同一个文件,我假设是因为 AcceptAllFile

如果我更改为 AcceptOnceFileListFilter 它可以工作,但是不会再次拾取再次出现的相同文件!可以做些什么来避免这个问题?

问题/错误

在类AbstractPersistentAcceptOnceFileListFilter中我们有这段代码

@Override
    public boolean accept(F file) {
        String key = buildKey(file);
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                flushIfNeeded();
                return true;
            }
            // same value in store
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                flushIfNeeded();
                return true;
            }
            return false;
        }
    }

现在例如,如果我设置了每次轮询 5 的最大值并且有两个文件,那么它可能的同一个文件将被两个线程拾取。

假设我的代码在我读取文件后移动文件。

但是另一个线程获取到accept方法

如果文件不存在,那么它将返回 lastModified 时间为 0,并且返回 true。

这会导致问题,因为该文件不存在。

如果它是 0 那么它应该返回 false 因为文件已经不存在了。

最佳答案

当您将任务执行器添加到轮询器时;调度程序线程所做的只是将轮询任务交给线程池中的一个线程; maxMessagesPerPoll 是轮询任务的一部分。轮询器本身每 5 秒仅运行一次。为了得到你想要的,你应该在流程中添加一个执行者 channel ......

@SpringBootApplication
public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .<String>handle((p, h) -> {
                    try {
                        logger.info(p);
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
}

编辑

它对我来说很好用......

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .handle((p, h) -> {
                try {
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

touch test1.txt

2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

编辑1

同意 - 用这个转载...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .<File>handle((p, h) -> {
                try {
                    p.delete();
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt

关于java - TaskExecutor 不工作 Spring Integration,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53521593/

相关文章:

java - "Adopting MapReduce model"= 可扩展性的通用答案吗?

java - Spring集成应用和缓存

android - 使 xmpp-connection (spring integration) 与 google gcm 一起工作的正确参数是什么?

java - 如何为 Spring XD 配置 Spring InboundChannelAdapter?

java - 如何使用wireTap传递 header ?

java - 默认PreparedStatement不可序列化

java - xhtmlrenderer 创建长度为 0 的 PDF

java - 如何捕获从 Java (tomcat) 发送到 Oracle DB 的查询?

java - 如何使用 spring 集成 dsl 转换子流路由中的字符串

spring-boot - Spring集成TCP客户端抛出 'Socket closed during message assembly'异常