scala - 如何将 RDD[Row] 转换回 DataFrame

标签 scala apache-spark dataframe rdd

这个问题在这里已经有了答案:





How to convert rdd object to dataframe in spark

(12 个回答)


5年前关闭。




我一直在尝试将 RDD 转换为 DataFrame 并再次转换回来。首先,我有一个类型为 (Int, Int) 的 RDD,称为 dataPair。然后我使用以下方法创建了一个带有列标题的 DataFrame 对象:

val dataFrame = dataPair.toDF(header(0), header(1))

然后我使用以下方法将它从 DataFrame 转换回 RDD:
val testRDD = dataFrame.rdd

它返回一个 org.apache.spark.sql.Row 类型的 RDD(不是 (Int, Int))。然后我想使用 .toDF 将其转换回 RDD,但出现错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

我已经尝试为 testRDD 定义一个 Data(Int, Int) 类型的模式,但是我得到了类型不匹配的异常:
error: type mismatch;
found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
    val testRDD: RDD[Data] = dataFrame.rdd
                                       ^

我已经导入了
import sqlContext.implicits._

最佳答案

要从行的 RDD 创建 DataFrame,通常有两个主要选项:

1) 您可以使用 toDF()可以通过 import sqlContext.implicits._ 导入.但是,这种方法仅适用于以下类型的 RDD:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

  • (来源:SQLContext.implicits 对象的 Scaladoc)

    最后一个签名实际上意味着它可以用于元组的 RDD 或案例类的 RDD(因为元组和案例类是 scala.Product 的子类)。

    因此,要将此方法用于 RDD[Row] ,您必须将其映射到 RDD[T <: scala.Product] .这可以通过将每一行映射到自定义案例类或元组来完成,如下面的代码片段所示:
    val df = rdd.map({ 
      case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
    }).toDF("col1_name", ..., "colN_name")
    

    或者
    case class MyClass(val1: String, ..., valN: Long = 0L)
    val df = rdd.map({ 
      case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
    }).toDF("col1_name", ..., "colN_name")
    

    这种方法的主要缺点(在我看来)是您必须在 map 函数中逐列显式设置生成的 DataFrame 的架构。如果您事先不知道架构,也许这可以通过编程方式完成,但那里的事情可能会变得有点困惑。因此,或者,还有另一种选择:

    2) 您可以使用 createDataFrame(rowRDD: RDD[Row], schema: StructType) ,可在 SQLContext 中找到目的。例子:
    val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
    

    请注意,无需显式设置任何架构列。我们重用旧的 DF 模式,即 StructType类并且可以轻松扩展。但是,这种方法有时是不可能的,并且在某些情况下可能比第一种方法效率低。

    我希望它比以前更清楚。干杯。

    关于scala - 如何将 RDD[Row] 转换回 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37011267/

    相关文章:

    scala - 使用 TestActorRef 测试 actor 崩溃

    python - 在 Zeppelin 上运行 Jupyter/IPython 文档

    scala - 使用 Map 替换 Spark 中的列值

    python - 为 Pyspark 数据框按多列重新分区

    python - Pandas 数据框对重复行执行计算

    R:如何根据单元格中的值将数据框中的一行拆分为多行?

    Scala 函数/方法参数化返回类型

    scala - 为什么在 Scala 中创建对象很快?

    scala - Spark : how to zip an RDD with each partition of the other RDD

    python - 使用 python 获取数据框中每列的唯一字符串值列表