返回整数数组的 Java Spark UDF 给我 ClassException

标签 java scala apache-spark

6 我正在尝试编写一个 UDF,它采用如下所示的字符串数组:

String[] lol = {"1,2,3","1,2,3","2,3,4","1,4,5,6,7"};

我希望 UDF 返回一个不重复的整数数组。

我首先在查询表单中收集列表,该数据帧有两个字段:用户 ID 和类别,类别是一个看起来像“1,2,3”的字符串,并按用户分组。

df.groupBy("userid").agg(collect_list("category").as("categories")).write().mode(SaveMode.Overwrite).parquet("path"); 然后我想运行我的 UDF:

ctx.read().parquet("path").select(col("userid"), trimCategories3("categories", ctx).as("categories")).show();

我的UDF:

public static Column trimCategories3(String column, SQLContext ctx) {
  UDF1 udf = new UDF1<String[], Integer[]>() {
    @Override
    public Integer[] call(String[] categories) throws Exception {
      Set<Integer> result = new HashSet<>();
      for(String s : categories) {
        Set<Integer> med = Arrays.stream(s.split("\\,"))
            .map(Integer::parseInt)
            .collect(Collectors.toSet());
        result.addAll(med);
      }
      return result.toArray(new Integer[result.size()]);
    }
  };
  ctx.udf().register("trimCategories", udf, DataTypes.createArrayType(DataTypes.IntegerType));
  return callUDF("trimCategories", col(column));
}

这给了我: java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef 无法转换为 [Ljava.lang.String;

由于我是编程新手并且不了解 Scala,因此我需要一些帮助。在 Spark 错误日志中,我得到行号,其中 UDF1 udf = new UDF1<String[], Integer[]>() {开始。 当我尝试在测试类中运行该代码时,该代码可以工作。希望得到一些指导。 干杯!

最佳答案

发现问题,是UDF的输入类型是Scala WrappedArray。 (对我来说)奇怪的部分是,collect_list函数(在文档中应该返回一个列表)返回一个Scala Wrapped数组,当我之前运行printSchema函数时,它说类型为array:string。这就是我将 UDF 的输入类型设置为 String[] 的原因。代码中的解决方案:

public static Column trimCategories3(String column, SQLContext ctx) {
UDF1 udf = new UDF1<WrappedArray<String>, Integer[]>() {
  @Override
  public Integer[] call(WrappedArray<String> categories) throws Exception {
    Set<Integer> result = new HashSet<>();
    scala.collection.Iterator it = categories.iterator();
    while (it.hasNext()) {
      String s = (String) it.next();
      Set<Integer> med = Arrays.stream(s.split("\\,"))
          .map(Integer::parseInt)
          .collect(Collectors.toSet());
      result.addAll(med);
    }
    return result.stream().toArray(Integer[]::new);
  }
};
ctx.udf().register("trimCategories", udf, DataTypes.createArrayType(DataTypes.IntegerType));
return callUDF("trimCategories", col(column));
}

关于返回整数数组的 Java Spark UDF 给我 ClassException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42791931/

相关文章:

java - 如何在 Retrofit2 中发布消息?

events - 扩展面板中的 Scala Swing react

python - Pyspark 多 groupby 具有不同的列

java - Full GC - Sun JVM 运行频率

java - Selenium 网络驱动程序 : not able to find the element on the 2nd page.

scala - Scala 的 Monocle 中的过滤列表

scala - Playframework:使用 Scalatags 而不是 Twirl

apache-spark - 从案例类生成 Spark StructType/Schema

scala - 使用 spark-submit 运行时无法加载 com.databricks.spark.csv

java - 什么时候使用Statement比PreparedStatement更好?