spring - 在一段时间内从 channel 消耗给定数量的消息

标签 spring spring-integration

首先向Artem Bilan提问(我和你以前的同事一起工作)

我有这样的流程:

  • JdbcPollingChannelAdapter 用于尽快获取数据集(因为数据库是流程瓶颈)
  • CHAIN_OF_TRANSFORMATORS(由于业务需求)
  • ServiceActivator,用于使用 JmsTemplate 发送 JMS

根据要求,我必须在每个静态周期发送不同数量的消息(它由一些具有“10,20,100...”结构的“配置文件”提供,意味着“在第一分钟内发送 10 条消息,在第二分钟内发送 20 条消息” ,第三分钟内 100 条消息...”)。 周期的实现非常简单,它由 PeriodicPoller 提供。

问题是需求的第一部分。通过使用 SqlParameterSource 实现的 JdbcPollingChannelAdapter 获取给定数量的表行的情况,但不符合上面提到的“瓶颈”原因。是否可以“从 channel 获取N条消息”?

我尝试在适配器旁边的 channel 上使用一些 ReleaseStrategy 来实现它,但没有将其与定期轮询结合起来。如果有人帮助我,我会详细描述使用的方法。

与使用 Java DSL 描述集成流上下文相关的额外困难,同时大多数示例都使用 XML。

感谢您的建议!

最佳答案

在与 Dmitrii 私下讨论后,我们提出了这个解决方案。

要求

  • 读取一定数量的消息并将其一条一条发送到JMS队列
  • 每个轮询间隔的消息数量都会发生变化
  • 发送消息并将其数量与特定轮询任务所需的数量进行比较。如果它们不相等,则会生成错误报告,表明没有足够的消息可供轮询。

解决方案

  1. QueueChannel 中收集消息
  2. 更改maxMessagesPerPoll对于每个轮询的端点
  3. <poller> ( PollerMetadata ) 有 adviceChain选项。
  4. 这样我们就可以提供一些定制的 Advice (MethodInterceptor)
  5. 注入(inject)我们的PollingConsumer改变它maxMessagesPerPoll之前invocation.proceed()
  6. 使用一些AtomicInteger bean 在 ChannelInterceptor#preSend 中递增和reset在那Advice
  7. 此外,您可以在该建议中检查之前的民意调查状态,比较 maxMessagesPerPoll与柜台。

关于spring - 在一段时间内从 channel 消耗给定数量的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29100051/

相关文章:

spring-integration:如何以 SSE 形式提供延迟详细信息

spring-integration - Spring集成窃听和日志记录 channel 适配器日志记录问题

java - spring出错时保留textbox的值

java - Spring Security 3 Web + Restful登录

java - BindingResult 和 bean 名称 'newUser' 的普通目标对象都不能作为请求属性

java - Spring MVC : Ensure parameter is valid, 通过多个 Controller 进行横切

java - 如何使用FtpInboundFileSynchronizer自动删除本地文件?

java - 独立的 Spring Boot 集成 Java 项目

java - Spring 与 JMS 配置集成

spring - 组件扫描在 Tomcat webapp 的 JAR 中找不到 @Component