java - Moquette 中的 Paho 客户端不会使用离线消息

标签 java mqtt paho moquette

我在通过 eclipse Paho 客户端在 Moquette 服务器中使用离线 MQTT 消息时遇到问题。

以下是我所遵循的步骤。

  1. 创建并启动了 Moquette MQTT 代理。
  2. 使用 eclipse Paho 客户端创建了一个简单的 MQTT 消费者应用程序。
  3. 设置消费者使用主题“devices/reported/#”的数据,QOS:1,CleanSession:False
  4. 创建了一个简单的 MQTT 数据发布器,以使用 Eclipse Paho 将数据发布到 MQTT 代理。
  5. 使用 MQTT 数据发布程序将消息发布到:“devices/reported/client_1”主题,QoS 为:1

上述步骤成功,没有任何问题。

然后我停止了我的消费者应用程序,并将 MQTT 数据发送到具有相同主题的代理。使用我的发布者应用程序 - 服务器能够接收这些消息,但此时没有任何消费者使用此消息,因为我已经停止了我的消费者。 然后我再次启动我的消费者应用程序。它已成功连接到代理,但是,它没有收到我在消费者关闭时发送给代理的任何消息。

我是否需要对 Moquette 服务器进行任何特定配置才能保留数据(使用干净 session : false)? 或者我错过了什么?

请在下面找到我的示例代码,

Moquette 服务器初始化

package com.gbids.mqtt.moquette.main;

import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ServerLauncher {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        final IConfig configs = new MemoryConfig(props);

        final Server mqttBroker = new Server();
        final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
        mqttBroker.startServer(configs, userHandlers);

        System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("stopping moquette mqtt broker..");
                mqttBroker.stopServer();
                System.out.println("moquette mqtt broker stopped");
            }
        });
    }
}

MQTT 消费者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ConsumerLauncher implements MqttCallback {

    private static final String topicPrefix = "devices/reported";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "consumer";

    public static void main(String[] args) throws MqttException {
        final String clientId = "consumer_1";
        MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topicPrefix + "/#", 1);
        sampleClient.setCallback(new ConsumerLauncher());
    }

    public void connectionLost(Throwable throwable) {
        System.out.println("Consumer connection lost : " + throwable.getMessage());
    }

    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
    }
}

MQTT 发布者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientLauncher {

    private static final String content = "{\"randomData\": 25}";
    private static final String willContent = "Client disconnected unexpectedly";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "client";

    public static void main(String[] args) throws Exception{
        sendDataWithQOSOne();
        System.exit(0);
    }

    private static void sendDataWithQOSOne(){
        try {
            final String clientId = "client_1";
            MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false); // for publisher - this is not needed I think
            sampleClient.connect(connOpts);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(1);
            final String topic = "devices/reported/" + clientId;
            sampleClient.publish(topic, message);
            System.out.println("Message published from : " + clientId + " with payload of : " + content);
            sampleClient.disconnect();
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }
}

最佳答案

在您的情况下,您需要在 ClientLauncher 中创建 MqttMessage 时将 retained 标志设置为 true (出版商)。默认值为 false,如 documentation 中所示。 .

...    
message.setRetained(true)
...

设置此标志可以使消息保留在代理上并发送到新连接的客户端。请注意,代理仅保留主题的最后一条消息。无法为特定主题保留多条消息。

关于java - Moquette 中的 Paho 客户端不会使用离线消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49151167/

相关文章:

java - Uncaught Error : No polyfill registered for object Facebook JDK

javascript - 使用 mqtt.js 和 Mosca 的离线消息问题

java - MQTT Java将所有订阅消息放在同一个队列中

docker - docker 容器内的 SSH 隧道

docker - 本地MQTT mosquitto实例获得连接ECONNREFUSED 127.0.0.1:1883

android - 如何将Paho-MQTT添加到android studio

java - 泛美卫生组织:并行连接创建导致连接下拉

java - Libgdx如何存储碰撞检测数据

java - Spring Batch Item Processor 不执行

Java - 如何更改文件夹的图标