java - Apache Camel、sFTP 组件和幂等存储库

标签 java spring apache-camel

我想以高可用性 (HA) 模式构建应用程序。
这意味着我可以拥有可变数量的实例,即我需要有 5 个应用程序实例。
应用程序应该从 ftp/sftp 读取数据,避免重复(一个文件不能处理 2 次)。
为了解决这个问题,我决定在 active/active 中使用集群 Camel 路由。设置。此设置使用幂等存储库。
下面是我的幂等存储库配置(我使用的是 spring boot、camel spring boot starters、sql 和 ftp)

@Configuration
public class IdempotentRepoConf {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcMessageIdRepository sftpProcesorName() {
        return new JdbcMessageIdRepository(dataSource, "sftpProcesorName");
    }

}
还有我的路由器
@Component
public class FooSftpRouter extends RouteBuilder {

    @Autowired
    private IdempotentRepository idempotentRepository;

    @Override
    public void configure() throws Exception {
        from("sftp:localhost:2221/upload/files/foo?username=foo" +
                "&password=pass" +
                "&move=./.done" +
                "&moveFailed=.error" +
                "&idempotentRepository=#sftpProcesorName")
                .idempotentConsumer(header(Exchange.FILE_NAME),idempotentRepository)
                    .to("sftp:localhost:2221/upload/files/bar?username=foo&password=pass")
                .end();

    }
}

当我只运行一个实例时,一切正常,但当我运行多个实例时,出现错误
2020-08-03 15:14:14.589  WARN 18071 --- [pload/files/foo] o.a.c.c.file.remote.SftpConsumer         : Error processing file RemoteFile[Foo 4 ] due to Cannot retrieve file: upload/files/foo/Foo 4 . Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: upload/files/foo/Foo 4 ]

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 4 
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:778) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFile(SftpOperations.java:717) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:434) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:145) ~[camel-ftp-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:234) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:196) ~[camel-file-3.2.0.jar:3.2.0]
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187) ~[camel-support-3.2.0.jar:3.2.0]
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:106) ~[camel-support-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.jcraft.jsch.SftpException: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2225) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1318) ~[jsch-0.1.55.jar:na]
    at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1290) ~[jsch-0.1.55.jar:na]
    at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:759) ~[camel-ftp-3.2.0.jar:3.2.0]
    ... 13 common frames omitted
要收到此警告,我执行以下步骤
  • 在本地化中将多个文件放在 sftp 中 /home/foo/upload/files/foo (即

  • for i in {1..10}; do  touch "Foo $i "; done;
    
  • 检查数据库(postgress)

  • select * from camel_messageprocessed;
    
    正如我所料,我有 10 条记录
      processorname   | messageid |        createdat        
    ------------------+-----------+-------------------------
     sftpProcesorName | Foo 1     | 2020-08-03 15:14:13.392
     sftpProcesorName | Foo 10    | 2020-08-03 15:14:13.607
     sftpProcesorName | Foo 9     | 2020-08-03 15:14:14.409
     sftpProcesorName | Foo 6     | 2020-08-03 15:14:14.419
     sftpProcesorName | Foo 8     | 2020-08-03 15:14:14.427
     sftpProcesorName | Foo 2     | 2020-08-03 15:14:14.435
     sftpProcesorName | Foo 3     | 2020-08-03 15:14:14.447
     sftpProcesorName | Foo 5     | 2020-08-03 15:14:14.455
     sftpProcesorName | Foo 4     | 2020-08-03 15:14:14.462
     sftpProcesorName | Foo 7     | 2020-08-03 15:14:14.469
    (10 rows)
    
    但在日志中我多次看到警告和错误
    org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 3
    
    Caused by: com.jcraft.jsch.SftpException: No such file
    
    我的 build.gradle 有依赖
        compile 'org.apache.camel.springboot:camel-ftp-starter:3.2.0'
        compile 'org.apache.camel.springboot:camel-sql-starter:3.2.0'
    
    
    我也试过选项 readLock=idempotent但是这个选项在 documentation(only for file component) 并且可能不适用于 ftp/sftp 组件
    所以我避免重复和只处理一次的问题仍然没有解决。
    我做错了什么?

    最佳答案

    很明显,您的流程正在竞争。我不认为这是在做任何事情&idempotentRepository=#sftpProcesorNameidempotentConsumer() DSL 仅在 SFTP 发生后运行。
    从内存中尝试的一些项目:

  • readLock=fileLock # 可能行不通
  • readlock=markerFile # 可能是“基本正常”操作所需的全部
  • inProgressRepository=#jdbcRepository应该是铁包溶液。
  • 关于java - Apache Camel、sFTP 组件和幂等存储库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63230658/

    相关文章:

    java - 无法访问的语句帮助(链表)

    java - Camel 文件: Stop route when all files are processed

    java - 等待依赖项 [(&(component=Simple)(objectClass=org.apache.camel.spi.ComponentResolver))]

    java - 如何查询特定 DBpedia 资源/页面的多个实体?

    java - 嵌套的ArrayList初始化有问题吗?

    java - 从 DELETE 和 INSERT 切换到 UPDATE 是否更快

    spring - 如何在 Spring 3.1 中构造函数 Autowiring HttpServletResponse?

    java - Spring singleton bean 在被代理时在每个方法调用上创建新的实例/bean

    mysql - Spring-Boot,无法使用spring-data JPA在MySql中保存unicode字符串

    javascript - 如何记录来自 Camel 调用的 Javascript 的消息?