java - How to receive multiple messages in HiveMQ Client? (MQTT)

Tags: java eclipse mqtt hivemq

I am trying to figure out how to receive multiple messages in the HiveMQ client using the same try/catch block, even with different clients. I followed this example:

https://github.com/mqtt-bee/mqtt-bee-examples/blob/master/mqtt-3-blocking/src/main/java/org/mqttbee/examples/mqtt3blocking/Application.java

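The example above works for one client with a single publish and subscribe, but I would like to perform several of these operations in the same try/catch block if possible.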

package com.main;

import java.util.UUID;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.logging.Logger;
import java.util.NoSuchElementException;

import java.util.logging.Level;
import java.util.concurrent.TimeUnit;


public class Main {

    private static final Logger LOGGER = Logger.getLogger(Main.class.getName());  // Creates a logger instance 


    public static void main(String[] args) {

            Mqtt5BlockingClient client1 = Mqtt5Client.builder()
            .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client; here a randomly generated UUID
            .serverHost("localhost")  // the host name or IP address of the MQTT server; localhost is the default if not specified
            .serverPort(1883)  // specifies the port of the server
            .buildBlocking();  // builds the blocking client

            client1.connect();  // connects the client
            System.out.println("Client1 Connected");

            Mqtt5BlockingClient client2 = Mqtt5Client.builder()
                    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client; here a randomly generated UUID
                    .serverHost("localhost")  // the host name or IP address of the MQTT server; localhost is the default if not specified
                    .serverPort(1883)  // specifies the port of the server
                    .buildBlocking();  // builds the blocking client

            client2.connect();  // connects the client
            System.out.println("Client2 Connected");            

            String testmessage = "How is it going!";
            byte[] messagebytesend = testmessage.getBytes();   // stores a message as a byte array to be used in the payload 


    try {  

        Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);  // creates a "publishes" instance that's used to queue incoming messages
                                                                                    // .ALL - matches all incoming Publish messages
            client1.subscribeWith()  // creates a subscription 
            .topicFilter("test/something1/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
            .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 1 (At least once)
            .send(); 
            System.out.println("The client1 has subscribed");

            client1.publishWith()  // publishes the message to the subscribed topic 
            .topic("test/something1/topic")   // publishes to the specified topic
            .qos(MqttQos.AT_LEAST_ONCE)  
            .payload(messagebytesend)  // the contents of the message 
            .send();
            System.out.println("The client1 has published");


         // receives the message using the "publishes" instance, waiting up to 5 seconds;
         // .get() returns the object if available or throws a NoSuchElementException
         Mqtt5Publish receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get();

         byte[] tempdata = receivedMessage.getPayloadAsBytes();    // copies the message payload into a byte array
         System.out.println();
         String getdata = new String(tempdata); // converts the byte array to a String 
         System.out.println(getdata);

        client2.subscribeWith()  // creates a subscription 
           .topicFilter("test/something2/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
           .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 1 (At least once)
           .send(); 
           System.out.println("The client2 has subscribed");

         client2.publishWith()  // publishes the message to the subscribed topic 
            .topic("test/something2/topic")   // publishes to the specified topic
            .qos(MqttQos.AT_LEAST_ONCE)  
            .payload("The second message :P".getBytes())  // the contents of the message 
            .send();
            System.out.println("The client2 has published");  
          System.out.println();  


            // VV   Why isn't the publish instance below receiving the second message? Do i need another try catch?  VV

         // receives the message using the "publishes" instance, waiting up to 5 seconds;
         // .get() returns the object if available or throws a NoSuchElementException
         receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get();

         byte[] tempdata2 = receivedMessage.getPayloadAsBytes();    // copies the message payload into a byte array
         System.out.println();
         getdata = new String(tempdata2); // converts the byte array to a String 
         System.out.println(getdata);        

    }

    catch (InterruptedException e) {    // Catches interruptions in the thread 
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
        }

    catch (NoSuchElementException e){
        System.out.println("There are no received messages");   // Handles when a publish instance has no messages 
    }

    client1.disconnect();  
    System.out.println("Client1 Disconnected");

    client2.disconnect();  
    System.out.println("Client2 Disconnected");

    }
}

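The output I get:

Client1 Connected

Client2 Connected

The client1 has subscribed

The client1 has published

How is it going!

The client2 has subscribed

The client2 has published

There are no received messages

Client1 Disconnected

Client2 Disconnected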

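The output I want:

Client1 Connected

Client2 Connected

The client1 has subscribed

The client1 has published

How is it going!

The client2 has subscribed

The client2 has published

The second message :P

Client1 Disconnected

Client2 Disconnected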

Best answer

I ran your code and found this warning in the log:

2019-06-11 20:32:22,774 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.

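It looks like you forgot to set up a publish filter for the second client. In the code that waits for the second message (the one for client2), you are actually checking client1's message flow. So you only need to add a publish filter for client2: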

Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);

Then wait for the message on client2:

// VV   Why isn't the publish instance below receiving the second message? Do i need another try catch?  VV

     receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get(); 
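
To see why polling client1's flow can never return client2's message, here is a toy model using only the JDK. It is not the HiveMQ API: the class `PerClientFlows` and its methods `register`, `deliver` and `receive` are hypothetical names chosen for illustration, and the assumption is simply that every client owns a separate inbound queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model (not the HiveMQ API): every client owns a separate inbound queue. */
class PerClientFlows {

    // Hypothetical stand-in for the path from the broker to each client.
    private final Map<String, BlockingQueue<byte[]>> inbound = new HashMap<>();

    /** Registers a flow for a client, similar in spirit to client.publishes(ALL). */
    BlockingQueue<byte[]> register(String clientId) {
        return inbound.computeIfAbsent(clientId, id -> new LinkedBlockingQueue<>());
    }

    /** Delivers a payload to one client; returns false if that client has no flow. */
    boolean deliver(String clientId, String payload) {
        BlockingQueue<byte[]> flow = inbound.get(clientId);
        if (flow == null) {
            return false; // plays the role of the "No publish flow registered" warning
        }
        return flow.offer(payload.getBytes(StandardCharsets.UTF_8));
    }

    /** Waits up to the timeout for the next payload, like receive(timeout, unit). */
    static Optional<String> receive(BlockingQueue<byte[]> flow, long timeout, TimeUnit unit) {
        try {
            byte[] payload = flow.poll(timeout, unit);
            return Optional.ofNullable(payload).map(p -> new String(p, StandardCharsets.UTF_8));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        PerClientFlows broker = new PerClientFlows();
        BlockingQueue<byte[]> flow1 = broker.register("client1");

        // client2 has no flow yet, so its message is dropped (the situation in the question).
        System.out.println(broker.deliver("client2", "The second message :P"));
        System.out.println(receive(flow1, 100, TimeUnit.MILLISECONDS).isPresent());

        // After registering a flow for client2, the message arrives on that flow.
        BlockingQueue<byte[]> flow2 = broker.register("client2");
        broker.deliver("client2", "The second message :P");
        System.out.println(receive(flow2, 100, TimeUnit.MILLISECONDS).orElse("nothing"));
    }
}
```

In this model, waiting on `flow1` times out no matter how long the timeout is, because the message was addressed to a client that had no flow registered at delivery time.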

Result:

Before:

Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published

2019-06-11 20:46:36,537 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected

After:

Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published


The second message :P
Client1 Disconnected
Client2 Disconnected

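Edit: I hope this is the solution you are looking for, since the desired output differs from what I get after the fix. The NoSuchElementException is no longer thrown or caught, so "There are no received messages" no longer appears after the second message.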
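
As the comments in the question note, `receive(5, TimeUnit.SECONDS)` hands back an Optional, and calling `.get()` on an empty one is what throws the NoSuchElementException. A minimal sketch of handling the empty case without an exception, using only `java.util.Optional`; the class `OptionalPayloadDemo` and the helper `describe` are hypothetical, and the `Optional<byte[]>` stands in for what one would presumably get from mapping the received message with `getPayloadAsBytes()`:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

class OptionalPayloadDemo {

    /** Hypothetical helper: turns a possibly missing payload into a printable line. */
    static String describe(Optional<byte[]> received) {
        return received
                .map(payload -> new String(payload, StandardCharsets.UTF_8))
                .orElse("There are no received messages");
    }

    public static void main(String[] args) {
        // A payload that arrived in time, and a receive that timed out.
        Optional<byte[]> arrived =
                Optional.of("The second message :P".getBytes(StandardCharsets.UTF_8));
        Optional<byte[]> timedOut = Optional.empty();

        System.out.println(describe(arrived));
        System.out.println(describe(timedOut));
    }
}
```

Written this way, the catch block for NoSuchElementException becomes unnecessary, and a timeout on one message does not skip the code that follows it in the try block.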

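Edit in response to a comment: a snippet that collects the publish messages for client2 using the async flavour (just replace the code in the try block with the following):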

// The list where we put our received publish messages
// (needs the imports java.util.List and java.util.LinkedList)
            final List<Mqtt5Publish> incomingMessagesClient2 = new LinkedList<>();

            // With the async flavour we can add a consumer for the incoming publish messages
            client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish ->
                    incomingMessagesClient2.add(mqtt5Publish));

            client1.publishes(MqttGlobalPublishFilter.ALL);  // creates a "publishes" instance that's used to queue incoming messages

            client2.subscribeWith()  // creates a subscription
                    .topicFilter("test/something1/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
                    .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 1 (At least once)
                    .send();
            System.out.println("The client2 has subscribed");

            client1.publishWith()  // publishes the message to the subscribed topic
                    .topic("test/something1/topic")   // publishes to the specified topic
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .payload(messagebytesend)  // the contents of the message
                    .send();
            System.out.println("The client1 has published");

            client1.publishWith()  // publishes the message to the subscribed topic
                    .topic("test/something1/topic")   // publishes to the specified topic
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .payload("The second message :P".getBytes())  // the contents of the message
                    .send();
            System.out.println("The client1 has published");
            System.out.println();


            TimeUnit.SECONDS.sleep(5);

            incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));
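
The snippet above fills a LinkedList from a callback and then sleeps for a fixed 5 seconds. If the callback runs on a different thread than main (an assumption here, not something the snippet states), a thread-safe list is the safer choice, and a latch lets the program continue as soon as the expected messages are in. The sketch below uses only the JDK; the class `AsyncCollectDemo`, the method `collect` and the single-thread executor standing in for the client's thread are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncCollectDemo {

    /**
     * Collects payloads handed to a callback on another thread and waits until
     * `expected` messages arrived or the timeout elapsed, whichever comes first.
     */
    static List<String> collect(List<String> toDeliver, int expected, long timeout, TimeUnit unit) {
        List<String> received = new CopyOnWriteArrayList<>(); // safe to share across threads
        CountDownLatch latch = new CountDownLatch(expected);

        // The kind of callback one would pass to an async subscription.
        Consumer<byte[]> onMessage = payload -> {
            received.add(new String(payload, StandardCharsets.UTF_8));
            latch.countDown();
        };

        // Stand-in for the client's own thread invoking the callback (assumption).
        ExecutorService clientThread = Executors.newSingleThreadExecutor();
        try {
            for (String message : toDeliver) {
                clientThread.execute(
                        () -> onMessage.accept(message.getBytes(StandardCharsets.UTF_8)));
            }
            latch.await(timeout, unit); // returns early once all expected messages are in
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        } finally {
            clientThread.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("How is it going!", "The second message :P");
        collect(messages, messages.size(), 5, TimeUnit.SECONDS).forEach(System.out::println);
    }
}
```

The 5 second value is kept as an upper bound only; with the latch, the wait ends as soon as both messages have been recorded.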

Best regards,

Michael from the HiveMQ team

Regarding java - How to receive multiple messages in HiveMQ Client? (MQTT), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56533751/
