Azure IoT 中心 - "Subscribe"代码不起作用 : "connection was refused"

标签 azure iot publish-subscribe azure-iot-hub subscribe

当某些数据发布到 MQTT 主题 devices/device_id/messages/events/ 时,我尝试从 IoT 中心接收“通知”。为了接收数据,我使用以下主题:devices/device_id/messages/devicebound/。我的订阅代码如下(Python):

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

root_ca = "./certs/digicertbaltimoreroot.pem"
public_crt = './certs/rsa_cert.pem'
private_key = './certs/rsa_private.pem'

iothub_name = "myhub"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

当我运行时,我得到以下输出:

Connecting to Azure IoT Hub...
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')
Connected with status: 3
('on_disconnect', 'Some error occurred. 5: The connection was refused.')

有人可以建议我缺少什么吗?

最佳答案

基于文档"Using the MQTT protocol directly" ,你需要

to download and reference the DigiCert Baltimore Root Certificate. This certificate is the one that Azure uses to secure the connection.

正如您在发布过程中所做的那样:

client.tls_set(ca_certs=path_to_root_cert,     
               certfile=None, keyfile=None, 
               cert_reqs=ssl.CERT_REQUIRED,       
               tls_version=ssl.PROTOCOL_TLSv1, 
               ciphers=None)

此外,您还需要设置用户名和密码:

client.username_pw_set(username=iot_hub_name+".azure-devices.net/" + device_id, password=sas_token)

最后,您可以像这样订阅:

client.subscribe(topic=topic, qos=0)

安全 key 验证设备的示例代码:

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

path_to_root_cert = "./certs/digicertbaltimoreroot.cer"

iothub_name = "your hub name"
device_id = "device1"
sas_token = "SharedAccessSignature sr=[your hub name].azure-devices.net%2Fdevices%2Fdevice1&sig=[sig]&se=1526955728"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password=sas_token)

    client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect


    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()

x509 验证设备的示例代码: (需要提供设备证书和私钥文件。不需要SAS token作为密码;留空即可。但仍需要设置用户名。)

import paho.mqtt.client as mqtt
import ssl, random
from time import sleep

root_ca = "./certs/digicertbaltimoreroot.cer"
public_crt = './certs/mydevice-public.pem'
private_key = './certs/mydevice-private.pem'

iothub_name = "your hub name"
device_id = "mydevice"
mqtt_url = "{}.azure-devices.net".format(iothub_name)
mqtt_port = 8883
topic = "devices/{}/messages/devicebound/#".format(device_id)

def error_str(rc):
    return "Some error occurred. {}: {}".format(rc, mqtt.error_string(rc))

def on_disconnect(unused_client, unused_userdata, rc):
    print("on_disconnect", error_str(rc))

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_message(client, userdata, msg):
    print("Topic: {0} -- Payload: {1}".format(msg.topic, str(msg.payload)))

if __name__ == "__main__":    
    client = mqtt.Client(device_id, protocol=mqtt.MQTTv311)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.username_pw_set(username=iothub_name+".azure-devices.net/" + device_id, password="")

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)
    client.tls_insecure_set(False)


    print("Connecting to Azure IoT Hub...")
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.subscribe(topic=topic, qos=0)
    client.loop_forever()

注意:主题为“devices/{device-id}/messages/devicebound/#”。

如何创 build 备证书文件可以引用here .

更新:这些示例代码用于接收 D2C messages您可以像这样发送以进行简单测试:

来自device explorer :

enter image description here

来自 azure 门户:

enter image description here

更新:接收 D2C 消息:

正如我在评论中指出的,Azure IoT Hub 不是通用的 MQTT 代理,因此您无法直接订阅从设备发送的消息。 要解决此问题,您可以使用 IoT 中心公开的与事件中心兼容的终结点,例如“Read the telemetry from your hub” 但不幸的是,Python 不支持这种方式。您可以使用 iothub-explorer CLI 实用程序或创建 Node.js.NETJava 基于事件中心的控制台应用程序来读取设备-来自 IoT 中心的到云消息。

关于Azure IoT 中心 - "Subscribe"代码不起作用 : "connection was refused",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50417822/

相关文章:

c# - 并行/顺序调用 httpclient 的最佳实践是什么,具有非常好的性能并且不会丢失数据

Azure DevOps Build Pipeline 根据拉取请求触发

raspberry-pi - 如何将运行 Android Things 的 Raspberry Pi 3 连接到 wifi 网络?

c# - (C#) 如何检查物联网设备是否能够发送到IoTHub

redis - 使用redis pub/sub连接Tornado进程

multithreading - Spring Integration 的异步发布订阅是如何工作的?

python - 获取 Google Cloud PubSub 中单条消息的大小

azure - ASP.NET Core 应用程序在发布到 Azure 后无法运行

sql-server - 在 Azure 自动化 powershell 脚本中从 SQL 打印消息不起作用

c - 在cooja模拟器中的Contiki中生成网关场景