我正在开发一个 mqtt 订阅者,它将消息转发到 beanstalk-tube 或 api-endpoint。我想出了以下代码:
#!/usr/bin/python
import pymysql.cursors
import sys
import time
import paho.mqtt.client as mqtt
from threading import Thread
class Process(Thread):
def __init__(self, sid=None, host=None, username=None, password=None, topic=None, topic_qos=None, destination_type=None, destination=None):
Thread.__init__(self)
self.sid = sid
self.host = host
self.username = username
self.password = password
self.topic = topic
self.topic_qos = topic_qos
self.destination_type = destination_type
self.destination = destination
self.client = None
def on_connect(self, client, obj, flags, rc):
print("connected")
self.client.subscribe(self.topic, qos=self.topic_qos)
self.client.loop_forever()
def on_message(self, client, obj, msg):
print(str(msg.payload))
def run(self):
self.client = mqtt.Client(str(self.sid) + "_subscriber")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.username_pw_set(self.username, self.password)
self.client.connect(self.host, 1883, 60)
def main(argv):
db_connection = pymysql.connect(host=argv[0],
user=argv[1],
password=argv[2],
db=argv[3],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
processes = []
try:
with db_connection.cursor() as cursor:
cursor.execute("SELECT `id`,`topic`,`topic_qos`,`target_type`,`target` FROM mqtt_subscriptions;")
result = cursor.fetchall()
for subscription in result:
process = Process(subscription['id'], argv[4], argv[5], argv[6], subscription['topic'],
subscription['topic_qos'], subscription['target_type'],
subscription['target'])
process.start()
processes.append(process)
finally:
db_connection.close()
while True:
#print("check for new imports")
time.sleep(4)
if __name__ == "__main__":
main(sys.argv[1:])
问题是 Process 类中的方法没有被触发。这可能是因为我使用的是面向对象编程,而所有示例都不是。但这一定是可能的吧?
很想听听您的想法或建议。
吉诺
最佳答案
将对 self.client.loop_forever()
的调用移出 on_connect()
回调,并将其放在对 self.client 的调用之后。连接(self.host,1883,60)
回调应该快速返回,你所做的就是让回调永远不会返回。
关于Python MQTT 回调未调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37304204/