java - 如何在 Spring Boot 中通过 websocket 代理按照提交顺序发送消息?

标签 java spring-boot kotlin websocket

我想配置一个 spring boot websocket 消息代理,以便它按照提交的顺序分派(dispatch)消息。

基于answers to similar questions ,我尝试将调度任务执行器的池大小设置为 1,但仍然收到按错误顺序调度的消息。

出于调试目的,我添加了发送前和发送后 channel 拦截器,它们记录正在分派(dispatch)消息的线程,并且我可以看到线程 ID 有所不同。

我做错了什么?

代码(Kotlin):

Websocket 配置:

package foo.bar

import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.simp.config.ChannelRegistration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.messaging.support.ChannelInterceptor
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer

@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer{
    companion object {
        private val LOGGER = LoggerFactory.getLogger(WebSocketConfig::class.java)
    }
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        config.enableSimpleBroker("/topic")
        config.setApplicationDestinationPrefixes("/app");
        config.configureBrokerChannel().taskExecutor().corePoolSize(1)
        config.configureBrokerChannel().taskExecutor().maxPoolSize(1)
        val channelInterceptor: ChannelInterceptor = object: ChannelInterceptor {
            override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> {
                LOGGER.debug("Message broker sending message on Thread " + Thread.currentThread().id);
                return message
            }

            override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
                LOGGER.debug("Message broker sent message on Thread " + Thread.currentThread().id);
            }
        }
        config.configureBrokerChannel().interceptors(channelInterceptor)
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
                .withSockJS()
    }

    override fun configureClientOutboundChannel(registration: ChannelRegistration) {
        registration.taskExecutor().corePoolSize(1)
        registration.taskExecutor().maxPoolSize(1)
    }

    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        registration.taskExecutor().corePoolSize(1)
        registration.taskExecutor().maxPoolSize(1)
    }
}

(已删除)发送消息的代码:

@Controller
class StateController

@Autowired constructor(
    private val template: SimpMessagingTemplate
) {

....

    fun publishMsg(topicId: String, msg: MyMessageType){
        template.convertAndSend("/topic/msg/"+topicId, msg)
    }
}

这是一些示例日志记录。正如您所看到的,执行器正在使用多个Thread,或者更确切地说,似乎有多个执行器。此外,线程 ID 来回跳跃,在我看来,这清楚地确认了分派(dispatch)执行并不像我预期的那样是单线程的。 日志记录:

09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.preSend             :  32  Message broker sending message on Thread 60
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.preSend             :  32  Message broker sending message on Thread 47
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.postSend            :  38  Message broker sent message on Thread 47
09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.postSend            :  38  Message broker sent message on Thread 60

最佳答案

经过进一步调试,我发现了我的错误。不同的线程来自提交代码。我应该将 channel 拦截器添加到出站客户端 channel ,而不是代理:

override fun configureClientOutboundChannel(registration: ChannelRegistration) {
    val channelInterceptor: ChannelInterceptor = object: ChannelInterceptor {
        override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> {
            LOGGER.debug("Message broker sending message on Thread " + Thread.currentThread().id);
            return message
        }

        override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
            LOGGER.debug("Message broker sent message on Thread " + Thread.currentThread().id);
        }
    }
    registration.interceptors(channelInterceptor)
}

关于java - 如何在 Spring Boot 中通过 websocket 代理按照提交顺序发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54090998/

相关文章:

java - N 叉树的高度

java - 尝试从二进制文件读取非 Java 原语

java - Spring boot启动构建JPA容器非常慢

java - Spring boot测试启动应用程序

android - 重用或重新创建观察者?

java - 从 Java 中调用具有类委托(delegate)的 Kotlin 对象作为静态方法

java - Eclipse 不会运行 :\. 元数据\.log 错误

java - 以编程方式检查类是否具有有效 JPA 定义的最简单方法

java - 如何获取 Feign 客户端的名称?

Android studio 注销 : clear saved preferences kotlin