我有一个在 webflux 上使用 redis 消息传递操作的 websocket 实现。它的作用是监听主题并通过 websocket 端点返回值。
我遇到的问题是,每次用户通过 websocket 向端点发送消息时,似乎进行了全新的 redis 订阅,导致订阅者在 redis 消息主题上累积,并且 websocket 响应随着数量的增加而增加Redis 主题消息订阅以及(示例用户发送 3 条消息,redis 主题订阅增加到 3,websocket 连接响应 3 次)。
想知道是否有一种方法可以重复使用对消息传递主题的相同订阅,从而防止多个 redis 主题订阅。
我使用的代码如下:
Websocket 处理程序
public class SendingMessageHandler implements WebSocketHandler { private final Gson gson = new Gson(); private final MessagingService messagingService; public SendingMessageHandler(MessagingService messagingService) { this.messagingService = messagingService; } @Override public Mono<Void> handle(WebSocketSession session) { Flux<WebSocketMessage> stringFlux = session.receive() .map(WebSocketMessage::getPayloadAsText) .flatMap(inputData -> messagingService.playGame(inputData) .map(data -> session.textMessage(gson.toJson(data)) ) ); return session.send(stringFlux); } }
消息处理服务
公共(public)类 MessagingService{ private final ReactiveRedisOperations
reactiveRedisOperations; public MessagingService(ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations) { this.reactiveRedisOperations = reactiveRedisOperations; } public Flux<Object> playGame(UserInput userInput){ return reactiveRedisOperations.listenTo("TOPIC_NAME"); }
提前谢谢你。
最佳答案
代替使用 ReactiveRedisOperations
,MessageListener
是这里的方法。您可以注册一次监听器,并使用以下作为监听器。
data -> session.textMessage(gson.toJson(data))
注册应该只在连接开始时发生一次。您可以覆盖 SendingMessageHandler
的 void afterConnectionEstablished(WebSocketSession session)
来完成此操作。这样,每个新的 Websocket 连接、每条消息都会创建一个新的订阅。
此外,不要忘记覆盖 afterConnectionClosed
,并取消订阅 redis 主题,并清理其中的监听器。
关于redis - Webflux,使用 Websocket 如何防止订阅两次 reactive redis 消息传递操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70051118/