scala - 注册为表格后,从kafka读取到spark的数据消失了?

标签 scala apache-spark apache-kafka

考虑从 dataframe 写入的数据至 kafka然后阅读 kafka返回一个新的 dataframe :

// Write from df to kafka
val wdf  = airj.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "air2008")
  .save

现在读回数据
// Read from kafka into spark df
import org.apache.spark.sql.functions._
val flights = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008")
  .load())

多少条记录?
scala> flights.count
res36: Long = 5824436

让我们将其注册为一个表:
flights.createOrReplaceTempView("flights_raw")

让我们换一种方式问这个问题:有多少记录 .. ??!
spark.sql("select count(1) from flights_raw").show
+--------+
|count(1)|
+--------+
|0       |
+--------+

让我们再用第一种方式问这个问题:
scala> flights.count
res40: Long = 0

这里发生了什么 ?

最佳答案

createOrReplaceTempView被惰性求值意味着它不会持久化到内存中。为此,您必须cache数据。

flights.cache
flights.createOrReplaceTempView("flights_raw")

或者
flights.createOrReplaceTempView("flights_raw")
spark.table("flights_raw")
spark.table("flights_raw").cache
spark.table("flights_raw").count

应该做的伎俩。

关于scala - 注册为表格后,从kafka读取到spark的数据消失了?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55563921/

相关文章:

java - 将 Scala 选项转换为 Java 可选

ruby - 动态语言——我应该选择哪一种?

apache-spark - 如何 CROSS JOIN 2 数据框?

java - 如何使用spark-core 1.6.0版本中SparkContext类的getOrCreate方法?

hadoop - 合并分布式应用程序中的输入

java - Kafka 流连接中的 RecordTooLargeException

apache-kafka - kafka 压缩与引用字节的配置有何关系?

java - Kafka Producer 无法通过代码连接

scala - 如何避免 Scala 中类型绑定(bind)的重复

scala - Spark : Transpose DataFrame Without Aggregating