spring-integration - Spring集成在生成消息后将文件移动到kafka

标签 spring-integration spring-integration-sftp

首先你可以在 https://github.com/keiseithunder/spring-sftp-xml-to-json/blob/main/src/main/java/com/demo/sftp/SftpApplication.java 找到我的代码

我有一个 Spring 集成,可以将文件从 SFTP 服务器读取到流中并发送到 kafka,然后在成功生成消息后尝试将远程文件移动到同一远程服务器中的其他目录。但是,文件根本没有移动,也没有抛出任何错误

我尝试使用此代码从 upload/test.xml 移动文件(upload/file/test.xml 中的文件是我用来复制到 upload 目录的备份)到 upload/processed/test.xml .

  @Bean
  @ServiceActivator(inputChannel = "success")
  public MessageHandler handler() {
    return new SftpOutboundGateway(sftpSessionFactory(), "mv", "");
  }

我已经设置了file_renameTo=upload/processed/test.xmlheaders 。不确定,我做错了什么。或者有一种方法可以使用像 advice.setOnSuccessExpressionString("@template.copy(headers['file_remoteDirectory']+'/'+headers['file_remoteFile'])"); 这样的东西移动文件?

我的消息是

"GenericMessage [payload=Note(to=Toves, from=Jani, heading=Reminder, body=Don't forget me this weekend!!!!!), headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"test.xml","link":false,"modified":1674550080000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":122}, kafka_messageKey=test.xml, file_remoteDirectory=upload, kafka_recordMetadata=test-0@298, file_renameTo=upload/processed/test.xml, id=708d04c4-5abc-9f45-e83b-1fea7ffa5e8d, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@2e0aa05, file_remoteFile=test.xml, timestamp=1674556856288}]"
PS。我尝试使用调试器来查看错误,发现它似乎在

处引发错误
    private String obtainRemoteFilePath(Message<?> requestMessage) {
Error here----> String remoteFilePath = this.fileNameProcessor.processMessage(requestMessage);
        Assert.state(remoteFilePath != null,
                () -> "The 'fileNameProcessor' evaluated to null 'remoteFilePath' from message: " + requestMessage);
        return remoteFilePath;
    }

“com.demo.sftp.models.Note 类无法转换为 java.lang.String 类(com.demo.sftp.models.Note 位于加载器 org.springframework.boot.devtools.restart 的未命名模块中。 classloader.RestartClassLoader @387f9ed2; java.lang.String 位于加载器 'bootstrap' 的 java.base 模块中)"

编辑 1:添加解决方案

  1. 需要将 InboundChannelAdapter 定义为 PublishSubscribeChannel
  @Bean
  public MessageChannel streamChannel() {
    return new PublishSubscribeChannel();
  }
  • 然后将 OutboundGateway 添加为 LOWEST_PRECEDENCE 顺序,例如
  •   @Bean
      @Order(Ordered.LOWEST_PRECEDENCE)
      @ServiceActivator(inputChannel = "streamChannel")
      public MessageHandler moveFile() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
            "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
        sftpOutboundGateway
            .setRenameExpressionString(
                "headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
        sftpOutboundGateway.setRequiresReply(false);
        sftpOutboundGateway.setUseTemporaryFileName(true);
        sftpOutboundGateway.setOutputChannelName("nullChannel");
        sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
        sftpOutboundGateway.setAsync(true);
        return sftpOutboundGateway;
      }
    
  • 额外:如果我们想要移动有错误的文件(来自错误 channel )。我们需要改变setRenameExpressionString匹配 ErrorMessage结构。例如。
  •   @Bean
      @Order(Ordered.LOWEST_PRECEDENCE)
      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public MessageHandler moveErrorFile() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
            "payload['failedMessage']['headers']['file_remoteDirectory'] + '/' + payload['failedMessage']['headers']['file_remoteFile']");
        sftpOutboundGateway
            .setRenameExpressionString(
                "payload['failedMessage']['headers']['file_remoteDirectory'] + '/error/' + payload['failedMessage']['headers']['timestamp'] + '-' + payload['failedMessage']['headers']['file_remoteFile']");
        sftpOutboundGateway.setRequiresReply(false);
        sftpOutboundGateway.setUseTemporaryFileName(true);
        sftpOutboundGateway.setOutputChannelName("nullChannel");
        sftpOutboundGateway.setOrder(Ordered.HIGHEST_PRECEDENCE);
        sftpOutboundGateway.setAsync(true);
        return sftpOutboundGateway;
      }
    

    最佳答案

    artem-bilan回答 Spring integration - SFTP rename or move file in remote server after copying 我可以通过将 InboundChannelAdapter 更改为 PublishSubscribeChannel 并创建新订阅者来解决该问题,如下所示

     @Bean
      @Order(Ordered.LOWEST_PRECEDENCE)
      @ServiceActivator(inputChannel = "streamChannel")
      public MessageHandler moveFile() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
            "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
        sftpOutboundGateway
            .setRenameExpressionString("headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
        sftpOutboundGateway.setRequiresReply(false);
        sftpOutboundGateway.setUseTemporaryFileName(true);
        sftpOutboundGateway.setOutputChannelName("nullChannel");
        sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
        sftpOutboundGateway.setAsync(true);
        return sftpOutboundGateway;
      }
    

    关于spring-integration - Spring集成在生成消息后将文件移动到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75220522/

    相关文章:

    java - 获取数据报的IP地址 spring-integration

    java - Spring Integration 中的 REST 端点使消息 channel 成为多线程

    java - 如何运行 Spring 与多线程集成

    java - Spring 入站文件 channel 适配器 Prevent-duplicates=false 不起作用

    java - 我可以直接从java代码调用spring集成链元素吗?

    java - Spring 集成 - 带有 SimpleAsyncTaskExecutor 的任务执行器

    spring-integration - 如何将特定的kafka消费者分配给特定的分区

    java - 带有 Spring 集成的 UDP 服务器

    java - 用于带删除的 SFTP 出站的 Spring Integration DSL

    spring-integration - 使用 java 配置从 sftp 服务器接收文件后,有没有办法停止入站 channel 适配器