java - MQTT回调客户端重连逻辑

标签 java mqtt

我无法找到重新连接 mqtt 回调客户端的逻辑。有方法 onDisconnected() 但我无法在互联网上找到文档或任何示例示例。

我的听众

公共(public)类MyListener实现Listener {

    public MyListener()
    {

    }

    @Override
    public void onConnected()
    {
        System.out.println("Connected ....");
    }

    @Override
    public void onDisconnected()
    {
        System.out.println("Disconnected");
    }

    @Override
    public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack)
    {
        System.out.println("Entered Onpublish");

        try
        {
         System.out.println("received msg:" + msg);
        }
        catch (HikeException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally{
            ack.run();
        }

    }


    @Override
    public void onFailure(Throwable value)
    {
        value.printStackTrace();
    }

}

创建连接

private void createConnection(String host, int port,String id, String token) throws Exception
{

    this.disconnect();
    MQTT mqtt = new MQTT();
    mqtt.setHost(host, port);
    mqtt.setUserName(id);
    mqtt.setPassword(token);
    CallbackConnection callbackConnection = null;
    callbackConnection = mqtt.callbackConnection();
    callbackConnection.listener(new MyListener());
    callbackConnection.connect(new MyCallback<Void>("CONNECT"));
    callbackConnection.subscribe(new Topic[] { new Topic(uid + "/u", QoS.AT_MOST_ONCE) }, new MyCallback<byte[]>("EVENT SUBSCRIBE"));
    callbackConnection.subscribe(new Topic[] { new Topic(uid + "/s", QoS.AT_LEAST_ONCE), new Topic(uid + "/a", QoS.AT_LEAST_ONCE) }, new MyCallback<byte[]>("MSG SUBSCRIBE"));

    this.callbackConnection = callbackConnection;
}

我的回调

class MyCallback<T> implements Callback<T>
{
    public MyCallback(String tag)
    {
        super();
        this.tag = tag;
    }

    String tag;

    @Override
    public void onSuccess(T value)
    {
        System.out.println("TAG:" + tag + " =SUCCESS value=" + value);
    }

    @Override
    public void onFailure(Throwable value)
    {
        System.out.println("TAG:" + tag + "Fail");
        value.printStackTrace();
    }

}

我的问题是如何实现 mqtt 重新连接到服务器逻辑?如果我应该使用 onDisconnect() 方法,那么我该如何使用它?

最佳答案

这是我在连接丢失时实现 Mqtt 重新连接的方法,启动一个线程来尝试连接到 MqttServer,该线程将在成功连接时被销毁。

  boolean retrying = false;
   public void reConnect(){
        if (!retrying) {
            retrying = true;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (;;) {
                        try {
                            if (isInetAvailable() && !mqttClient.isConnected()) {
                                if(isPasswdProtected) {
                                     //connect with MqttConnectionOptions
                                    Connect_with_passwd();
                                } else {
                                    Connect();
                                }
                                Thread.sleep(MQTT_RETRY_INTERVAL);
                            } else if (isConnected()) {
                                List<String> topics = topicsSubscribed;
                                topicsSubscribed.clear();
                                for (String topic : topics) {
                                    try {
                                        subscribeToTopic(topic);
                                    } catch (MqttException e) {
                                    }
                                }
                                retrying = false;
                                break;
                            } else if (!Internet.isAvailable()) {
                                Thread.sleep(INET_RETRY_INTERVEL);
                            }
                        } catch (MqttException | InterruptedException e) {
                            try {
                                Thread.sleep(MQTT_RETRY_INTERVAL);
                            } catch (InterruptedException ex) {
                            }
                        }
                    }
                }
            }).start();
        }
}
 /*Check internet connection*/

 public static boolean isInetAvailable() {
    boolean connectivity;
    try {
        URL url = new URL(GOOGLE);
        URLConnection conn = url.openConnection();
        conn.connect();
        connectivity = true;
    } catch (IOException e) {
        connectivity = false;
    }
    return connectivity;
}

关于java - MQTT回调客户端重连逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19152301/

相关文章:

java - 通过静态方法访问单例的字段

java - Mac 笔记本电脑锁定时,终端上运行的 java 程序是否继续运行?

go - 如何使用 paho.mqtt.golang 库订阅多个 MQTT 主题?

python - 用于测试的 MQTT 代理

java - For 循环和 while 循环跳过 Scanner

java - JPA 延迟加载

java - Jersey 休息 api 安全

ssl - 如何在 esp32 sdk 示例 ssl_mutual_auth 中使用 https ://test. mosquitto.org/ssl/index.php 生成客户端证书?

docker - 使用 Docker 设置多个 MQTT 代理

authentication - Mosquitto websockets 安全最佳实践