java - 如何在 spring mqtt 集成中停止重复订阅收到的保留消息

标签 java spring spring-integration mqtt mosquitto

订阅保留主题时获取重复的保留消息。

我在我的物联网项目中使用了 spring mqtt 集成。 在这里,一旦收到保留消息,它就会继续订阅,直到我将保留标志设置为 true 的同一主题发布空白消息。 我注意到,当我使用 mqtt 命令在终端中执行相同的过程以订阅保留的主题时,它仅订阅一次,不会发生重复订阅。

我使用下面的代码通过 # 订阅所有主题

@Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public DefaultMqttPahoClientFactory clientfactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        options.setCleanSession(false);
        //options.setCleanSession(true);
        //options.setServerURIs(new String[] { "tcp://localhost" });
        options.setServerURIs(new String[] { "url" });
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("admin",
                clientfactory(), "#");
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        /*adapter.setc*/
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")

    public MessageHandler handler() {
        return new MessageHandler() {

            public void handleMessage(Message<?> message) throws MessagingException {

            mqttSubscriptionProcessor.processSubscription(message);


            }

        };
    }

我使用此命令发布了保留的消息

mosquitto_pub -u admin -P pwd -t hello/topic  -m "test msg" -r -d

Eclipse控制台中的结果是

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg 

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

这里我只需要订阅保留的主题一次,不需要更改 Spring 集成代码中的任何更改。

最佳答案

这就是保留消息的工作原理,当客户端订阅匹配的主题时,在任何新消息之前发布的保留位设置为主题的最后一条消息将始终首先传递给客户端。

如果您不希望保留消息(并因此始终传递),则在发布时不要设置保留位。

否则,正如您所发现的,您可以通过发布具有空负载且保留位设置为同一主题的消息来清除主题的保留消息。

或者您可以在客户端中过滤消息,因为您可以随时检查消息传递时是否设置了保留标志。

对于 spring 端,您似乎正在创建 4 个客户端,因此每个客户端都会在订阅时接收消息。您可以通过查看代理日志来证明这一点,如果您在详细模式下运行 mosquitto,它将显示它传递的每条消息。

关于java - 如何在 spring mqtt 集成中停止重复订阅收到的保留消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55510473/

相关文章:

spring-boot - 无法在 Spring-Boot 应用程序中使用 Spring-integration-kafka 禁用 Kafka 消息的手动提交

java - 将 Spring Boot.war 部署到 tomcat 时无法启动组件 [StandardEngine[Catalina]]

spring - javax.el.PropertyNotFoundException : The class 'java.lang.String' does not have the property

java - 加载其他内容时加载对话框不会绘制

java - Java 编译器是否消除了 getters 的函数调用?

java - 在 weblogic 10.3.6 上访问 spring boot application.properties

spring - Spring 集成扩展是否是最新且可用的?

java - 如何将此 spring-integration 配置从 XML 转换为 Java?

java - 如何拆分表达式语言中的字符串并获取最后一个元素?

java - 在 Java 中根据 blob 的内容创建文件的代码片段