java - dataset.groupByKey() 中的 Spark CompileException

标签 java apache-spark apache-spark-dataset

我正在尝试使用 Spark 提取邮政编码前缀,但由于尝试使用 org.apache.spark.unsafe.types.UTF8String 作为参数来初始化 java.lang.Double,Spark 生成的代码无法编译。我不太清楚这是否是 Spark 的问题或者我如何使用它。我在本地模式下使用 Java 1.8 和spark-mllib_2.10。失败的代码:

public static void read(Dataset<ZipCodeLatLon> zipCodes) {
    zipCodes.groupByKey(new MapFunction<ZipCodeLatLon, String>() {
        @Override
        public String call(ZipCodeLatLon value) throws Exception {
            return value.getZip().substring(0, 3);
        }
    }, Encoders.STRING()).keys().show();
}

结果

由以下原因引起:org.codehaus.commons.compiler.CompileException:文件“generated.java”,第 50 行,第 58 列:找不到实际参数“org.apache.spark.unsafe.types.UTF8String”的适用构造函数/方法”;候选者是:“java.lang.Double(double)”、“java.lang.Double(java.lang.String)”

生成的代码很长,所以我不会把整个代码放在这里,但导致它失败的关键部分是:

private UTF8String argValue;
final alex.floyd.lc.geocode.ZipCodeLatLon value1 = false ? null : new alex.floyd.lc.geocode.ZipCodeLatLon();
...
public java.lang.Object apply(java.lang.Object _i) {
...
    resultIsNull = false;
    if (!resultIsNull) {
        boolean isNull3 = i.isNullAt(1);
        UTF8String value3 = isNull3 ? null : (i.getUTF8String(1));
        resultIsNull = isNull3;
        argValue = value3;
    }

    final java.lang.Double value2 = resultIsNull ? null : new java.lang.Double(argValue);
    javaBean.setLat(value2);
...
}

该错误似乎与 groupByKey 函数的返回类型无关(我尝试了 Integer 和 Java bean 而不是 String)。但是,如果我将输入数据集类型更改为其他类型(例如 String 而不是 ZipCodeLatLon),则此代码可以工作。然而,据我所知,ZipCodeLatLon 似乎遵循所有必需的 Java bean 约定,因此我不确定需要做什么来更改它。我还使用 Spark 从 CSV 读取 ZipCodeLatLon,因此 Spark 可以处理该类,但不是在 groupByKey 方法的上下文中。

public class ZipCodeLatLon implements Serializable{
private String zip;
private Double lat;
private Double lng;
public String getZip() {
    return zip;
}
public void setZip(String zip) {
    this.zip = zip;
}
public Double getLat() {
    return lat;
}
public void setLat(Double lat) {
    this.lat = lat;
}
public Double getLng() {
    return lng;
}
public void setLng(Double lng) {
    this.lng = lng;
}
}

一些附加信息:这似乎与从 CSV 读取 ZipCodeLatLon 的方式有关。手动创建数据集时,代码工作正常。

完全没问题:

ZipCodeLatLon l = new ZipCodeLatLon();
l.setZip("12345");
l.setLat(0.0);
l.setLng(0.0);
read(spark.createDataset(Lists.newArrayList(l, l), Encoders.bean(ZipCodeLatLon.class)));

完全损坏:

Dataset<ZipCodeLatLon> dataset = spark.read()
    .option("header", true)
    .csv(zipCodeData.getAbsolutePath())
    .as(Encoders.bean(ZipCodeLatLon.class));
dataset.show(); // works - reading in the CSV succeeds
read(dataset); // fails on groupByKey

最佳答案

想通了。您需要为 csv 读取器创建一个架构。我假设编码器会提供架构,但似乎没有。希望错误消息更有用!

之前:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) {
    return spark.read()
            .option("header", true)
            .csv(ZIP_CODE_DATA.getAbsolutePath())
            .as(Encoders.bean(ZipCodeLatLon.class));
}

之后:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) {
    return spark.read()
            .option("header", true)
            .option("inferSchema", "true")
            .csv(ZIP_CODE_DATA.getAbsolutePath())
            .as(Encoders.bean(ZipCodeLatLon.class));
}

关于java - dataset.groupByKey() 中的 Spark CompileException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41995843/

相关文章:

python - 如何查找数组的任何元素是否在 pyspark 的范围内

java - Spark 2.2.0 :How to prevent CountVectorizer giving NoneZeroVector VectorsUDT from Dataset

java - 如何将 JavaPairRDD 转换为数据集?

Java 变量!确实初始化

java - 在数组中搜索值的总和

java - Spring Data 存储库实际上是如何实现的?

java - 调度或重定向到其他 Activity 的 Activity

hadoop - Sparks作业卡在多节点 yarn 簇中

apache-spark - Spark 2.3.1 AWS EMR 不返回某些列的数据但适用于 Athena/Presto 和 Spectrum

java - Spark Java - 将多列收集到数组列中