Spring SFTP 出站适配器 - 确定文件何时发送

标签 spring spring-integration spring-integration-sftp

我有一个 Spring SFTP 输出适配器,我通过主程序中的“adapter.start()”启动它。启动后,适配器将按预期传输并上传指定目录中的所有文件。但我想在所有文件传输完毕后停止适配器。如何检测所有文件是否已传输,以便我可以发出适配器.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

最佳答案

@Artem Bilan 已经给出了答案。但对于像我这样的 Spring Integration 菜鸟来说,这是他所说的具体实现:

  1. 定义一个服务来按需获取 PDF 文件:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
  • 定义网关以按需启动 SFTP 上传流程:
  • @MessagingGateway
    public interface SFtpOutboundGateway {
        @Gateway(requestChannel = "sftpOutboundFlow.input")
        void uploadFiles(List<File> files);
    }
    
  • 定义集成流程以通过 Sftp.outboundGateway 将文件上传到 SFTP 服务器:
  • @Configuration
    @EnableIntegration
    public class FtpFlowIntegrationConfig {
        // could be also bound via @Value 
        private String sftpRemoteDirectory = "/path/to/remote/dir";
    
        @Bean
        public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
            factory.setHost("localhost");
            factory.setPort(22222);
            factory.setUser("client1");
            factory.setPassword("password123");
            factory.setAllowUnknownKeys(true);
            return new CachingSessionFactory<>(factory);
        }
    
        @Bean
        public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
            return e -> e
                    .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                    .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                    .handle(
                        Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                    );
        }
    
        @Bean
        public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
            RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
            template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
            template.setAutoCreateDirectory(true);
            template.afterPropertiesSet();
            template.setUseTemporaryFileName(false);
            return template;
        }
    }
    

    接线:

    public class SpringApp {
        public static void main(String[] args) {
            final MyFileService fileService = ctx.getBean(MyFileService.class);
            final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
            // trigger the sftp upload flow manually - only once
            sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles()); 
        }
    }
    

    导入注释:

    1.

    @Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List files);

    这里是DirectChannel channel sftpOutboundFlow.input将用于将带有有效负载 (= List<File> files ) 的消息传递给接收者。如果尚未创建此 channel ,网关将隐式创建它。

    2.

    @Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }

    因为IntegrationFlowConsumer函数接口(interface),我们可以使用 IntegrationFlowDefinition 稍微简化流程。在 bean 注册阶段,IntegrationFlowBeanPostProcessor 将此内联 (Lambda) IntegrationFlow 转换为 StandardIntegrationFlow 并处理其组件。使用 Lambda 的 IntegrationFlow 定义将 DirectChannel 填充为流的 inputChannel,并在应用程序上下文中注册为名称为 sftpOutboundFlow.input 的 bean。在上面的示例中(流 bean 名称+“.input”)。这就是为什么我们使用 SFtpOutboundGateway 这个名称。网关。

    引用号:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

    3.

    @Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}

    参见:Remote directory for sftp outbound gateway with DSL

    流程图:

    enter image description here

    关于Spring SFTP 出站适配器 - 确定文件何时发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66906665/

    相关文章:

    java - SimpleUrlHandlerMapping 不起作用

    java - 如何组织 Spring Integration 重试方法,其列表随着每次重试而减少?

    spring - 如何使用 Spring JMS 在 ActiveMQ 中创建多个监听器

    java - 如何在 spring mqtt 集成中停止重复订阅收到的保留消息

    java - 处理文件后通过 SftpOutboundGateway 删除文件

    java - 从命令行覆盖 spring 配置服务器配置

    java - Hibernate注解一对多关系映射的优化方式

    java - spring-integration 中的 Windows key 身份验证

    java - 具有多个文件夹的 Sftp OutboundAdapter

    java - Spring MVC - 表单映射