我想将传入消息写入消息队列,并让单个专用线程无延迟地使用消息 - 与 Spring Integration listen on queue without poller 非常相似
我尝试过:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
但这会导致任务调度程序线程调度轮询操作,然后轮询操作将阻塞,直到消息可用为止。这不是我的“专用线程”想法,如果任务调度程序线程也用于写入队列,并且另一端没有消费者线程,则会导致我的应用程序死锁。
我尝试的下一步是:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
但是,由于 fixedDelay(0)
,这导致应用程序产生大量计划任务。
我遇到的下一个选项是:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
.route(inboundRouter())
.get()
这似乎按预期工作;我有一个专门的线程来处理所有消息。但是,我不再拥有可以监控其统计信息(通过 JMX)的消息队列。
那么,有什么办法可以实现我的目标吗?如何实现?
最佳答案
好吧,那么您必须真正为该线程专用单独的TaskScheduler
。
不幸的是,该框架尚未提供明确的 API 来将该 API 注入(inject)端点,但无论如何这是可能的:
@Bean
public IntegrationFlow dedicatedPollingThreadFlow() {
return IntegrationFlows.from(MessageChannels.queue("myQueueChannel"))
.bridge(e -> e
.poller(Pollers.fixedDelay(0).receiveTimeout(-1))
.id("dedicatedPollingConsumer"))
.channel(c -> c.queue("results"))
.get();
}
@Bean
public TaskScheduler dedicatedTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
@Bean
@DependsOn("dedicatedPollingThreadFlow")
public String dedicatedPollingConsumerConfigurer(
@Qualifier("dedicatedPollingConsumer") PollingConsumer dedicatedPollingConsumer) {
dedicatedPollingConsumer.setTaskScheduler(dedicatedTaskScheduler());
return "";
}
还要注意.receiveTimeout(-1)
。这样,它就会执行常规的 BlockingQueue.take()
永远阻塞您的专用线程。
从框架角度来看,将 TaskScheduler
与现有 .poller()
一起注入(inject)到 GenericEndpointSpec
中。
同时 JIRA ticket .
关于java - Spring集成MessageQueue无需轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44183584/