6 我正在尝试编写一个 UDF,它采用如下所示的字符串数组:
String[] lol = {"1,2,3","1,2,3","2,3,4","1,4,5,6,7"};
我希望 UDF 返回一个不重复的整数数组。
我首先在查询表单中收集列表,该数据帧有两个字段:用户 ID 和类别,类别是一个看起来像“1,2,3”的字符串,并按用户分组。
df.groupBy("userid").agg(collect_list("category").as("categories")).write().mode(SaveMode.Overwrite).parquet("path");
然后我想运行我的 UDF:
ctx.read().parquet("path").select(col("userid"), trimCategories3("categories", ctx).as("categories")).show();
我的UDF:
public static Column trimCategories3(String column, SQLContext ctx) {
UDF1 udf = new UDF1<String[], Integer[]>() {
@Override
public Integer[] call(String[] categories) throws Exception {
Set<Integer> result = new HashSet<>();
for(String s : categories) {
Set<Integer> med = Arrays.stream(s.split("\\,"))
.map(Integer::parseInt)
.collect(Collectors.toSet());
result.addAll(med);
}
return result.toArray(new Integer[result.size()]);
}
};
ctx.udf().register("trimCategories", udf, DataTypes.createArrayType(DataTypes.IntegerType));
return callUDF("trimCategories", col(column));
}
这给了我: java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef 无法转换为 [Ljava.lang.String;
由于我是编程新手并且不了解 Scala,因此我需要一些帮助。在 Spark 错误日志中,我得到行号,其中 UDF1 udf = new UDF1<String[], Integer[]>() {
开始。
当我尝试在测试类中运行该代码时,该代码可以工作。希望得到一些指导。
干杯!
最佳答案
发现问题,是UDF的输入类型是Scala WrappedArray。 (对我来说)奇怪的部分是,collect_list函数(在文档中应该返回一个列表)返回一个Scala Wrapped数组,当我之前运行printSchema函数时,它说类型为array:string。这就是我将 UDF 的输入类型设置为 String[] 的原因。代码中的解决方案:
public static Column trimCategories3(String column, SQLContext ctx) {
UDF1 udf = new UDF1<WrappedArray<String>, Integer[]>() {
@Override
public Integer[] call(WrappedArray<String> categories) throws Exception {
Set<Integer> result = new HashSet<>();
scala.collection.Iterator it = categories.iterator();
while (it.hasNext()) {
String s = (String) it.next();
Set<Integer> med = Arrays.stream(s.split("\\,"))
.map(Integer::parseInt)
.collect(Collectors.toSet());
result.addAll(med);
}
return result.stream().toArray(Integer[]::new);
}
};
ctx.udf().register("trimCategories", udf, DataTypes.createArrayType(DataTypes.IntegerType));
return callUDF("trimCategories", col(column));
}
关于返回整数数组的 Java Spark UDF 给我 ClassException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42791931/