apache-spark - 将 mqtt 与 pyspark 流结合使用

标签 apache-spark spark-streaming mqtt

我是 spark 和 mqtt 的新手。我正在尝试使用我在网上获得的名为 wordcount.py 的 MQTTUtils 代码

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
        exit(-1)

    sc = SparkContext(appName="PythonStreamingMQTTWordCount")
    ssc = StreamingContext(sc, 1)

    brokerUrl = sys.argv[1]
    topic = sys.argv[2]

    lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

我按照说明安装了 mosquitto 代理(它正在工作),下载 spark-streaming-mqtt-assembly_2.11-1.6.2.jar 并使用以下命令运行 python 脚本:
~$ spark-submit --jars spark-streaming-mqtt-assembly_*.jar wordcount.py

但显示的错误:

从 pyspark.streaming.mqtt 导入 MQTTUtils

ImportError:没有名为 mqtt 的模块

那是我错过了这里的任何东西吗?
谢谢

最佳答案

对于 spark 版本 2.*,我们可以在 Structured Streaming 中使用 MQTT通过包括Bahir Jar。

从 pyspark 连接到 MQTT 代理:

(spark
    .readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic","mytopic")
    .load("tcp://{}".format(broker_uri)))

关于apache-spark - 将 mqtt 与 pyspark 流结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39349068/

相关文章:

apache-kafka - Spark Streaming 应用程序因 KafkaException : String exceeds the maximum size or with IllegalArgumentException 而失败

hadoop - 为什么某些工作节点在运行 Spark 应用程序时会占用更多系统 CPU?

performance - Spark read.parquet 花费太多时间

scala - 如何在本地模式下更改执行程序的数量?

java - 如何将MQTT Paho导入Java 'playground'?

Android非常频繁地杀死服务

java - Paho MQTT 与 MQTT paho spring 集成

apache-spark - 具有嵌套列的 Apache Spark 窗口函数

java - Spark 可以预初始化重型第三方库吗?

python - 如何解决 ERROR Executor - Exception in task 0.0 in stage 20.0 (TID 20)?