java - Apache Spark 数据集 API - 不接受架构 StructType

标签 java csv apache-spark apache-spark-sql databricks

我有以下使用 Spark 数据 API 加载 headless CSV 文件的类。

我遇到的问题是我无法让 SparkSession 接受应该定义每一列的架构 StructType。结果数据框是字符串类型的未命名列

public class CsvReader implements java.io.Serializable {

public CsvReader(StructType builder) {
        this.builder = builder;
    }
private StructType builder;

SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession
        .read()
        .format("com.databricks.spark.csv")
        .option("header", false)
        //.option("inferSchema", true)
        .schema(builder)
        .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg

public void printSchema() {
    System.out.println(builder.length());
    df.printSchema();
}

public void printData() {
    df.show();
}

public void printMeters() {
    df.select("meter").show();
}

public void printMeterCountByGeocode_result() {
    df.groupBy("geocode_result").count().show();
}

public Dataset getDataframe() {
            return df;
 }

}

生成的数据框架构是:

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)

调试器显示“构建器”StrucType 已正确定义:

0 = {StructField@4904} "StructField(geocode_result,DoubleType,false)"
1 = {StructField@4905} "StructField(meter,StringType,false)"
2 = {StructField@4906} "StructField(orig_easting,StringType,false)"
3 = {StructField@4907} "StructField(orig_northing,StringType,false)"
4 = {StructField@4908} "StructField(temetra_easting,StringType,false)"
5 = {StructField@4909} "StructField(temetra_northing,StringType,false)"
6 = {StructField@4910} "StructField(orig_address,StringType,false)"
7 = {StructField@4911} "StructField(orig_postcode,StringType,false)"
8 = {StructField@4912} "StructField(postcode_easting,StringType,false)"
9 = {StructField@4913} "StructField(postcode_northing,StringType,false)"
10 = {StructField@4914} "StructField(distance_calc_method,StringType,false)"
11 = {StructField@4915} "StructField(distance,StringType,false)"
12 = {StructField@4916} "StructField(geocoded_address,StringType,false)"
13 = {StructField@4917} "StructField(geocoded_postcode,StringType,false)"

我做错了什么?非常感谢任何帮助!

最佳答案

定义变量 Dataset<Row> df并将读取CSV文件的代码块移到getDataframe()中方法如下。

private Dataset<Row> df = null;

public Dataset getDataframe() {
    df = sparkSession
        .read()
        .format("com.databricks.spark.csv")
        .option("header", false)
        //.option("inferSchema", true)
        .schema(builder)
        .load("src/main/java/resources/test.csv"); //TODO: CMD line arg
        return df;
}

现在你可以像下面这样调用它。

    CsvReader cr = new CsvReader(schema);
    Dataset df = cr.getDataframe();
    cr.printSchema();

我建议您重新设计您的类(class)。一种选择是您可以将 df 作为参数传递给其他方法。如果您使用的是 Spark 2.0,则不需要 SparkConf。请引用documentation创建 SparkSession。

关于java - Apache Spark 数据集 API - 不接受架构 StructType,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43608183/

相关文章:

命令行上的java

javascript - 使用 CSV 或 JS 文件填充 HTML 表中的特定 TD

scala - Hive UDF 在 Scala 中处理整数数组

scala - 修改 Spark RDD foreach 中的集合

scala - 星火笔记本 : How can I filter rows based on a column value where each column cell is an array of strings?

java - Alfresco 不允许超过 1000 个 Activity session

java.sql.BatchUpdateException : Table doesn't exist

java - HttpServletRequest#getRemoteAddr() 返回 NULL

ruby-on-rails - 有什么方法可以确定 csv 是否带有标题或不在 rails 中

python - Pandas - 两列作为索引(有效时间和行号)?