spring-integration - 使用 Spring Integration 从远程 SFTP 目录和子目录进行流式传输

标签 spring-integration spring-integration-dsl spring-integration-sftp

我正在使用 Spring Integration Streaming Inbound Channel Adapter,从远程 SFTP 获取流并解析内容过程的每一行。

我使用:

IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
                          .filter(remoteFileFilter)
                          .remoteDirectory("test_dir"),
                        e -> e.id("sftpInboundAdapter")
                              .autoStartup(true)
                              .poller(Pollers.fixedDelay(fetchInt)))
                .handle(Files.splitter(true, true))
....

现在就可以工作了。但我只能从 test_dir 目录获取文件,但我需要递归地从该目录和子目录获取文件并解析每一行。

我注意到入站 channel 适配器Sftp.inboundAdapter(sftpSessionFactory).scanner(...)。它可以扫描子目录。但我没有看到Streaming Inbound Channel Adapter的任何内容。

那么,如何在Streaming Inbound Channel Adapter中实现“从目录递归获取文件”?

谢谢。

最佳答案

您可以使用两个出站网关 - 第一个执行 ls -R(递归列表);拆分结果并使用配置了 mget -stream 的网关来获取每个文件。

编辑

@SpringBootApplication
public class So60987851Application {

    public static void main(String[] args) {
        SpringApplication.run(So60987851Application.class, args);
    }

    @Bean
    IntegrationFlow flow(SessionFactory<LsEntry> csf) {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
                .handle(Sftp.outboundGateway(csf, Command.LS, "payload")
                        .options(Option.RECURSIVE, Option.NAME_ONLY)
                        // need a more robust metadata store for persistence, unless the files are removed
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                .split()
                .log()
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
                .handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
                        .options(Option.STREAM))
                .split(new FileSplitter())
                .log()
                // instead of a filter, we can remove the remote file.
                // but needs some logic to wait until all lines read
//              .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
//              .log()
                .get();
    }

    @Bean
    CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.8");
        sf.setUser("gpr");
        sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
        sf.setAllowUnknownKeys(true);
        return sf;
    }

}

关于spring-integration - 使用 Spring Integration 从远程 SFTP 目录和子目录进行流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60987851/

相关文章:

spring-integration - 为 Http.outboundGateway spring dsl 配置错误处理和重试

java - Kafka Spring 集成流程上的空指针

redis - spring 集成 redis 轮询器与事务

java - `RotatingServerAdvice`多次不取文件怎么解决?

java - Spring SFTP读文件锁

java - 通过 Spring Integration 将多个文件从不同源复制到不同目的地

java - Spring 集成 - 队列/轮询器似乎在没有任何操作的情况下耗尽线程池

spring-integration - 如何使用 Java Config 配置 SFTP 出站网关?

java - SpringXD部署Stream时出错: StringDeserializer class could not be found

java - 仅第一条消息传送到服务器