java - 如何最好地在 Spring Integration 中实现 DynamicPoller

标签 java spring spring-integration

我有一个接收大型消息的流程(这些消息位于 RDBMS 表中),因此我无法在给定时间处理太多此类消息。因此,我正在使用 <int:poller max-messages-per-poll="" /> 限制处理, 还有一些 queuescapacity设置为 <int:queue capacity=""> .我了解多个线程/事务将参与此流程,对于用例,这是可以接受的。

轮询数据库的查询需要一些时间才能运行,因此我不想比我需要的更频繁地运行它。此外,此流收到的消息往往会在“突发”内进入,这意味着它可能会收到 1000 条消息,然后一个小时内没有收到任何消息。

我想做的是使用 dynamic-poller这将很少进行轮询(因为正如所指出的那样,查询的运行成本很高)除非我看到我收到大量消息,在这种情况下我想非常频繁地进行轮询,直到处理完所有消息。例如,如果我有 <int:poller max-messages-per-poll="100" />并且我知道轮询器刚刚读取了 100 条消息,那么很有可能 RDBMS 中还有更多消息需要处理,我应该在处理完成后立即再次轮询。

我知道 Spring 不提供修改 trigger 的方法使它在本质上是动态的,并且已经看过 Spring Integration Ref “7.1.5 Change Polling Rate at Runtime “ 在dynamic-poller示例项目:Dynamic Poller
这是一个开始,但我真的需要 poller根据当前负载改变它的频率。
我在这一点上可能不正确,但我认为 Gary 在他关于“ Implementing High-Availability Architectures with Spring Integration ”的演讲中可能提到了类似的实现起来很有趣。
无论如何编写一个类来更改 poller频率似乎没什么大不了的。更具挑战性的是如何知道何时发生了没有结果的投票,因为没有任何内容被发布到输出 channel 。

我考虑过的一些选项:

  1. 附上 <int:wire-tap channel="" />poller的 channel 调用 <int:service-activator> .服务激活器检查消息数量并调整 pollerperiodDynamicPeriodicTrigger 上.
    问题是,如果没有收到任何消息,这将永远不会被调用,所以一旦我调整以更频繁地轮询,轮询周期将无限期地保持下去。

  2. 与 #1 相同,但将逻辑添加到 DynamicPeriodicTrigger这将恢复 period回到initialDelay在下一个触发发生后或在特定时间段后。

  3. 使用<int:advice-chain> <int:poller> 中的元素带有 MethodInterceptor 的元素实现。
    类似于 Artem 在这个 link 中的建议. 虽然这让我可以站在 receive 的前面方法,它不允许我访问 receive 的结果方法(这会给我检索到的消息数)。请注意,这似乎已被 Gary 在 link 中提到的内容所证实。 .

    The request handler advice chain is a special case; we had to take care to only advise the internal endpoint methods and not any downstream processing (on output channels).

    Advising pollers is simpler because we're advising the whole flow. As described in section "7.1.4 Namespace Support" subsection "AOP Advice chains", you simply create an advice by implementing the MethodInterceptor interface.

    See SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain() for a very simple advice...

    Code:
    adviceChain.add(new MethodInterceptor() {
    public Object invoke(MethodInvocation invocation) throws Throwable {
    adviceApplied.set(true);
    return invocation.proceed();
    }
    });
    This simply is used to assert that the advice was called properly; a real advice would add code before and/or after the invocation.proceed().

    In effect, this advice advises all methods, but there is only one, (Callable.call()).

  4. 创建 AfterReturning带有寻找 Message<T> receive() 切入点的建议方法。

  5. 克隆 JdbcPollingChannelAdapter并在那个新类(class)中添加我的钩子(Hook)。

  6. 也许 Gary 在这个 link 上的建议会很有用,但“要点”链接不再有效。

更新:
我最终实现的选项是使用 AfterReturningAdvice看起来像下面这样。
原代码:

<int-jdbc:inbound-channel-adapter id="jdbcInAdapter" 
    channel="inputChannel" data-source="myDataSource"
    query="SELECT column1, column2 from tableA"
    max-rows-per-poll="100">
    <int:poller fixed-delay="10000"/>
</int-jdbc:inbound-channel-adapter>

新代码:

<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
    <constructor-arg name="period" value="20000" />
</bean> 
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata">
    <property name="maxMessagesPerPoll" value="1000"/>
    <property name="trigger" ref="jdbcDynamicTrigger"/>
</bean>
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
    <property name="newPeriod" value="1"/>
    <property name="adjustmentThreshold" value="100"/>
    <property name="pollerMetadata" ref="jdbcPollerMetaData"/>
</bean> 
<aop:config>
    <aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" >
        <aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/>
    </aop:aspect>   
</aop:config>   
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="myDataSource" />
    <constructor-arg value="SELECT column1, column2 from tableA" />
    <property name="maxRowsPerPoll" value="100" />
</bean> 
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean" 
    channel="inputChannel"
    auto-startup="false">
    <int:poller ref="jdbcPollerMetaData" />
</int:inbound-channel-adapter>

我对此做了更多研究,觉得 Spring Integration 或许可以为轮询器提供一些 Hook ,以便开发人员可以更好地自定义它们。
有关详细信息,请参阅 https://jira.spring.io/browse/INT-3633

如果该 JIRA 没有得到实现,并且有人对我实现的代码感兴趣,请对此添加评论,我将在 github 或 gist 上提供代码。

最佳答案

感谢打开 JIRA 问题;我们应该在那里讨论这个特性,因为堆栈溢出不太适合扩展对话。

但是,我不确定您上面所说的“...但是“要点”链接不再有效...”是什么意思。它对我来说很好...... https://gist.github.com/garyrussell/5374267但让我们在 JIRA 中讨论。

关于java - 如何最好地在 Spring Integration 中实现 DynamicPoller,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28369308/

相关文章:

java - 如何暂时禁用Spring缓存的缓存

java - Spring Rest 模板和 JSON 数据

java - Spring 集成 channel 链怪异

java - Spring Integration中文件的处理组

java - Spring Integration SFTP 出站网关 mget -R 问题

java - "Could not find or load main class"使用命令行

java - 串联sql语句插入

java - 开发一个聊天网站

java - Spring路由因无效路由而出现404错误

hibernate - 使用 DBUnit 测试 JPA/Hibernate 实体