scala - 使用 Spark Dataframe 中的函数创建新列

标签 scala apache-spark dataframe

我正在尝试找出 Spark 中的新数据帧 API。看起来是向前迈出的一大步,但在做一些本应非常简单的事情时却遇到了困难。我有一个包含 2 列的数据框,“ID”和“金额”。作为一个通用示例,假设我想返回一个名为“code”的新列,该列返回基于“Amt”值的代码。我可以编写一个类似这样的函数:

def coder(myAmt:Integer):String {
  if (myAmt > 100) "Little"
  else "Big"
}

当我尝试像这样使用它时:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

我收到类型不匹配错误

found   : org.apache.spark.sql.Column
required: Integer

我尝试将函数的输入类型更改为 org.apache.spark.sql.Column,但随后我开始在函数编译时遇到错误,因为它需要在 if 语句中使用 bool 值。

我这样做错了吗?有没有比使用 withColumn 更好/另一种方法来做到这一点?

感谢您的帮助。

最佳答案

假设您的架构中有“Amt”列:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))

我认为 withColumn 是添加列的正确方法

关于scala - 使用 Spark Dataframe 中的函数创建新列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30219592/

相关文章:

r - 快速融化的数据表操作

scala - 家庭多态性+混合?

java - Scala中的复合设计模式?

java - 正则表达式 - 除了给定单词之外的所有有效标识符?

apache-spark - standalone安装的hadoop和spark自带的hadoop有什么区别?

python - dataprep.eda TypeError : Please provide npartitions as an int, 或如果指定 chunksize 则可能为 None

java - 是否可以使用支持传输的 netty 和 arterr 来运行 akka 系统?

join - 如何在 Apache Spark SQL 中执行更新

apache-spark - 为什么 Window 函数失败并显示 "Window function X does not take a frame specification"?

python - 如何按元素移动数据帧以填充 NaN?