python - DataFrame returned by getBatch from MQTTTextStreamSource did not have isStreaming=true

Tags: python apache-spark pyspark spark-structured-streaming apache-bahir

I am trying to use MQTT together with PySpark Structured Streaming.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
lines = spark \
    .readStream \
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") \
    .option("topic", "uwb/distances") \
    .option("brokerUrl", "tcp://127.0.0.1:1883") \
    .load()

# Split the lines into words
words = lines.select(explode(split(lines.value, ' ')).alias('word'))

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

Error message:

Logical Plan:
Aggregate [word#7], [word#7, count(1) AS count#11L]
+- Project [word#7]
   +- Generate explode(split(value#2,  )), false, [word#7]
      +- StreamingExecutionRelation org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1, [value#2, timestamp#3]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1 did not have isStreaming=true

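I don't understand what is wrong with my code. Moreover, according to this post, Bahir MQTT does support Structured Streaming in 2.1.0. I also tried Spark 2.2.1 and ran into the same problem.

This is how I run the code: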

spark-submit \
  --jars lib/spark-streaming-mqtt_2.11-2.2.1.jar,lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar,lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
  TestSpark.py

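How can I fix this?

Accepted answer

I downloaded the Spark binaries (the path below points to the 2.2.1 build, spark-2.2.1-bin-hadoop2.7) and ran the code with that distribution's spark-submit: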

~/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
    --jars lib/spark-streaming-mqtt_2.11-2.2.1.jar,lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar,lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
    TestSpark.py

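This solved the problem. Previously I had only been changing the version of the MQTT jar files, e.g. spark-streaming-mqtt_2.11-2.2.1.jar, but that was evidently not enough: the Spark installation that runs spark-submit apparently has to match the jars as well.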
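One way to catch this kind of mismatch early is to compare the version of the running Spark installation with the version encoded in the jar file names. The following is only a rough sketch: it assumes the jars follow the `<artifact>_<scala version>-<release version>.jar` naming seen above and that a Bahir jar should share its major.minor version with the Spark runtime. The helper names (`jar_release_version`, `major_minor`, `mismatched_jars`) are hypothetical and not part of Spark or Bahir.

```python
import re

# Assumed naming convention: <artifact>_<scala version>-<release version>.jar
_JAR_PATTERN = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<version>\d+(?:\.\d+)*)\.jar$"
)


def jar_release_version(jar_path):
    """Return the release version encoded in a jar file name, or None."""
    file_name = jar_path.rsplit("/", 1)[-1]
    match = _JAR_PATTERN.match(file_name)
    return match.group("version") if match else None


def major_minor(version):
    """Reduce a version string such as '2.2.1' to its major.minor part."""
    return ".".join(version.split(".")[:2])


def mismatched_jars(spark_version, jar_paths):
    """List jars whose major.minor release differs from the Spark runtime's.

    Jars that do not follow the assumed naming convention are skipped.
    """
    mismatches = []
    for path in jar_paths:
        version = jar_release_version(path)
        if version is not None and major_minor(version) != major_minor(spark_version):
            mismatches.append(path)
    return mismatches
```

Inside TestSpark.py this could be called as `mismatched_jars(spark.version, [...])` with the same jar paths that are passed to `--jars`; a non-empty result suggests that the spark-submit being used belongs to a different Spark release than the one the jars target.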

Source: Stack Overflow, https://stackoverflow.com/questions/51537178/
