python - 如何使用 pyspark 从 Kafka 获取并打印一行?必须使用 writeStream.start() 执行流式源查询

标签 python apache-spark pyspark spark-structured-streaming

我正在尝试从 Kafka 读取一些数据以查看那里有什么。

我写了

builder = SparkSession.builder\
   .appName("PythonTest01")

spark = builder.getOrCreate()

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
  .option("subscribe", dataFlowTopic) \
  .load()

# df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.printSchema()

df = df.first()

query = df \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

不幸的是,它发誓

pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

它想要什么以及如何满足它?


如果我删除 first() 它发誓

Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;

我要写

#df = df.first()

query = df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()

它不是首先打印,而是最后一行并且不会终止。

最佳答案

and not terminates.

是 Steam ;它并不意味着终止

printing not first, but last row

引用startingOffsets选项。默认为latest

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#reading-data-from-kafka

关于python - 如何使用 pyspark 从 Kafka 获取并打印一行?必须使用 writeStream.start() 执行流式源查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66953694/

相关文章:

python - Python-[Errno 10054]现有连接被远程主机强行关闭-发送腌制数据问题

scala - 如何避免在 Spark 中广播大型查找表

python - 将数据插入数据库时​​PySpark NoSuchMethodError : sun. nio.ch.DirectBuffer.cleaner

python - 有没有办法可以设置 python turtle 的背景颜色?

python - 自动 int 到 float 转换

python - 将实例保存到数据库之前/之后是否触发了 Django post_save?

scala - 我如何在 Spark Rdd 中转换 Seq

apache-spark - sbt - 构建 Spark 需要太多时间

python - 删除与某些行重复的所有行

scala - pyspark 与 scala 中的 FPgrowth 计算关联