Python MQTT 回调未调用

标签 python mqtt python-multithreading

我正在开发一个 mqtt 订阅者,它将消息转发到 beanstalk-tube 或 api-endpoint。我想出了以下代码:

#!/usr/bin/python
import pymysql.cursors
import sys
import time
import paho.mqtt.client as mqtt
from threading import Thread


class Process(Thread):
    def __init__(self, sid=None, host=None, username=None, password=None, topic=None, topic_qos=None, destination_type=None, destination=None):
        Thread.__init__(self)
        self.sid = sid
        self.host = host
        self.username = username
        self.password = password
        self.topic = topic
        self.topic_qos = topic_qos
        self.destination_type = destination_type
        self.destination = destination
        self.client = None

    def on_connect(self, client, obj, flags, rc):
        print("connected")
        self.client.subscribe(self.topic, qos=self.topic_qos)
        self.client.loop_forever()

    def on_message(self, client, obj, msg):
        print(str(msg.payload))

    def run(self):
        self.client = mqtt.Client(str(self.sid) + "_subscriber")
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.host, 1883, 60)


def main(argv):
    db_connection = pymysql.connect(host=argv[0],
                                    user=argv[1],
                                    password=argv[2],
                                    db=argv[3],
                                    charset='utf8mb4',
                                    cursorclass=pymysql.cursors.DictCursor)
    processes = []
    try:
        with db_connection.cursor() as cursor:
            cursor.execute("SELECT `id`,`topic`,`topic_qos`,`target_type`,`target` FROM mqtt_subscriptions;")
            result = cursor.fetchall()
            for subscription in result:
                process = Process(subscription['id'], argv[4], argv[5], argv[6], subscription['topic'],
                                         subscription['topic_qos'], subscription['target_type'],
                                         subscription['target'])
                process.start()
                processes.append(process)
    finally:
        db_connection.close()
        while True:
            #print("check for new imports")
            time.sleep(4)


if __name__ == "__main__":
    main(sys.argv[1:])

问题是 Process 类中的方法没有被触发。这可能是因为我使用的是面向对象编程,而所有示例都不是。但这一定是可能的吧?

很想听听您的想法或建议。

吉诺

最佳答案

将对 self.client.loop_forever() 的调用移出 on_connect() 回调,并将其放在对 self.client 的调用之后。连接(self.host,1883,60)

回调应该快速返回,你所做的就是让回调永远不会返回。

关于Python MQTT 回调未调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37304204/

相关文章:

python - wxPython - 如何 "highlight"TextCtrl?

python - Pandas:将组转换为 json 列表,而不使用 groupby 或 apply

python - Tkinter 中的文本输入

swift - 如何使用 swift 通过 wss 连接到 MQTT?

java - 使用 Java 中的 MQTT 将命令发布到 IBM IoT 中的设备

python - 从正在运行的线程调用其他线程上的协程

python - “Choco install python”在 AppVeyor 上失败,出现 1603

c++ - 当函数仅支持字符串类型时以二进制十六进制发送数据

python - threading.timer 只打印 for 循环的最后一个值

python - 为什么 multiprocessing.Queue 没有 task_done 方法