Spring WebSockets ActiveMQ convertAndSendToUser

标签 spring spring-boot websocket activemq stomp

我有一个 Spring Boot 应用程序 (Jhipster),它使用 STOMP over WebSockets 将信息从服务器传递给用户。

我最近添加了一个 ActiveMQ 服务器来处理水平扩展应用程序,使用 Amazon 自动扩展组/负载均衡器。

我使用 convertAndSendToUser() 方法,该方法在应用程序的单个实例上运行以定​​位经过身份验证的用户的“个人队列”,因此只有他们才能收到消息。

但是,当我在负载均衡器后面启动应用程序时,我发现消息被发送给用户如果事件是在服务器上生成的,他们的 websocket-proxy 连接(到经纪人)成立于?

我如何确保消息通过 ActiveMQ 到达用户实际“也已连接”的应用程序的任何实例,而不管哪个实例收到,比如执行 convertAndSendToUser() 的 HTTP 请求事件?

此处供引用的是我的 StompBrokerRelayMessageHandler:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
    handler.setTcpClient(new Reactor2TcpClient<>(
        new StompTcpFactory(orgProperties.getAws().getAmazonMq().getStompRelayHost(),
            orgProperties.getAws().getAmazonMq().getStompRelayPort(), orgProperties.getAws().getAmazonMq
            ().getSsl())
    ));

    return handler;
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/queue", "/topic")
        .setSystemLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setSystemPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass())
        .setClientLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setClientPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass());

    config.setApplicationDestinationPrefixes("/app");
}

我通过检查 SessionSubscribeEvent 中的 header 找到了与在 ActiveMQ 上生成的队列对应的名称,该名称是在用户订阅用户队列时在监听器中生成的,如 simpSessionId

@Override
@EventListener({SessionSubscribeEvent.class})
public void onSessionSubscribeEvent(SessionSubscribeEvent event) {
    log.debug("Session Subscribe Event:" +
        "{}", event.getMessage().getHeaders().toString());
}

ActiveMQ中可以找到对应的队列,格式为:{simpDestination}-user{simpSessionId}

我可以将 sessionId 保存在键值对中,然后将消息推送到该主题 channel 吗?


我还找到了一些 possibilities在 CONNECT/SUBSCRIBE 框架中设置 ActiveMQ 特定的 STOMP 属性以创建持久订阅者 如果我设置这些属性,Spring 会比理解路由吗?

client-id & subcriptionName

最佳答案

修改 MessageBrokerReigstry 配置解决了这个问题:

config.enableStompBrokerRelay("/queue", "/topic")
            .setUserDestinationBroadcast("/topic/registry.broadcast")

基于 documentation section 4.4.13 中的这一段:

In a multi-application server scenario a user destination may remain unresolved because the user is connected to a different server. In such cases you can configure a destination to broadcast unresolved messages to so that other servers have a chance to try. This can be done through the userDestinationBroadcast property of the MessageBrokerRegistry in Java config and the user-destination-broadcast attribute of the message-broker element in XML

我没有看到任何关于“为什么”/topic/registry.broadcast 是正确的“主题”目的地的文档,但我发现了它的各种迭代:

  1. websocket sessions sample doesn't cluster.. spring-session-1.2.2
  2. What is MultiServerUserRegistry in spring websocket?
  3. Spring websocket - sendToUser from a cluster does not work from backup server

关于Spring WebSockets ActiveMQ convertAndSendToUser,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49964647/

相关文章:

javascript - 如何使用循环连接 socket.io 客户端

java - 如何创建多部分响应

java.lang.ClassCastException : net. sf.ehcache.hibernate.SingletonEhCacheRegionFactory 无法转换为 org.hibernate.cache.RegionFactory

java - 将现有的 Spring 应用程序转换为 Spring-Boot

java - SpringBoot : Can I @Autowire Bean in runnable JAR from JAR provided using java -cp?

c# - 如何通过 websocket 将音频发送到 Nexmo Voice

java - Spring Configuration 类放置最佳实践

java - 为 Spring Web 服务分离 REST API 和实现

java - @Value boolean 返回值 true 的 'invalid boolean value'

javascript - 强制 javascript 代码同步