java - Spring 集成: retry configuration with multi-instances

标签 java spring spring-boot spring-integration

我正在 4 个不同的服务器上运行 4 个基于 Spring Boot Integration 的应用程序实例。 流程是:

  1. 在共享文件夹中逐个读取 XML 文件。
  2. 处理文件(检查结构、内容...)、转换数据并发送电子邮件。
  3. 在另一个共享文件夹中编写有关此文件的报告。
  4. 删除已成功处理的文件。

我正在寻找一种非阻塞且安全的解决方案来处理这些文件。

用例:

  • 如果实例在读取或处理文件时崩溃(因此不会结束集成链):另一个实例必须处理该文件,或者同一实例必须在重新启动后处理该文件。
  • 如果一个实例正在处理文件,其他实例不得处理该文件。

我已经构建了这个 Spring Integration XML 配置文件(它包括带有共享 H2 数据库的 JDBC 元数据存储):

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
    <int:queue/>
</int:channel>

<!-- Input -->
<int-file:inbound-channel-adapter 
    id="inputFilesAdapter"
    channel="inputFilesChannel"
    directory="file:${input.files.path}" 
    ignore-hidden="true"
    comparator="lastModifiedFileComparator"
    filter="compositeFilter">
   <int:poller fixed-rate="10000" max-messages-per-poll="1"  task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="1"/>

<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
</bean>

<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="jdbcDataSource"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
</bean>

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                <constructor-arg index="0" ref="jdbcMetadataStore"/>
                <constructor-arg index="1" value="files"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
    <int:service-activator ref="fileActivator" method="fileRead"/>
    <int:service-activator ref="fileActivator" method="fileProcess"/>
    <int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>

<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

<int-file:outbound-channel-adapter 
    id="outputFilesChannel" 
    directory="file:${output.files.path}"
    filename-generator-expression ="payload.name">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
             <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>

</beans>

问题: 对于多个文件,当成功处理 1 个文件时,事务将提交元数据存储区(表INT_METADATA_STORE)中的其他现有文件。因此,如果应用程序重新启动,其他文件将永远不会被处理 (如果应用程序在处理第一个文件时崩溃,它可以正常工作)。 看起来它只适用于读取文件,不适用于处理集成链中的文件...如何逐个文件管理 JVM 崩溃文件上的回滚事务?

非常感谢任何帮助。这会让我发疯:(

谢谢!

编辑/注释:

最佳答案

您不应该使用 PseudoTransactionManager,而应使用 DataSourceTransactionManager

由于您使用 JdbcMetadataStore,它将参与事务,如果下游流失败,元数据存储中的条目也将回滚。

关于java - Spring 集成: retry configuration with multi-instances,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53260483/

相关文章:

java - 在中使用 Autowiring 对象

java - 如何将单击的单选按钮的值传递给 Spring MVC Controller ?

spring - 获取原因 :null in property dlqDeliveryFailureCause

mysql - H2 jdbc异常 - 奇数个字符的十六进制字符串

java - Spring MVC 表单处理

java - 如何使用可变日期作为数组名称的 JSON - Java/Spring

spring-boot - 如何在 Spring Boot 中设置同站 cookie 标志?

java - 如何授权 QuickBooks Online 对私有(private)桌面应用程序的请求

java - 如何从 Java 中的二维对象数组中提取数字

java - 安卓/ Gradle : specify a dependency for a subset of build types