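I am trying to receive "notifications" from IoT Hub when some data is published to the MQTT topic devices/device_id/messages/events/
. To receive the data, I subscribe to the following topic: devices/device_id/messages/devicebound/
. My subscriber code is as follows (Python):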
import paho.mqtt.client as mqtt
import ssl, random
from time import sleep
root_ca = "./certs/digicertbaltimoreroot.pem"
public_crt = './certs/rsa_cert.pem'
private_key = './certs/rsa_private.pem'
iothub_name = "myhub"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)
def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)
    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()
When I run it, I get the following output:
Connecting to Azure IoT Hub...
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')
Can anyone suggest what I am missing?
Best answer
According to the documentation "Using the MQTT protocol directly", you need
to download and reference the DigiCert Baltimore Root Certificate. This certificate is the one that Azure uses to secure the connection.
Reference it the same way you did when publishing:
client.tls_set(ca_certs=path_to_root_cert,
               certfile=None, keyfile=None,
               cert_reqs=ssl.CERT_REQUIRED,
               tls_version=ssl.PROTOCOL_TLSv1_2,
               ciphers=None)
In addition, you need to set the username and password:
client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password=sas_token)
Finally, you can subscribe like this:
client.subscribe(topic=topic, qos=0)
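The password above is a SAS token in the same shape as the one in the sample below (sr=...&sig=...&se=...). If you only have the device's symmetric key, here is a minimal sketch of how such a token can be built with the standard library. It assumes the usual IoT Hub signing scheme (HMAC-SHA256 over the URL-encoded resource URI, a newline, and the expiry, keyed with the base64-decoded device key). The function name generate_sas_token and the placeholder key are illustrative only, not part of paho or any Azure SDK.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri, device_key_b64, expiry_epoch):
    """Build a SharedAccessSignature string for a device-scoped resource URI.

    resource_uri   -- e.g. "<hub>.azure-devices.net/devices/<device_id>"
    device_key_b64 -- the device's base64-encoded symmetric key
    expiry_epoch   -- expiry time as integer seconds since the Unix epoch
    """
    # The resource URI is URL-encoded both in the signed string and in the token.
    encoded_uri = urllib.parse.quote(resource_uri, safe="")
    string_to_sign = "{}\n{}".format(encoded_uri, expiry_epoch).encode("utf-8")

    # Sign with HMAC-SHA256 using the decoded key, then base64-encode the digest.
    key = base64.b64decode(device_key_b64)
    digest = hmac.new(key, string_to_sign, hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode("ascii")

    return "SharedAccessSignature sr={}&sig={}&se={}".format(
        encoded_uri, urllib.parse.quote(signature, safe=""), expiry_epoch)


if __name__ == "__main__":
    # Placeholder key for illustration; use your device's real primary key.
    placeholder_key = base64.b64encode(b"not-a-real-key").decode("ascii")
    token = generate_sas_token(
        "myhub.azure-devices.net/devices/mydevice",
        placeholder_key,
        int(time.time()) + 3600)  # valid for one hour
    print(token)
```

The resulting string is what you would pass as password to client.username_pw_set. An expired token is one possible cause of a refused connection, so check the se value if a previously working token stops being accepted.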
Sample code for a device authenticated with a security key (SAS token):
import paho.mqtt.client as mqtt
import ssl, random
from time import sleep
path_to_root_cert = "./certs/digicertbaltimoreroot.cer"
iothub_name = "your hub name"
device_id = "device1"
sas_token = "SharedAccessSignature sr=[your hub name].azure-devices.net%2Fdevices%2Fdevice1&sig=[sig]&se=1526955728"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)
def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)
    # Username is "<hub>.azure-devices.net/<device_id>"; password is the SAS token.
    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password=sas_token)
    # No client certificate here: the device authenticates with the SAS token.
    client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()
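Sample code for a device authenticated with X.509: (You need to provide the device certificate and private key files. No SAS token is needed as the password; leave it empty. You still need to set the username.)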
import paho.mqtt.client as mqtt
import ssl, random
from time import sleep
root_ca = "./certs/digicertbaltimoreroot.cer"
public_crt = './certs/mydevice-public.pem'
private_key = './certs/mydevice-private.pem'
iothub_name = "your hub name"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)
def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password="")
    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)
    client.tls_insecure_set(False)
    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()
Note: the topic is "devices/{device-id}/messages/devicebound/#".
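Because of the trailing # wildcard, msg.topic in on_message may carry more than the fixed prefix. As far as I know, IoT Hub appends a URL-encoded property bag (key=value pairs joined by &) after devicebound/. The sketch below splits such a topic into the device ID and a dictionary of properties. The helper name parse_devicebound_topic and the sample topic are made up for illustration, and the property-bag layout is an assumption you should check against what your hub actually delivers.

```python
from urllib.parse import unquote


def parse_devicebound_topic(topic):
    """Split a devicebound topic into (device_id, properties).

    Assumes the layout devices/<device_id>/messages/devicebound/<property_bag>,
    where <property_bag> is URL-encoded key=value pairs joined by '&'.
    """
    prefix, _, property_bag = topic.partition("/messages/devicebound/")
    device_id = prefix.split("/", 1)[1] if prefix.startswith("devices/") else None

    properties = {}
    for pair in filter(None, property_bag.split("&")):
        key, _, value = pair.partition("=")
        properties[unquote(key)] = unquote(value)
    return device_id, properties


if __name__ == "__main__":
    # Hypothetical topic string, for illustration only.
    sample = ("devices/mydevice/messages/devicebound/"
              "%24.mid=abc123&%24.to=%2Fdevices%2Fmydevice%2Fmessages%2FdeviceBound&color=red")
    device, props = parse_devicebound_topic(sample)
    print(device)
    for name, value in props.items():
        print(name, "=", value)
```

You could call this from on_message with msg.topic to log any application properties sent along with the message.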
For how to create the device certificate files, see here.
Update: These code samples are for receiving C2D (cloud-to-device) messages. For a simple test, you can send one like this:
From Device Explorer:
From the Azure portal:
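Update: Receiving D2C (device-to-cloud) messages:
As I pointed out in the comments, Azure IoT Hub is not a general-purpose MQTT broker, so you cannot directly subscribe to messages sent from devices. To work around this, you can use the Event Hub-compatible endpoint that IoT Hub exposes, as described in "Read the telemetry from your hub". Unfortunately, Python did not support this approach at the time of writing. You can use the iothub-explorer CLI utility, or create a Node.js, .NET, or Java console application based on Event Hubs, to read the device-to-cloud messages from IoT Hub.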
Source: the Stack Overflow question Azure IoT Hub - "Subscribe" code not working: "connection was refused": https://stackoverflow.com/questions/50417822/