首先你可以在 https://github.com/keiseithunder/spring-sftp-xml-to-json/blob/main/src/main/java/com/demo/sftp/SftpApplication.java 找到我的代码
我有一个 Spring 集成,可以将文件从 SFTP 服务器读取到流中并发送到 kafka,然后在成功生成消息后尝试将远程文件移动到同一远程服务器中的其他目录。但是,文件根本没有移动,也没有抛出任何错误。
我尝试使用此代码从 upload/test.xml
移动文件(upload/file/test.xml
中的文件是我用来复制到 upload
目录的备份)到 upload/processed/test.xml
.
@Bean
@ServiceActivator(inputChannel = "success")
public MessageHandler handler() {
return new SftpOutboundGateway(sftpSessionFactory(), "mv", "");
}
我已经设置了file_renameTo=upload/processed/test.xml
在 headers
。不确定,我做错了什么。或者有一种方法可以使用像 advice.setOnSuccessExpressionString("@template.copy(headers['file_remoteDirectory']+'/'+headers['file_remoteFile'])");
这样的东西移动文件?
我的消息是
"GenericMessage [payload=Note(to=Toves, from=Jani, heading=Reminder, body=Don't forget me this weekend!!!!!), headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"test.xml","link":false,"modified":1674550080000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":122}, kafka_messageKey=test.xml, file_remoteDirectory=upload, kafka_recordMetadata=test-0@298, file_renameTo=upload/processed/test.xml, id=708d04c4-5abc-9f45-e83b-1fea7ffa5e8d, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@2e0aa05, file_remoteFile=test.xml, timestamp=1674556856288}]"
PS。我尝试使用调试器来查看错误,发现它似乎在处引发错误
private String obtainRemoteFilePath(Message<?> requestMessage) {
Error here----> String remoteFilePath = this.fileNameProcessor.processMessage(requestMessage);
Assert.state(remoteFilePath != null,
() -> "The 'fileNameProcessor' evaluated to null 'remoteFilePath' from message: " + requestMessage);
return remoteFilePath;
}
“com.demo.sftp.models.Note 类无法转换为 java.lang.String 类(com.demo.sftp.models.Note 位于加载器 org.springframework.boot.devtools.restart 的未命名模块中。 classloader.RestartClassLoader @387f9ed2; java.lang.String 位于加载器 'bootstrap' 的 java.base 模块中)"
编辑 1:添加解决方案
- 需要将 InboundChannelAdapter 定义为 PublishSubscribeChannel
@Bean
public MessageChannel streamChannel() {
return new PublishSubscribeChannel();
}
- 然后将 OutboundGateway 添加为 LOWEST_PRECEDENCE 顺序,例如
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = "streamChannel")
public MessageHandler moveFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString(
"headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
- 额外:如果我们想要移动有错误的文件(来自错误 channel )。我们需要改变
setRenameExpressionString
匹配ErrorMessage
结构。例如。
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public MessageHandler moveErrorFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"payload['failedMessage']['headers']['file_remoteDirectory'] + '/' + payload['failedMessage']['headers']['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString(
"payload['failedMessage']['headers']['file_remoteDirectory'] + '/error/' + payload['failedMessage']['headers']['timestamp'] + '-' + payload['failedMessage']['headers']['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.HIGHEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
最佳答案
如artem-bilan回答 Spring integration - SFTP rename or move file in remote server after copying
我可以通过将 InboundChannelAdapter
更改为 PublishSubscribeChannel
并创建新订阅者来解决该问题,如下所示
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
@ServiceActivator(inputChannel = "streamChannel")
public MessageHandler moveFile() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
"headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
sftpOutboundGateway
.setRenameExpressionString("headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
sftpOutboundGateway.setRequiresReply(false);
sftpOutboundGateway.setUseTemporaryFileName(true);
sftpOutboundGateway.setOutputChannelName("nullChannel");
sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
sftpOutboundGateway.setAsync(true);
return sftpOutboundGateway;
}
关于spring-integration - Spring集成在生成消息后将文件移动到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75220522/