java - 有没有办法在 UDF 中添加新列(在 java Spark 中)

标签 java apache-spark user-defined-functions

我有一个 Spark 数据集的列(在 java 中),我希望该列的所有值都成为新列的列名(新列可以用常量值填充)。

For example I have:
+------------+
|    Column  | 
+------------+
| a          | 
| b          |
| c          |
+------------+

And I want: 
+------+----+----+---+
|Column| a  |  b | c |
+------+----+----+---+
| a    | 0  | 0  |0  |
| b    | 0  | 0  |0  |
| c    | 0  | 0  |0  |
+------+----+----+---+

我尝试的是:

public class test{

    static SparkSession spark = SparkSession.builder().appName("Java")
            .config("spark.master", "local").getOrCreate();
    static Dataset<Row> dataset = spark.emptyDataFrame();

    public Dataset<Row> test(Dataset<Row> ds, SparkSession spark) {
        SQLContext sqlContext = new SQLContext(spark);
        sqlContext.udf().register("add", add, DataTypes.createArrayType(DataTypes.StringType));
        ds = ds.withColumn("substrings", functions.callUDF("add", ds.col("Column")));
        return ds;
    }

    private static UDF1 addSubstrings = new UDF1<String, String[]>() {
        public String[] call(String str) throws Exception {
            dataset = dataset.withColumn(str, functions.lit(0));
            String[] a = {"placeholder"};
            return a;
        }
    };
}

我的问题是,有时我得到正确的结果,有时却得不到(列未添加)。我不太明白为什么。我正在寻找一种将数据集传递给 UDF 的方法,但我不知道如何进行。

目前我正在通过使用列的collectAsList()来解决这个问题,然后迭代Arraylist并从而添加新列。但这确实效率很低,因为我的数据太多了。

最佳答案

对于此用例,您可以使用 pivot :

ds
 .withColumn("pivot_column", $"first_column")
 .groupBy($"first_column")
 .pivot("pivot_column")
 .count

如果您想要更好的性能,您可能需要在数据透视表中提供可能的值,例如 pivot("pivot_column", Seq("a", "b", "c"))

我用过count用于聚合,但您可以进行任何您想要的聚合。

From
+------------+
|first_column| 
+------------+
| a          | 
| b          |
| c          |
+------------+

To

+------------+---+---+---+
|first_column| a | b | c |
+------------+---+---+---+
| a          | 1 | 0 | 0 |
| b          | 0 | 1 | 0 |
| c          | 0 | 0 | 1 |
+------------+---+---+---+

关于java - 有没有办法在 UDF 中添加新列(在 java Spark 中),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57408717/

相关文章:

java - JAXB "If a class has @XmlElement property, it cannot have @XmlValue property."

python - 使用 UDF 加入 Pyspark Dataframe

r - 使用 expss::cro() 编写用户定义的函数

Javafx 如何在 GridPane 中找到特定按钮

java - 如何在现有类型中声明或定义通用 xsd 类型。

java继承的最佳实践?

apache-spark - Apache Spark 中 `registerTempTable` 和 `createTempView` 之间的区别

hadoop - Spark Streaming检查点到远程HDFS

scala - 如何根据条件(组中的值)更新列?

hadoop - Pig - FilterFunc 不接受整个元组