java - Spring Integration 消息轮询

标签 java spring spring-integration spring-messaging

我有一个 Spring 配置设置,用于从数据库队列轮询消息:

<int:annotation-config default-publisher-channel="messageChannel" />

<task:executor id="messageTaskExecutor" pool-size="1"
    queue-capacity="1" rejection-policy="CALLER_RUNS" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<bean id="messageQueryProvider"
    class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />

<bean id="messageSessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource" />
    <property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
    <property name="usingIdCache" value="true" />
</bean>

<int:channel id="messageChannel">
    <int:queue message-store="messageSessionStore" />
</int:channel>

<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
    <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>

但是,该应用程序在多个节点上运行。当服务器重新启动时,似乎会发生消息被超过1个节点拾取的情况(节点全部立即关闭并按顺序重新启动)。有没有办法避免多次消息处理?

最佳答案

使用 OracleChannelMessageStoreQueryProvider 是不可能的。只是因为我们依赖FOR UPDATE SKIP LOCKED 。因此当 SELECT由一个节点执行,记录被锁定,下一条记录将转到表中的下一个空闲行。

JavaDoc 中没有 setUsingIdCache() :

 * <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
 * to true, as the Oracle query will ignore locked rows.</p>

但我认为这完全无关。删除该选项和 <int:transaction-synchronization-factory>您将简化配置,但不得更改行为。

我想你看到的就像round-robin :一个节点获取第一行,下一个节点跳过它并获取下一行。

我不相信不同的节点在 Oracle 时会收到相同的消息。

关于java - Spring Integration 消息轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49258844/

相关文章:

spring - 部署 spring 应用程序时出现 javax.management.InstanceAlreadyExistsException

java - 获取数据报的IP地址 spring-integration

java - 部署时出现空指针异常

java - 背压时 Intent 未清除, "on create"中的项目不断堆叠。安卓

java - HTTP 状态 404 – 未找到 tomcat spring

java - hibernate : Not able to create table

java - java中对象的内存管理

java - 无法 Autowiring org.springframework.mail.javamail.JavaMailSender

java - 使用hibernate java mysql保存@Lob时出错

Java Spring SFTP 连接无法重新连接?