scala - 将两列传递给scala中的udf?

标签 scala apache-spark user-defined-functions

我有一个包含两列的数据框,一列是数据,另一列是
该数据字段中的字符数。

Data    Count
Hello   5
How     3
World   5

我想根据计数列中的值更改列数据的值。如何做到这一点?我用 udf 试过这个:
invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("value"),invalidrecords("a_cnt")))

这似乎失败了,这是正确的做法吗?

最佳答案

这是一个简单的方法

首先你创建一个 dataframe

import sqlContext.implicits._
val invalidrecords = Seq(
  ("Hello", 5),
  ("How", 3),
  ("World", 5)
).toDF("Data", "Count")

你应该有
+-----+-----+
|Data |Count|
+-----+-----+
|Hello|5    |
|How  |3    |
|World|5    |
+-----+-----+

然后你定义 udf 函数为
import org.apache.spark.sql.functions._
def appendDelimiterError = udf((data: String, count: Int) => "value with error" )

然后你调用 withColumn作为
invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

你应该有输出
+-----+-----+----------------+
|Data |Count|value           |
+-----+-----+----------------+
|Hello|5    |value with error|
|How  |3    |value with error|
|World|5    |value with error|
+-----+-----+----------------+

您可以编写逻辑而不是从 udf 返回字符串。功能

已编辑

在下面的评论中回答您的要求将需要您更改 udf 函数和 withColumn 如下
def appendDelimiterError = udf((data: String, count: Int) => {
  if(count < 5) s"convert value to ${data} - error"
  else data
} )

invalidrecords.withColumn("Data",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

你应该有输出
+----------------------------+-----+
|Data                        |Count|
+----------------------------+-----+
|Hello                       |5    |
|convert value to How - error|3    |
|World                       |5    |
+----------------------------+-----+

关于scala - 将两列传递给scala中的udf?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44970829/

相关文章:

scala - 无法使用 Maven 项目从 Eclipse 通过 HiveContext 访问配置单元表

scala - 我可以在 Scala 中没有包私有(private)类吗?

scala - Spark 数据框到箭头

recursion - 将递归 makefile 变量导出到子 make

scala - 为什么在一种情况下而不是另一种情况下得到 “missing parameter for expanded function”?

scala - 不确定如何在 Scala 中最佳地执行此列表转换

python - PySpark 拆分行并转换为 RDD

apache-spark - Spark 根据字母分区写入 Parquet

javascript - 如何通过 start : stop: while using the parameters 'ev' and 'ui' ? 使用我自己的函数

apache-spark - Spark SQL(语言,而非 API)和 UDF 的行数据访问