go - Paho MQTT golang 用于多个模块?

标签 go mqtt goroutine paho

我正在 golang 中为 mqtt 模块编写一个微服务。该模块将同时被不同的功能使用。我使用 Grpc 作为传输层。 我做了一个连接函数,就是这个..

func Connect() { //it would be Connect(payload1 struct,topic string)

    deviceID := flag.String("device", "handler-1", "GCP Device-Id")
    bridge := struct {
        host *string
        port *string
    }{
        flag.String("mqtt_host", "", "MQTT Bridge Host"),
        flag.String("mqtt_port", "", "MQTT Bridge Port"),
    }
    projectID := flag.String("project", "", "GCP Project ID")
    registryID := flag.String("registry", "", "Cloud IoT Registry ID (short form)")
    region := flag.String("region", "", "GCP Region")
    certsCA := flag.String("ca_certs", "", "Download https://pki.google.com/roots.pem")
    privateKey := flag.String("private_key", "", "Path to private key file")

    server := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
    topic := struct {
        config    string
        telemetry string
    }{
        config:    fmt.Sprintf("/devices/%v/config", *deviceID),
        telemetry: fmt.Sprintf("/devices/%v/events/topic", *deviceID),
    }
    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
    clientid := fmt.Sprintf("projects/%v/locations/%v/registries/%v/devices/%v",
        *projectID,
        *region,
        *registryID,
        *deviceID,
    )
    log.Println("[main] Loading Google's roots")
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(*certsCA)
    if err == nil {
        certpool.AppendCertsFromPEM(pemCerts)
    }

    log.Println("[main] Creating TLS Config")
    config := &tls.Config{
        RootCAs:            certpool,
        ClientAuth:         tls.NoClientCert,
        ClientCAs:          nil,
        InsecureSkipVerify: true,
        Certificates:       []tls.Certificate{},
        MinVersion:         tls.VersionTLS12,
    }

    flag.Parse()

    connOpts := MQTT.NewClientOptions().
        AddBroker(server).
        SetClientID(clientid).
        SetAutoReconnect(true).
        SetPingTimeout(10 * time.Second).
        SetKeepAlive(10 * time.Second).
        SetDefaultPublishHandler(onMessageReceived).
        SetConnectionLostHandler(connLostHandler).
        SetReconnectingHandler(reconnHandler).
        SetTLSConfig(config)
    connOpts.SetUsername("unused")
    ///JWT Generation Starts from Here
    token := jwt.New(jwt.SigningMethodES256)
    token.Claims = jwt.StandardClaims{
        Audience:  *projectID,
        IssuedAt:  time.Now().Unix(),
        ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
    }
    //Reading key file
    log.Println("[main] Load Private Key")
    keyBytes, err := ioutil.ReadFile(*privateKey)
    if err != nil {
        log.Fatal(err)
    }
    //Parsing key from file
    log.Println("[main] Parse Private Key")
    key, err := jwt.ParseECPrivateKeyFromPEM(keyBytes)
    if err != nil {
        log.Fatal(err)
    }
    //Signing JWT with private key
    log.Println("[main] Sign String")
    tokenString, err := token.SignedString(key)
    if err != nil {
        log.Fatal(err)
    }
    //JWT Generation Ends here

    connOpts.SetPassword(tokenString)
    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic.config, byte(*qos), nil); token.Wait() && token.Error() != nil {
            log.Fatal(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("Not Connected..Retrying...  %s\n", server)
    } else {
        fmt.Printf("Connected to %s\n", server)
    }

}

我在 main.go 的 go 例程中调用这个函数

func main() {
    fmt.Println("Server started at port 5005")
    lis, err := net.Listen("tcp", "0.0.0.0:5005")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    //Creating keepAlive channel for mqttt subscribe
    keepAlive := make(chan os.Signal)
    defer close(keepAlive)
    go func() {
        //checking for internet connection
        for !IsOnline() {
            fmt.Println("No Internet Connection..Retrying")
            //looking for internet connection after every 8 seconds
            time.Sleep(8 * time.Second)
        }
        fmt.Println("Internet connected...connecting to mqtt broker")
        repositories.Connect()
        //looking for interupt(Ctrl+C)
        value := <-keepAlive
        //If Ctrl+C is pressed then exit the application
        if value == os.Interrupt {
            fmt.Printf("Exiting the application")
            os.Exit(3)
        }
    }()
    s := grpc.NewServer()
    MqttRepository := repositories.MqttRepository()
    // It creates a new gRPC server instance
    rpc.NewMqttServer(s, MqttRepository)
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)

    }
}

