我是 spark 和 mqtt 的新手。我正在尝试使用我在网上获得的名为 wordcount.py 的 MQTTUtils 代码
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)
sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)
brokerUrl = sys.argv[1]
topic = sys.argv[2]
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
我按照说明安装了 mosquitto 代理(它正在工作),下载 spark-streaming-mqtt-assembly_2.11-1.6.2.jar 并使用以下命令运行 python 脚本:
~$ spark-submit --jars spark-streaming-mqtt-assembly_*.jar wordcount.py
但显示的错误:
从 pyspark.streaming.mqtt 导入 MQTTUtils
ImportError:没有名为 mqtt 的模块
那是我错过了这里的任何东西吗?
谢谢
最佳答案
对于 spark 版本 2.*,我们可以在 Structured Streaming 中使用 MQTT通过包括Bahir Jar。
从 pyspark 连接到 MQTT 代理:
(spark
.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic","mytopic")
.load("tcp://{}".format(broker_uri)))
关于apache-spark - 将 mqtt 与 pyspark 流结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39349068/