我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent
的 Flux。这些事件是通过利用 spring-amqp
从 RabbitMQ 队列中消费消息来创建的。我的问题是:如何最好地将 MessageListener
的已配置监听器方法连接到可以传递给我的 Controller 的 Flux?
Project Reactor 的 create部分提到它“将现有 API 与响应式(Reactive)世界连接起来非常有用 - 例如基于监听器的异步 API”,但我不确定如何直接连接到消息监听器,因为它包含在 DirectMessageListenerContainer
和 MessageListenerAdapter
。他们在创建部分的示例:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
到目前为止,我最好的选择是创建一个 Processor
并在 RabbitMQ 监听器方法中每次调用 onNext()
以手动生成事件。
最佳答案
我有这样的事情:
@SpringBootApplication
@RestController
public class AmqpToWebfluxApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(AmqpToWebfluxApplication.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("foo", "event-" + i);
}
}
private TopicProcessor<String> sseFluxProcessor = TopicProcessor.share("sseFromAmqp", Queues.SMALL_BUFFER_SIZE);
@GetMapping(value = "/sseFromAmqp", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromAmqp() {
return this.sseFluxProcessor;
}
@RabbitListener(id = "fooListener", queues = "foo")
public void handleAmqpMessages(String message) {
this.sseFluxProcessor.onNext(message);
}
}
TopicProcessor.share()
允许有许多并发订阅者,当我们将此 TopicProcessor
作为 Flux
返回给我们的 /sseFromAmqp
通过 WebFlux 的 REST 请求。
@RabbitListener
只是将其接收到的消息委托(delegate)给该 TopicProcessor
。
在 main()
我有一个代码来确认即使没有订阅者我也可以发布到 TopicProcessor
。
使用两个单独的 curl
session 进行测试,并通过 RabbitMQ 管理插件将消息发布到队列。
顺便说一下,我使用 share()
因为:https://projectreactor.io/docs/core/release/reference/#_topicprocessor
from multiple upstream Publishers when created in the shared configuration
那是因为 @RabbitListener
确实可以从不同的 ListenerContainer
线程同时调用。
更新
我还将这个示例移到了我的 Sandbox
:https://github.com/artembilan/sendbox/tree/master/amqp-to-webflux
关于spring-amqp - 在 spring-webflux 中使用 Spring AMQP 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49662157/