Scala - How to convert a Dataset[Row] into a column that can be added to a DataFrame

Tags: scala apache-spark dataframe dataset

I am trying to add a single-column DataFrame to a larger DataFrame. The problem arises after creating that first DataFrame, when I try to add it to the main DataFrame with the command:

  df.withColumn("name", dataframe)

I get the error:

 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Column

I know that Dataset[Row] is supposed to be synonymous with DataFrame, but I am not sure how to resolve this error.
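For reference, withColumn is declared as withColumn(colName: String, col: Column), so the second argument has to be a column expression rather than a whole DataFrame. Just to illustrate the type it expects (not a fix for my problem), expressions like these compile:

import org.apache.spark.sql.functions.{col, lit}

// withColumn accepts a Column expression built from existing columns, or a literal
df.withColumn("blue_doubled", col("blue") * 2)
df.withColumn("flag", lit(1))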

For context, a (heavily) stripped-down version of my code is below:

// test function - will be used as part of the main script below
def Test(inputone: Double, inputtwo: Double): Double = {
  val test = (2 * inputone) + inputtwo
  test
}

And for the main script (where the problem lies):

//Importing the data via CSV
var df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("/root/file.csv")

To give some context on the data:

df: org.apache.spark.sql.DataFrame = [ID: int, blue: int ... 8 more fields]

+---+----+------+-----+------+------+----+---+-----+-----+
| ID|blue|purple|green|yellow|orange|pink|red|white|black|
+---+----+------+-----+------+------+----+---+-----+-----+
|  1| 500|    44|    0|     0|     3|   0|  5|   43|    2|
|  2| 560|    33|    1|     0|     4|   0| 22|   33|    4|
|  3| 744|    44|    1|    99|     3|1000| 78|   90|    0|
+---+----+------+-----+------+------+----+---+-----+-----+

root
 |-- ID: integer (nullable = true)
 |-- blue: integer (nullable = true)
 |-- purple: integer (nullable = true)
 |-- green: integer (nullable = true)
 |-- yellow: integer (nullable = true)
 |-- orange: integer (nullable = true)
 |-- pink: integer (nullable = true)
 |-- red: integer (nullable = true)
 |-- white: integer (nullable = true)
 |-- black: integer (nullable = true)

From there, the script continues:

// Creating a list for which columns to draw from the main dataframe
val a = List("green", "blue")

// Creating the mini dataframe to perform the function upon
val test_df = df.select(a.map(col): _*)

// The new dataframe will now go through the 'Test' function defined above
val df_function = test_df.rdd.map(col => Test(col(0).toString.toDouble, col(1).toString.toDouble))

// Converting the RDD output back to a dataframe (of one column)
val df_convert = df_function.toDF
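(Note that toDF only resolves because the SQL implicits are in scope; the spark-shell imports them automatically, otherwise something like the following is needed before the call:)

// required for rdd.toDF outside the shell (the shell imports this automatically)
import sqlContext.implicits._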

For reference, the output looks like this:

+-----+
|value|
+-----+
|500.0|
|562.0|
|746.0|
+-----+

The last line of the script adds this back onto the main DataFrame, like so:

 df = df.withColumn("new column", df_convert)

But, as mentioned above, I get the following error:

found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.sql.Column

////////// EDIT ////////////

@user9819212's solution works for simple methods, but when calling a slightly more complex method I get the following error:

    test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function5>,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType)))
    java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function1
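My reading of that message (not something I have verified) is an arity mismatch: the registered UDF wraps a Function5, since five input types are listed, but Spark tries to cast it to a Function1, which is what happens when the UDF is applied to a different number of Column arguments than it was defined with. A five-argument UDF has to be called with exactly five columns, e.g. (placeholder column names):

// hypothetical call shape for a five-argument UDF
// df.withColumn("out", test2_udf(col("a"), col("b"), col("c"), col("d"), col("e")))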

So I tried creating another simplified version of the code, with some additional changes to the test function being called:

// test function - will be used as part of the main script below
def Test (valueone: Double, valuetwo: Integer): Double = {
    val test = if(valuetwo > 2000) valueone + 4000 else valueone
    val fakeList = List(3000,4000,500000000)
    val index = fakeList.indexWhere(x => x>=valueone)
    val test2 = fakeList(index - 1) * valueone
    test2
}

val test_udf = udf(Test _)

df = df.withColumn(
   "new column", 
   test_udf(col("green").cast("double"), col("blue").cast("integer"))
)

At first this seemed to work, but when I try to view the DataFrame with the command

df.show

I get the following error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 153.0 failed 1 times, most recent failure: Lost task 0.0 in stage 153.0 (TID 192, localhost, executor driver): 
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (double, int) => double)
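One suspicion (not confirmed anywhere in this thread): "Failed to execute user defined function" simply means the Scala function threw at runtime, and in the Test above fakeList(index - 1) goes out of bounds whenever indexWhere returns 0 (valueone <= 3000) or -1 (no element matches), which is exactly what happens for the small green values in the sample data once df.show actually runs the job. A guarded sketch of that lookup, with a made-up fallback, would look like:

// hypothetical guarded variant - falls back to valueone when there is no valid previous element
def TestGuarded(valueone: Double, valuetwo: Integer): Double = {
    val fakeList = List(3000, 4000, 500000000)
    val index = fakeList.indexWhere(x => x >= valueone)
    if (index > 0) fakeList(index - 1) * valueone else valueone
}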

Best Answer

You cannot add a column from another DataFrame (or Dataset) this way. Just use a UserDefinedFunction:

import org.apache.spark.sql.functions.{col, udf}

val test_udf = udf(Test _)

df.withColumn(
   "new column", 
   test_udf(col("green").cast("double"), col("blue").cast("double"))
)

Or, for a function this simple:

df.withColumn(
   "new column", 
   col("green").cast("double") * 2 + col("blue").cast("double")
)
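If you really do need to attach a separately computed one-column DataFrame (rather than recomputing the value as a Column expression), the usual workaround is to give both sides an explicit row index and join on it. A rough sketch, not part of the original answer: row_idx and the variable names are made up, and it assumes both frames keep their rows in the same order.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType

// add a synthetic row index to both DataFrames, then join on it
val dfIdx = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  df.schema.add("row_idx", LongType)
)
val convertIdx = sqlContext.createDataFrame(
  df_convert.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  df_convert.schema.add("row_idx", LongType)
)

val joined = dfIdx
  .join(convertIdx.withColumnRenamed("value", "new column"), Seq("row_idx"))
  .drop("row_idx")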

Regarding "Scala - How to convert a Dataset[Row] into a column that can be added to a DataFrame", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50435767/
