java - 如何保存MQTT订阅所有主题的数据

标签 java mqtt

我正在使用java处理从Arduino获取传感器数据,并使用MQTT协议(protocol)将它们保存到MySql数据库中。我成功将数据发布到我的主题

  • 家/温度
  • 家庭/湿度

并订阅所有主题

  • 首页/#

我收到了所有消息。现在我想将这些值保存到数据库中。

我的问题是:

如何获取适当的数据并根据其主题(家庭/温度和家庭/湿度)保存? 谢谢你

这是我的代码:

 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.*;  
 import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import de.bezier.data.sql.*;
 import processing.serial.*;
  import java.util.regex.*;
  Serial myPort;
  String value;
    String broker;
   String topic;
  String topic1;
  String topic2;
  String payloadtemp;
   String payloadhum;
   MqttClient myClient;
    MySQL DB; 
    String tempsensor_ID;
     String humsensor_ID;
     MqttMessage messagetemp ; 
     MqttMessage messagehum ; 

     void setup()  
        {  
     String portlisten= Serial.list()[0];
     myPort = new Serial( this, portlisten, 115200 );
    broker   = "tcp://192.192.191.198:1883";//;
     topic = "home/#";
    topic1="home/temperatureValue" ;  
     topic2= "home/HumidityValue";
     tempsensor_ID="temp_ID";
      humsensor_ID="hum_ID";
      String user = "root";
      String pass = "";
      String database = "iot";
      DB= new MySQL(this, "localhost", database , user , pass);


          } 
       void draw(){

       if (myPort.available() > 0 ) {
        value = myPort.readString();
        String tempdata = trim(value);
         String humdata=trim(value);
            String patterntemp= "Temperature:";
           String patterntempunit="C";
            String patternhum="Humidity:";
               String patternhumunit="%";
            Pattern ptemp= Pattern.compile(Pattern.quote(patterntemp) + "             (.*?)" + Pattern.quote(patterntempunit));
         Matcher mtemp= ptemp.matcher(tempdata);
             while(mtemp.find()){
            tempdata=mtemp.group(1);
              payloadtemp=tempdata;

          }

         Pattern phum= Pattern.compile(Pattern.quote(patternhum) + "(.*?)" +               Pattern.quote(patternhumunit));
          Matcher mhum= phum.matcher(humdata);
           while(mhum.find()){
           humdata=mhum.group(1);
               payloadhum=humdata;

                }

              try {  
          myClient = new MqttClient(broker, MqttClient.generateClientId());        
     myClient.connect();        
            messagetemp = new MqttMessage((" " + payloadtemp).getBytes());  
            messagehum = new MqttMessage((" " + payloadhum).getBytes());  



             if(payloadtemp!=(null) & payloadhum!=(null)){

                 myClient.publish(topic1, messagetemp);  
                  myClient.subscribe(topic1); 

                       myClient.publish(topic2, messagehum);  
                     myClient.subscribe(topic2);          


          //myClient.subscribe(topic); //wildcard (topic) used when  subscribing to topics but not allowed when publishing a msg
                       }

                if ( DB.connect() )
               {
               myClient.setCallback(new MqttCallback() {
                  @Override
                public void messageArrived(String arg0, MqttMessage arg1) {

                     String msgrec= new String(arg1.getPayload());

                        println("RESULT " + arg0 + " - " + msgrec + " ");


                    throw new RuntimeException();

                            }

                        @Override
                     public void deliveryComplete(IMqttDeliveryToken arg0) {


                           println("DELIVERY " + arg0);

                           throw new RuntimeException();
                               }

                          @Override
                      public void connectionLost(Throwable arg0) {
                           throw new RuntimeException();
                         }
                          });


                           if(payloadtemp!=(null) & payloadhum!=(null)){


                     /*DB.execute("INSERT INTO `temperature`(`idsensor`,`temperaturevalue`) VALUES ('"+tempsensor_ID+ "','" +      messageReceived(topic1,messagetemp.getPayload()) + "');");
                    DB.execute("INSERT INTO `light`(`idsensor`,`lightvalue`) VALUES ('"+lightsensor_ID+ "','" +   messageReceived(topic2,messagehum.getPayload())+ "');");
                    */
                    }
                    }
                  else
                   {
                println("Error in the connection :-( ");
                      }  
                                 }
                        catch(MqttException mex) {  

                      System.out.println("Error: "+mex.getMessage());  
                            mex.printStackTrace();  
                     } 
                           }

                              }

最佳答案

我假设您正在实现 PAHO 客户端......

然后您需要使用 messageArrived 回调中的参数来检查您收到的通知是哪个主题...

client.setCallback(new MqttCallback() {

      @Override
      public void messageArrived(String topicInforming, MqttMessage mqttMessage) throws Exception {
            subscriptionMessageCallback.onNewMessage(mqttMessage.toString());
      }
   ....

在此代码段中,topicInforming 告诉您是否收到有关温度或湿度的消息,

对于数据库,您将需要一个连接器来处理插入事务...

您需要定义每条消息是否应位于同一个表中,这取决于您并取决于架构的外观

关于java - 如何保存MQTT订阅所有主题的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42501746/

相关文章:

java - 与 Java 中的@Override 相反的标记

java - 应用程序因 java.lang.stackOverflow 异常而崩溃

ssl - TLS 连接到 cloud mqtt for owntracks

python - paho-mqtt 订阅一次性消息

python - 哪个协程是顺序的?

java - 使用网络爬虫抓取网络数据

java - MimeMessage.getAllHeaders() 返回两个不同项目中的两个不同类

java - Spring MVC Model.containsAttribute 调用的方法是无访问器方法

database - 从 rabbitMQ 发送消息到 PostgreSQL

java - Spring MVC + Mosquitto + MQTT 集成无法获取任何消息