java - Spring集成MessageQueue无需轮询

标签 java spring multithreading spring-integration

我想将传入消息写入消息队列,并让单个专用线程无延迟地使用消息 - 与 Spring Integration listen on queue without poller 非常相似

我尝试过:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但这会导致任务调度程序线程调度轮询操作,然后轮询操作将阻塞,直到消息可用为止。这不是我的“专用线程”想法,如果任务调度程序线程也用于写入队列,并且另一端没有消费者线程,则会导致我的应用程序死锁。

我尝试的下一步是:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但是,由于 fixedDelay(0),这导致应用程序产生大量计划任务。

我遇到的下一个选项是:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
  .route(inboundRouter())
  .get()

这似乎按预期工作;我有一个专门的线程来处理所有消息。但是,我不再拥有可以监控其统计信息(通过 JMX)的消息队列。

那么,有什么办法可以实现我的目标吗?如何实现?

最佳答案

好吧,那么您必须真正为该线程专用单独的TaskScheduler

不幸的是,该框架尚未提供明确的 API 来将该 API 注入(inject)端点,但无论如何这是可能的:

    @Bean
    public IntegrationFlow dedicatedPollingThreadFlow() {
        return IntegrationFlows.from(MessageChannels.queue("myQueueChannel"))
                .bridge(e -> e
                        .poller(Pollers.fixedDelay(0).receiveTimeout(-1))
                        .id("dedicatedPollingConsumer"))
                .channel(c -> c.queue("results"))
                .get();
    }

    @Bean
    public TaskScheduler dedicatedTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @DependsOn("dedicatedPollingThreadFlow")
    public String dedicatedPollingConsumerConfigurer(
            @Qualifier("dedicatedPollingConsumer") PollingConsumer dedicatedPollingConsumer) {
        dedicatedPollingConsumer.setTaskScheduler(dedicatedTaskScheduler());
        return "";
    }

还要注意.receiveTimeout(-1)。这样,它就会执行常规的 BlockingQueue.take() 永远阻塞您的专用线程。

从框架角度来看,将 TaskScheduler 与现有 .poller() 一起注入(inject)到 GenericEndpointSpec 中。

同时 JIRA ticket .

关于java - Spring集成MessageQueue无需轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44183584/

相关文章:

.net - 调用 UI 委托(delegate)会导致 UI 被隐藏

java - 将字符串中的字符增加特定值

javascript - 通过 Angular JS 发布多个数组数据

java - Xuggler是否生成RTSP流

spring - 如何在 Vaadin 应用程序中处理发布请求?

java - 如何在 spring-batch 中拆分和加入流程以配置作业中的设置和拆卸步骤

multithreading - 线程和进程与多线程和多核/多处理器 : How they are mapped?

java - 如何在执行 Action 的过程中延迟?

java - pom.xml 从 https ://start. spring.io/下载后显示错误

java - 如果appendAllowed 设置为true,writeHeader() 仍在运行吗?