我正在尝试根据另一列的值在 Spark 数据集中创建一个新列。另一列的值作为键在 json 文件中搜索,返回的值是用于新列的值。
这是我尝试过的代码,但它不起作用,而且我不确定 UDF 是如何工作的。在这种情况下,如何使用 withColumn 或 udf 添加列?
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file path");
Object obj = new JSONParser().parse(new FileReader("json path"));
JSONObject jo = (JSONObject) obj;
df = df.withColumn("cluster", functions.lit(jo.get(df.col("existing col_name")))));
任何帮助将不胜感激。提前致谢!
最佳答案
Spark 允许您使用 udf 函数创建自定义用户定义函数 (UDF)。
以下是如何定义 UDF 的 scala 片段。
val obj = new JSONParser().parse(new FileReader("json path"));
val jo = obj.asInstanceOf[JSONObject];
def getJSONObject(key: String) = {
jo.get(key)
}
定义函数后,您可以将其转换为 UDF,如下所示:
val getObject = udf(getJSONObject _)
有两种使用 UDF 的方法。
df.withColumn("cluster", lit(getObject(col("existing_col_name"))))
如果您使用spark sql,则必须在使用之前在sqlContext中注册您的udf。
spark.sqlContext.udf.register("get_object", getJSONObject _)
然后您可以将其用作
spark.sql("从 some_table 中选择 get_object(existing_column)")
其中,使用哪一个完全是主观的。
关于java - 无法理解 Spark 中的 UDF,尤其是 Java 中的 UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52711450/