java - 无法理解 Spark 中的 UDF,尤其是 Java 中的 UDF

标签 java apache-spark dataset user-defined-functions

我正在尝试根据另一列的值在 Spark 数据集中创建一个新列。另一列的值作为键在 json 文件中搜索,返回的值是用于新列的值。

这是我尝试过的代码,但它不起作用,而且我不确定 UDF 是如何工作的。在这种情况下,如何使用 withColumn 或 udf 添加列?

Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file path");
        Object obj = new JSONParser().parse(new FileReader("json path"));
        JSONObject jo = (JSONObject) obj;

        df = df.withColumn("cluster", functions.lit(jo.get(df.col("existing col_name")))));

任何帮助将不胜感激。提前致谢!

最佳答案

Spark 允许您使用 udf 函数创建自定义用户定义函数 (UDF)。

以下是如何定义 UDF 的 scala 片段。

val obj = new JSONParser().parse(new FileReader("json path"));
val jo = obj.asInstanceOf[JSONObject];

def getJSONObject(key: String) = {
   jo.get(key)
}

定义函数后,您可以将其转换为 UDF,如下所示:

 val getObject = udf(getJSONObject _)

有两种使用 UDF 的方法。

  1. df.withColumn("cluster", lit(getObject(col("existing_col_name"))))

  2. 如果您使用spark sql,则必须在使用之前在sqlContext中注册您的udf。

    spark.sqlContext.udf.register("get_object", getJSONObject _)

    然后您可以将其用作

    spark.sql("从 some_table 中选择 get_object(existing_column)")

其中,使用哪一个完全是主观的。

关于java - 无法理解 Spark 中的 UDF,尤其是 Java 中的 UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52711450/

相关文章:

apache-spark - K-Means 聚类偏向于一个中心

vb.net - 如何使用 DataSet 作为数据源过滤 DataGridView

java - Netbeans GUI 和 SQL 问题

python - Apache Spark : How to create a matrix from a DataFrame?

java - 使用 SparseIntArray 和其他来自 android.util.* 的本地单元测试

python - 将RDD保存为pyspark中的序列文件

python - 如何在 Pytorch 中为图像及​​其掩模制作自定义数据集?

algorithm - 无意义 “Nearest Neighbor” 的数据集?

java - 具有多个装饰器文件的站点网

java - 如何将毫秒转换为相应的日期?