I'm trying to read some data from Kafka to see what's in there.
I wrote:
from pyspark.sql import SparkSession

builder = SparkSession.builder \
    .appName("PythonTest01")
spark = builder.getOrCreate()
# Subscribe to 1 topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
    .option("subscribe", dataFlowTopic) \
    .load()
# df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
df = df.first()
query = df \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()
query.awaitTermination()
Unfortunately, it complains:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
What does it want, and how do I satisfy it?
If I remove the first() call, it complains:
Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
So I write:
#df = df.first()
query = df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()
query.awaitTermination()
Now it prints not the first row but the last one, and it never terminates.
Best answer
"and not terminates" — it's a stream; it is not meant to terminate.

"printing not first, but last row" — see the startingOffsets option. It defaults to latest, so the query only picks up from the end of the topic.
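Putting the answer's two points together, a sketch of the corrected script might look like this (the config dict and topic name are placeholders standing in for the question's values; it needs a running Kafka broker, so it is illustrative only):

```python
from pyspark.sql import SparkSession

# Placeholders for the question's config["kafka"]["bootstrap.servers"]
# and dataFlowTopic — substitute your own values.
config = {"kafka": {"bootstrap.servers": "localhost:9092"}}
dataFlowTopic = "my-topic"

spark = SparkSession.builder \
    .appName("PythonTest01") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
    .option("subscribe", dataFlowTopic) \
    .option("startingOffsets", "earliest") \
    .load()
    # startingOffsets defaults to "latest", which is why only the newest
    # row showed up; "earliest" replays what is already in the topic.

# Kafka keys/values arrive as binary; cast them to strings for the console.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

# A streaming query runs until stopped; pass a timeout (in seconds)
# if you just want to peek at the data and then exit.
query.awaitTermination(60)
```

To take only one row, inspect the console output or stop the query after the first micro-batch with query.stop(); first() cannot be called on a streaming DataFrame, which is what the original AnalysisException was saying.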
About python - How to fetch and print one row from Kafka with pyspark? "Queries with streaming sources must be executed with writeStream.start()" — we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66953694/