我正在 4 个不同的服务器上运行 4 个基于 Spring Boot Integration 的应用程序实例。 流程是:
- 在共享文件夹中逐个读取 XML 文件。
- 处理文件(检查结构、内容...)、转换数据并发送电子邮件。
- 在另一个共享文件夹中编写有关此文件的报告。
- 删除已成功处理的文件。
我正在寻找一种非阻塞且安全的解决方案来处理这些文件。
用例:
- 如果实例在读取或处理文件时崩溃(因此不会结束集成链):另一个实例必须处理该文件,或者同一实例必须在重新启动后处理该文件。
- 如果一个实例正在处理文件,其他实例不得处理该文件。
我已经构建了这个 Spring Integration XML 配置文件(它包括带有共享 H2 数据库的 JDBC 元数据存储):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
<int:queue/>
</int:channel>
<!-- Input -->
<int-file:inbound-channel-adapter
id="inputFilesAdapter"
channel="inputFilesChannel"
directory="file:${input.files.path}"
ignore-hidden="true"
comparator="lastModifiedFileComparator"
filter="compositeFilter">
<int:poller fixed-rate="10000" max-messages-per-poll="1" task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="1"/>
<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
<property name="driverClassName" value="org.h2.Driver"/>
<property name="username" value="${database.username}"/>
<property name="password" value="${database.password}"/>
<property name="maxIdle" value="4"/>
</bean>
<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
<constructor-arg ref="jdbcDataSource"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="jdbcDataSource"/>
</bean>
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="jdbcMetadataStore"/>
<constructor-arg index="1" value="files"/>
</bean>
</list>
</constructor-arg>
</bean>
<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
<int:service-activator ref="fileActivator" method="fileRead"/>
<int:service-activator ref="fileActivator" method="fileProcess"/>
<int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
<int-file:outbound-channel-adapter
id="outputFilesChannel"
directory="file:${output.files.path}"
filename-generator-expression ="payload.name">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
</beans>
问题:
对于多个文件,当成功处理 1 个文件时,事务将提交元数据存储区(表INT_METADATA_STORE
)中的其他现有文件。因此,如果应用程序重新启动,其他文件将永远不会被处理
(如果应用程序在处理第一个文件时崩溃,它可以正常工作)。
看起来它只适用于读取文件,不适用于处理集成链中的文件...如何逐个文件管理 JVM 崩溃文件上的回滚事务?
非常感谢任何帮助。这会让我发疯:(
谢谢!
编辑/注释:
我已根据 Artem Bilan 的回答更新了我的配置。并删除轮询器 block 中的事务性 block :我在实例之间存在事务冲突(丑陋的表锁异常)。尽管行为是相同的。
我在
poller
block 中测试此配置失败(相同行为):<int:advice-chain> <tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <tx:method name="file*" timeout="30000" propagation="REQUIRED"/> </tx:attributes> </tx:advice> </int:advice-chain>
也许是基于 Idempotent Receiver Enterprise Integration Pattern 的解决方案可以工作。但我没能配置它......我找不到精确的文档。
最佳答案
您不应该使用 PseudoTransactionManager
,而应使用 DataSourceTransactionManager
。
由于您使用 JdbcMetadataStore
,它将参与事务,如果下游流失败,元数据存储中的条目也将回滚。
关于java - Spring 集成: retry configuration with multi-instances,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53260483/