func IsOnline() bool {
    timeout := time.Duration(5000 * time.Millisecond)
    client := http.Client{
        Timeout: timeout,
    }
    //default url to check connection is http://google.com
    _, err := client.Get("https://google.com")

    if err != nil {
        return false
    }

    return true
}

我在 main 中使用 go 例程,以便应用程序在每次启动时启动。

现在我想使用这个 MQTT Connect 功能来发布来自其他不同功能的数据。

例如函数 A 可以像 Connect(payload1,topic1) 一样调用它,函数 B 可以像 Connect(payload2,topic2) 一样调用它,然后这个函数应该处理将数据发布到云。

我是否应该在此 Connect 函数中添加主题和有效负载,然后从另一个函数调用它?或者是否有可能我可以将客户端返回或导出为全局,然后在另一个函数或 go 例程中使用它?如果我的问题听起来很愚蠢,我很抱歉..我不是 golang 专家..

最佳答案

Now I want to use this MQTT Connect function to publish the data from other different functions.

我怀疑我可能误解了您在这里尝试执行的操作,但除非您有建立多个连接的特定原因,否则最好连接一次,然后使用该单个连接发布多条消息。每次发送消息时建立连接都会遇到一些问题,包括:

  • 建立连接需要时间并会产生一些网络流量(TLS 握手等)。
  • 给定的 ClientID 只能有一个事件连接(如果您建立第二个连接,代理将关闭前一个连接)。
  • 库不会自动断开连接 - 您需要调用 Disconnect发布后。
  • 传入消息可能会因连接中断而丢失(请注意 CleanSession 默认为 true)。

Should I just add the topic and payload in this Connect function and then call it from another function?

如上所述,首选方法是连接一次,然后通过一个连接发布多条消息。 Client被设计为线程安全的,因此您可以传递它并调用 Publish来自多个 go 例程。您还可以使用 AutoConnect 如果您希望库管理连接(还有一个 SetConnectRetry 函数),请使用选项(您就是),但请记住,如果您尝试发送 QOS 0 消息时链接已关闭,则不会重试该消息。

我建议您的 connect 函数返回客户端(即 func Connect() mqtt.Client ),然后使用该客户端发布消息(您可以将其存储在某处或只是将其传递;我建议将其添加到您的 grpc 服务器结构中)。

我猜如果您需要连接特定的clientid,您可能需要建立多个连接为了发送到所需的主题(但通常您会给您的服务器连接访问广泛的主题)。这需要一些工作来确保您不会尝试同时使用同一客户端 ID 建立多个连接,并根据您的要求接收传入消息。

一些附加说明:

  • 如果您使用 AutoConnect SetConnectRetry 您可以简化您的代码(只需使用 IsConnectionOpen() 来检查连接是否已启动,从而无需使用 IsOnline() )。
  • spec states “服务器必须允许长度在 1 到 23 UTF-8 编码字节之间的 ClientId” - 看起来你的比这个长(我没有使用过 GCP,它可能很好地支持/需要更长的客户端 ID)。
  • 您不应该需要InsecureSkipVerify正在生产中。

关于go - Paho MQTT golang 用于多个模块?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68487000/

相关文章:

go - 为什么在我的代码中 goroutine 似乎自动卡住循环变量

python - 运行Go异步操作并写入map

sql - 带有表名 bindvar 的 Postgres sqlx 准备语句

jms - 如何从 MQTT 生产并在 ActiveMQ 中作为 MQTT 和 JMS 消费

api - golang 缺少 len 参数

node.js - 如何将聊天与 nodejs 和 xmpp 集成到我现有的 Web 应用程序中?

scala - Akka 基于 Actor 的自定义事件总线实现导致瓶颈

go - 所有 go routines 都睡着了 - 死锁

go - 使用值创建结构实例

http - 如何在 HTTP 中间件处理程序之间重用 *http.Request 的请求体?