java - Spring Integration消息轮询

原文 标签 java spring spring-integration spring-messaging

我有一个用于从数据库队列中轮询消息的Spring配置设置:

<int:annotation-config default-publisher-channel="messageChannel" />

<task:executor id="messageTaskExecutor" pool-size="1"
    queue-capacity="1" rejection-policy="CALLER_RUNS" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<bean id="messageQueryProvider"
    class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />

<bean id="messageSessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource" />
    <property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
    <property name="usingIdCache" value="true" />
</bean>

<int:channel id="messageChannel">
    <int:queue message-store="messageSessionStore" />
</int:channel>

<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
    <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>


但是,该应用程序在多个节点上运行。重新启动服务器时,似乎发生了消息被多个节点拾取的情况(所有节点都立即关闭并按顺序重新启动)。有什么方法可以避免多条消息处理?

最佳答案

使用OracleChannelMessageStoreQueryProvider这是不可能的。仅仅因为我们依赖FOR UPDATE SKIP LOCKED。因此,当一个节点执行SELECT时,记录将被锁定,下一个记录将转到表中的下一个空闲行。

JavaDoc中没有setUsingIdCache()的内容:

 * <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
 * to true, as the Oracle query will ignore locked rows.</p>


但是我认为这是完全无关的。删除该选项和<int:transaction-synchronization-factory>将简化您的配置,但是不得更改其行为。

我认为您所看到的就像round-robin:一个节点获得第一行,下一个跳过它并获得下一个。

我以某种方式不相信使用Oracle时,不同的节点会收到相同的消息。

关于java - Spring Integration消息轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49258844/

相关文章:

java - 监听 Hibernate Session 创建

java - 使用Spring的元注释扩展lombok注释

java - 如何设置TLS Server以在Spring集成中验证客户端?

java - 如何使用 Spring Integration 2.0.5 根据内容路由消息?

java - Java-读写.txt文件

java - Java中的ascii char表示?

任务 ':app:transformResourcesWithMergeJavaResForDebug'的java执行失败

Java Spring Controller 拒绝除 GET 之外的所有请求

java - 没有参数的 Spring Integration Gateway

java - 执行完所有测试后进行清理(spock 框架)