apache-spark - 在 2.0 中将 RDD 转换为 Dataframe

标签 apache-spark apache-spark-sql spark-dataframe

我正在尝试将 rdd 转换为 Spark2.0 中的数据帧

val conf=new SparkConf().setAppName("dataframes").setMaster("local")
val sc=new SparkContext(conf)
val sqlCon=new SQLContext(sc)
import sqlCon.implicits._
val rdd=sc.textFile("/home/cloudera/alpha.dat").persist()
val row=rdd.first()
val data=rdd.filter { x => !x.contains(row) }

data.foreach { x => println(x) }


case class person(name:String,age:Int,city:String)
val rdd2=data.map { x => x.split(",") }
val rdd3=rdd2.map { x => person(x(0),x(1).toInt,x(2)) }
val df=rdd3.toDF()


df.printSchema();
df.registerTempTable("alpha")
val df1=sqlCon.sql("select * from alpha")
df1.foreach { x => println(x) }

但我在 toDF() 处遇到了以下错误。 ---> "val df=rdd3.toDF() "

Multiple markers at this line:
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case 
 classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
- Implicit conversion found: rdd3 ⇒ rddToDatasetHolder(rdd3): (implicit evidence$4: 
 org.apache.spark.sql.Encoder[person])org.apache.spark.sql.DatasetHolder[person]

如何使用 toDF() 将上述内容转换为 Dataframe

最佳答案

Cloudera 和 Spark 2.0?嗯,没想到我们还支持它:)

无论如何,首先您不需要在您的 RDD 上调用 .persist() 以便您可以删除该位。其次,由于 Person 是一个案例类,您应该将其名称大写。

最后,在 Spark 2.0 中,您不再调用 import sqlContext.implicits._ 来隐式构建 DataFrame 模式,您现在调用 import spark.implicits。 _。您的错误消息暗示了这一点。

关于apache-spark - 在 2.0 中将 RDD 转换为 Dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40640620/

相关文章:

apache-spark - Spark Structured Streaming 如何确定事件已晚到?

scala - 使用FlatMap使用Spark和Scala将列名称附加到元素

python - 如何通过多值列过滤JSON数据

SQL - 为列 B 中缺少的每个不同元素的 A 列中的每个不同元素插入值为 0 的行

java - Apache Spark 数据集 API - 不接受架构 StructType

apache-spark - 为什么spark在sql查询的末尾追加 'WHERE 1=0'

apache-spark - 在 spark 2.3.2 中,调用 Dataset.count() 时出现 java.lang.ClassCastException

hadoop - Mapreduce作业提交与Spark作业提交

apache-spark - 如何检索输出大小和从 Spark UI 写入的记录等指标?

python - 如何将二进制文件从 hdfs 读入 Spark 数据帧?