我正在 golang 中为 mqtt 模块编写一个微服务。该模块将同时被不同的功能使用。我使用 Grpc 作为传输层。 我做了一个连接函数,就是这个..
func Connect() { //it would be Connect(payload1 struct,topic string)
deviceID := flag.String("device", "handler-1", "GCP Device-Id")
bridge := struct {
host *string
port *string
}{
flag.String("mqtt_host", "", "MQTT Bridge Host"),
flag.String("mqtt_port", "", "MQTT Bridge Port"),
}
projectID := flag.String("project", "", "GCP Project ID")
registryID := flag.String("registry", "", "Cloud IoT Registry ID (short form)")
region := flag.String("region", "", "GCP Region")
certsCA := flag.String("ca_certs", "", "Download https://pki.google.com/roots.pem")
privateKey := flag.String("private_key", "", "Path to private key file")
server := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
topic := struct {
config string
telemetry string
}{
config: fmt.Sprintf("/devices/%v/config", *deviceID),
telemetry: fmt.Sprintf("/devices/%v/events/topic", *deviceID),
}
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
clientid := fmt.Sprintf("projects/%v/locations/%v/registries/%v/devices/%v",
*projectID,
*region,
*registryID,
*deviceID,
)
log.Println("[main] Loading Google's roots")
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile(*certsCA)
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}
log.Println("[main] Creating TLS Config")
config := &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{},
MinVersion: tls.VersionTLS12,
}
flag.Parse()
connOpts := MQTT.NewClientOptions().
AddBroker(server).
SetClientID(clientid).
SetAutoReconnect(true).
SetPingTimeout(10 * time.Second).
SetKeepAlive(10 * time.Second).
SetDefaultPublishHandler(onMessageReceived).
SetConnectionLostHandler(connLostHandler).
SetReconnectingHandler(reconnHandler).
SetTLSConfig(config)
connOpts.SetUsername("unused")
///JWT Generation Starts from Here
token := jwt.New(jwt.SigningMethodES256)
token.Claims = jwt.StandardClaims{
Audience: *projectID,
IssuedAt: time.Now().Unix(),
ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
}
//Reading key file
log.Println("[main] Load Private Key")
keyBytes, err := ioutil.ReadFile(*privateKey)
if err != nil {
log.Fatal(err)
}
//Parsing key from file
log.Println("[main] Parse Private Key")
key, err := jwt.ParseECPrivateKeyFromPEM(keyBytes)
if err != nil {
log.Fatal(err)
}
//Signing JWT with private key
log.Println("[main] Sign String")
tokenString, err := token.SignedString(key)
if err != nil {
log.Fatal(err)
}
//JWT Generation Ends here
connOpts.SetPassword(tokenString)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(topic.config, byte(*qos), nil); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
}
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Printf("Not Connected..Retrying... %s\n", server)
} else {
fmt.Printf("Connected to %s\n", server)
}
}
我在 main.go 的 go 例程中调用这个函数
func main() {
fmt.Println("Server started at port 5005")
lis, err := net.Listen("tcp", "0.0.0.0:5005")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
//Creating keepAlive channel for mqttt subscribe
keepAlive := make(chan os.Signal)
defer close(keepAlive)
go func() {
//checking for internet connection
for !IsOnline() {
fmt.Println("No Internet Connection..Retrying")
//looking for internet connection after every 8 seconds
time.Sleep(8 * time.Second)
}
fmt.Println("Internet connected...connecting to mqtt broker")
repositories.Connect()
//looking for interupt(Ctrl+C)
value := <-keepAlive
//If Ctrl+C is pressed then exit the application
if value == os.Interrupt {
fmt.Printf("Exiting the application")
os.Exit(3)
}
}()
s := grpc.NewServer()
MqttRepository := repositories.MqttRepository()
// It creates a new gRPC server instance
rpc.NewMqttServer(s, MqttRepository)
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
func IsOnline() bool {
timeout := time.Duration(5000 * time.Millisecond)
client := http.Client{
Timeout: timeout,
}
//default url to check connection is http://google.com
_, err := client.Get("https://google.com")
if err != nil {
return false
}
return true
}
我在 main 中使用 go 例程,以便应用程序在每次启动时启动。
现在我想使用这个 MQTT Connect 功能来发布来自其他不同功能的数据。
例如函数 A 可以像 Connect(payload1,topic1)
一样调用它,函数 B 可以像 Connect(payload2,topic2)
一样调用它,然后这个函数应该处理将数据发布到云。
我是否应该在此 Connect 函数中添加主题和有效负载,然后从另一个函数调用它?或者是否有可能我可以将客户端返回或导出为全局,然后在另一个函数或 go 例程中使用它?如果我的问题听起来很愚蠢,我很抱歉..我不是 golang 专家..
最佳答案
Now I want to use this MQTT Connect function to publish the data from other different functions.
我怀疑我可能误解了您在这里尝试执行的操作,但除非您有建立多个连接的特定原因,否则最好连接一次,然后使用该单个连接发布多条消息。每次发送消息时建立连接都会遇到一些问题,包括:
- 建立连接需要时间并会产生一些网络流量(TLS 握手等)。
- 给定的 ClientID 只能有一个事件连接(如果您建立第二个连接,代理将关闭前一个连接)。
- 库不会自动断开连接 - 您需要调用
Disconnect
发布后。 - 传入消息可能会因连接中断而丢失(请注意
CleanSession
默认为 true)。
Should I just add the topic and payload in this Connect function and then call it from another function?
如上所述,首选方法是连接一次,然后通过一个连接发布多条消息。 Client
被设计为线程安全的,因此您可以传递它并调用 Publish
来自多个 go 例程。您还可以使用 AutoConnect
如果您希望库管理连接(还有一个 SetConnectRetry
函数),请使用选项(您就是),但请记住,如果您尝试发送 QOS 0 消息时链接已关闭,则不会重试该消息。
我建议您的 connect 函数返回客户端(即 func Connect() mqtt.Client
),然后使用该客户端发布消息(您可以将其存储在某处或只是将其传递;我建议将其添加到您的 grpc 服务器结构中)。
我猜如果您需要连接特定的clientid
,您可能需要建立多个连接为了发送到所需的主题(但通常您会给您的服务器连接访问广泛的主题)。这需要一些工作来确保您不会尝试同时使用同一客户端 ID 建立多个连接,并根据您的要求接收传入消息。
一些附加说明:
- 如果您使用
AutoConnect
和SetConnectRetry
您可以简化您的代码(只需使用IsConnectionOpen()
来检查连接是否已启动,从而无需使用IsOnline()
)。 - spec states “服务器必须允许长度在 1 到 23 UTF-8 编码字节之间的 ClientId” - 看起来你的比这个长(我没有使用过 GCP,它可能很好地支持/需要更长的客户端 ID)。里>
- 您不应该需要
InsecureSkipVerify
正在生产中。
关于go - Paho MQTT golang 用于多个模块?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68487000/