我有一个像下面这样的 JSON
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
我正在加载我的 JSON 文件
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt")
scala> df.show()
+-------+----------+----------+
| name|parameter1|parameter2|
+-------+----------+----------+
|method1| P1name| 1.0 |
+-------+----------+----------+
我有一个像下面这样的功能:
def method1(P1:String, P2:Double)={
| print(P1)
print(P2)
| }
在执行下面的代码后,我根据列名调用我的方法1,它应该执行方法1。
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
但我收到波纹管错误。
<console>:63: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: String
请让我知道如何将 org.apache.spark.sql.ColumnName 数据类型转换为 String
最佳答案
当您将参数传递为
method1($"parameter1",$"parameter2")
您将列传递给函数而不是原始数据类型。所以,我建议你改变你的
method1
和 method2
如 udf
函数,如果你想在函数内部应用原始数据类型操作。和 udf
函数必须为新列的每一行返回一个值。import org.apache.spark.sql.functions._
def method1 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
def method2 = udf((P1:String, P2:Double)=>{
print(P1)
print(P2)
P1+P2
})
那么你的
withColumn
api 应该可以正常工作df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
注意:udf 函数执行数据序列化和反序列化以更改要按行处理的列数据类型,这会增加复杂性和大量内存使用。 spark functions应尽可能使用
关于scala - 如何在 Spark Scala 中将 org.apache.spark.sql.ColumnName 转换为字符串、十进制类型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47408379/