redis - Webflux,使用 Websocket 如何防止订阅两次 reactive redis 消息传递操作

标签 redis spring-webflux spring-websocket

我有一个在 webflux 上使用 redis 消息传递操作的 websocket 实现。它的作用是监听主题并通过 websocket 端点返回值。

我遇到的问题是,每次用户通过 websocket 向端点发送消息时,似乎进行了全新的 redis 订阅,导致订阅者在 redis 消息主题上累积,并且 websocket 响应随着数量的增加而增加Redis 主题消息订阅以及(示例用户发送 3 条消息,redis 主题订阅增加到 3,websocket 连接响应 3 次)。

想知道是否有一种方法可以重复使用对消息传递主题的相同订阅,从而防止多个 redis 主题订阅。

我使用的代码如下:

  • Websocket 处理程序

    public class SendingMessageHandler implements WebSocketHandler {
      private final Gson gson = new Gson();
    
      private final MessagingService messagingService;
    
      public SendingMessageHandler(MessagingService messagingService) {
          this.messagingService = messagingService;
      }
    
      @Override
      public Mono<Void> handle(WebSocketSession session) {
          Flux<WebSocketMessage> stringFlux = session.receive()
                  .map(WebSocketMessage::getPayloadAsText)
                  .flatMap(inputData ->
                          messagingService.playGame(inputData)
                                  .map(data ->
                                          session.textMessage(gson.toJson(data))
                                  )
                  );
    
          return session.send(stringFlux);
      }
     }
    
  • 消息处理服务

    公共(public)类 MessagingService{ private final ReactiveRedisOperations reactiveRedisOperations;

      public MessagingService(ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations) {
          this.reactiveRedisOperations = reactiveRedisOperations;
      }
    
    
      public Flux<Object> playGame(UserInput userInput){
          return reactiveRedisOperations.listenTo("TOPIC_NAME");
      }
    

提前谢谢你。

最佳答案

代替使用 ReactiveRedisOperationsMessageListener 是这里的方法。您可以注册一次监听器,并使用以下作为监听器。

data -> session.textMessage(gson.toJson(data))

注册应该只在连接开始时发生一次。您可以覆盖 SendingMessageHandlervoid afterConnectionEstablished(WebSocketSession session) 来完成此操作。这样,每个新的 Websocket 连接、每条消息都会创建一个新的订阅。

此外,不要忘记覆盖 afterConnectionClosed,并取消订阅 redis 主题,并清理其中的监听器。

Instructions on how to use MessageListener.

关于redis - Webflux,使用 Websocket 如何防止订阅两次 reactive redis 消息传递操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70051118/

相关文章:

redis - 我如何在 aws 弹性缓存 redis 中异步复制 redis 数据(主从配置)。

spring - 无法自动接线 `WebTestClient` - 没有自动配置

Websocket : Is it possible to add multiple Endpoints using SockJS?

java - 如何在 Spring 中使用 JWT 验证 SockJS CONNECT

ruby-on-rails - Sidekiq——Email.delay.sendMail

php - 按值 zset 查找 redis 键 - laravel

java - 当 flatMap 返回空 Mono 时如何调用 switchIfEmpty?

java - Spring Boot + Spring Web Socket + RabbitMQ Web STOMP

spring-boot - Spring 数据 JPA Redis : Cannot write custom method based query

spring - Spring Webflux Rest API(带注释的 Controller )的总处理时间