我有一个 Spring SFTP 输出适配器,我通过主程序中的“adapter.start()”启动它。启动后,适配器将按预期传输并上传指定目录中的所有文件。但我想在所有文件传输完毕后停止适配器。如何检测所有文件是否已传输,以便我可以发出适配器.stop()?
@Bean
public IntegrationFlow sftpOutboundFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
.filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
.preventDuplicates(true),
e -> e.id("sftpOutboundAdapter")
.autoStartup(false)
.poller(Pollers.trigger(new FireOnceTrigger())
.maxMessagesPerPoll(-1)))
.log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
.log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
.handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
.useTemporaryFileName(false)
.remoteDirectory(sftpRemoteDirectory))
.get();
}
最佳答案
@Artem Bilan 已经给出了答案。但对于像我这样的 Spring Integration 菜鸟来说,这是他所说的具体实现:
- 定义一个服务来按需获取 PDF 文件:
@Service
public class MyFileService {
public List<File> getPdfFiles(final String srcDir) {
File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
return Arrays.asList(files == null ? new File[]{} : files);
}
}
- 定义网关以按需启动 SFTP 上传流程:
@MessagingGateway
public interface SFtpOutboundGateway {
@Gateway(requestChannel = "sftpOutboundFlow.input")
void uploadFiles(List<File> files);
}
- 定义集成流程以通过
Sftp.outboundGateway
将文件上传到 SFTP 服务器:
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
// could be also bound via @Value
private String sftpRemoteDirectory = "/path/to/remote/dir";
@Bean
public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(22222);
factory.setUser("client1");
factory.setPassword("password123");
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
return e -> e
.log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
.log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
.handle(
Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
);
}
@Bean
public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
template.setAutoCreateDirectory(true);
template.afterPropertiesSet();
template.setUseTemporaryFileName(false);
return template;
}
}
接线:
public class SpringApp {
public static void main(String[] args) {
final MyFileService fileService = ctx.getBean(MyFileService.class);
final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
// trigger the sftp upload flow manually - only once
sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles());
}
}
导入注释:
1.
@Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List files);
这里是DirectChannel channel sftpOutboundFlow.input
将用于将带有有效负载 (= List<File> files
) 的消息传递给接收者。如果尚未创建此 channel ,网关将隐式创建它。
2.
@Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }
因为IntegrationFlow是Consumer
函数接口(interface),我们可以使用 IntegrationFlowDefinition 稍微简化流程。在 bean 注册阶段,IntegrationFlowBeanPostProcessor 将此内联 (Lambda) IntegrationFlow 转换为 StandardIntegrationFlow 并处理其组件。使用 Lambda 的 IntegrationFlow 定义将 DirectChannel 填充为流的 inputChannel,并在应用程序上下文中注册为名称为 sftpOutboundFlow.input
的 bean。在上面的示例中(流 bean 名称+“.input”)。这就是为什么我们使用 SFtpOutboundGateway
这个名称。网关。
引用号:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial
3.
@Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}
参见:Remote directory for sftp outbound gateway with DSL
流程图:
关于Spring SFTP 出站适配器 - 确定文件何时发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66906665/