spring-amqp - 在 spring-webflux 中使用 Spring AMQP 消费者

标签 spring-amqp project-reactor

我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 从 RabbitMQ 队列中消费消息来创建的。我的问题是:如何最好地将 MessageListener 的已配置监听器方法连接到可以传递给我的 Controller 的 Flux?

Project Reactor 的 create部分提到它“将现有 API 与响应式(Reactive)世界连接起来非常有用 - 例如基于监听器的异步 API”,但我不确定如何直接连接到消息监听器,因为它包含在 DirectMessageListenerContainerMessageListenerAdapter。他们在创建部分的示例:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

到目前为止,我最好的选择是创建一个 Processor 并在 RabbitMQ 监听器方法中每次调用 onNext() 以手动生成事件。

最佳答案

我有这样的事情:

@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);

        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("foo", "event-" + i);
        }

    }

    private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);

    @GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getSeeFromAmqp() {
        return this.sseFluxProcessor;
    }

    @RabbitListener(id = "fooListener", queues = "foo")
    public void handleAmqpMessages(String message) {
        this.sseFluxProcessor.onNext(message);
    }

}

TopicProcessor.share() 允许有许多并发订阅者,当我们将此 TopicProcessor 作为 Flux 返回给我们的 /sseFromAmqp 通过 WebFlux 的 REST 请求。

@RabbitListener 只是将其接收到的消息委托(delegate)给该 TopicProcessor

main() 我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor

使用两个单独的 curl session 进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。

顺便说一下,我使用 share() 因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor

from multiple upstream Publishers when created in the shared configuration

那是因为 @RabbitListener 确实可以从不同的 ListenerContainer 线程同时调用。

更新

我还将这个示例移到了我的 Sandbox:https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux

关于spring-amqp - 在 spring-webflux 中使用 Spring AMQP 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49662157/

相关文章:

java - 如何创建Reactor Netty热流

java - 如何打开单声道

java - 在 TimeoutMainSubscriber 的情况下将 Reactor Context 复制到 MDC

java - 如何对Flux进行移动窗口计算并将结果作为新的Flux输出

java - 没有连接的 AMQP/RabbitMQ channel 什么时候死掉?

java - Activemq 与 rabbitmq

reactive-programming - 抛出异常作为 react 流验证的最正确方法

spring-boot - 多个带有 Spring Boot 的 Rabbitmq 队列

Spring AMQP RPC 非默认交换

java - 如何将消息直接绑定(bind)到我的对象类