apache-spark - 发生异常 : pyspark. sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

标签 apache-spark pyspark spark-structured-streaming

在代码中 如果不是 df.head(1).isEmpty: 我遇到了异常,

Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

我不知道如何在流数据中使用 if 。 当我使用 jupyter 执行每一行时,代码很好,我可以得到结果。但使用 .py 不好。

我的目的是这样的:我想使用流式传输每秒从kafka获取数据,然后我将每批流数据(一批意味着我一秒钟得到的数据)转换为pandas dataframe,然后我使用pandas函数对数据进行处理,最后将结果发送到其他kafka主题。

请帮助我,并原谅我的台球英语,非常感谢。

sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession(sparkContext=sc) \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data") \
  .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

if not df.head(1).isEmpty:
  alertQuery = ds \
          .writeStream \
          .queryName("qalerts")\
          .format("memory")\
          .start()

  alerts = spark.sql("select * from qalerts")
  pdAlerts = alerts.toPandas()
  a = pdAlerts['value'].tolist()

  d = []
  for i in a:
      x = json.loads(i)
      d.append(x)

  df = pd.DataFrame(d)
  print(df)
  ds = df['jobID'].unique().tolist()


  dics = {}
  for source in ds:
      ids = df.loc[df['jobID'] == source, 'id'].tolist()
      dics[source]=ids

  print(dics)  
query = ds \
  .writeStream \
  .queryName("tableName") \
  .format("console") \
  .start()

query.awaitTermination()

最佳答案

删除if not df.head(1).isEmpty:,你应该没问题。

异常的原因很简单,即流式查询是一种永远不会结束并且不断执行的结构化查询。根本不可能查看单个元素,因为不存在“单个元素”,而是(可能)有数千个元素,并且很难判断何时您想查看幕后并只看到一个元素单个元素。

关于apache-spark - 发生异常 : pyspark. sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54472860/

相关文章:

hadoop - 如何从 Spark MLlib FP Growth 模型中提取数据

python - 如何在 python 中使用 `map` 将 dict 值转换为整数?

scala - Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

apache-spark - 在 Spark Streaming (Spark 2.0) 中使用 Kafka

java - 使用 couchbase-spark-connector 时出错。抛出 BackPressureException

apache-spark - pyspark 预期构建 ClassDict 的参数为零(对于 pyspark.mllib.linalg.DenseVector)

apache-spark - 来自 RDD 的 PySpark LDA 模型密集向量

python - PySpark 中的随机数生成

python - PySpark 分发模块导入

apache-spark - 触发流无法写入hdfs路径