我有一个 Spark 数据集的列(在 java 中),我希望该列的所有值都成为新列的列名(新列可以用常量值填充)。
For example I have:
+------------+
| Column |
+------------+
| a |
| b |
| c |
+------------+
And I want:
+------+----+----+---+
|Column| a | b | c |
+------+----+----+---+
| a | 0 | 0 |0 |
| b | 0 | 0 |0 |
| c | 0 | 0 |0 |
+------+----+----+---+
我尝试的是:
public class test{
static SparkSession spark = SparkSession.builder().appName("Java")
.config("spark.master", "local").getOrCreate();
static Dataset<Row> dataset = spark.emptyDataFrame();
public Dataset<Row> test(Dataset<Row> ds, SparkSession spark) {
SQLContext sqlContext = new SQLContext(spark);
sqlContext.udf().register("add", add, DataTypes.createArrayType(DataTypes.StringType));
ds = ds.withColumn("substrings", functions.callUDF("add", ds.col("Column")));
return ds;
}
private static UDF1 addSubstrings = new UDF1<String, String[]>() {
public String[] call(String str) throws Exception {
dataset = dataset.withColumn(str, functions.lit(0));
String[] a = {"placeholder"};
return a;
}
};
}
我的问题是,有时我得到正确的结果,有时却得不到(列未添加)。我不太明白为什么。我正在寻找一种将数据集传递给 UDF 的方法,但我不知道如何进行。
目前我正在通过使用列的collectAsList()来解决这个问题,然后迭代Arraylist并从而添加新列。但这确实效率很低,因为我的数据太多了。
最佳答案
对于此用例,您可以使用 pivot
:
ds
.withColumn("pivot_column", $"first_column")
.groupBy($"first_column")
.pivot("pivot_column")
.count
如果您想要更好的性能,您可能需要在数据透视表中提供可能的值,例如 pivot("pivot_column", Seq("a", "b", "c"))
我用过count
用于聚合,但您可以进行任何您想要的聚合。
From
+------------+
|first_column|
+------------+
| a |
| b |
| c |
+------------+
To
+------------+---+---+---+
|first_column| a | b | c |
+------------+---+---+---+
| a | 1 | 0 | 0 |
| b | 0 | 1 | 0 |
| c | 0 | 0 | 1 |
+------------+---+---+---+
关于java - 有没有办法在 UDF 中添加新列(在 java Spark 中),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57408717/