考虑从 dataframe
写入的数据至 kafka
然后阅读 kafka
返回一个新的 dataframe
:
// Write from df to kafka
val wdf = airj.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "air2008")
.save
现在读回数据
// Read from kafka into spark df
import org.apache.spark.sql.functions._
val flights = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "air2008")
.load())
多少条记录?
scala> flights.count
res36: Long = 5824436
让我们将其注册为一个表:
flights.createOrReplaceTempView("flights_raw")
让我们换一种方式问这个问题:有多少记录 .. ??!
spark.sql("select count(1) from flights_raw").show
+--------+
|count(1)|
+--------+
|0 |
+--------+
让我们再用第一种方式问这个问题:
scala> flights.count
res40: Long = 0
这里发生了什么 ?
最佳答案
createOrReplaceTempView
被惰性求值意味着它不会持久化到内存中。为此,您必须cache
数据。
flights.cache
flights.createOrReplaceTempView("flights_raw")
或者
flights.createOrReplaceTempView("flights_raw")
spark.table("flights_raw")
spark.table("flights_raw").cache
spark.table("flights_raw").count
应该做的伎俩。
关于scala - 注册为表格后,从kafka读取到spark的数据消失了?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55563921/