apache-spark - 为什么Apache Spark会读取嵌套结构中不必要的Parquet列?

标签 apache-spark spark-dataframe parquet

我的团队正在构建ETL流程,以使用Spark将原始的定界文本文件加载到基于Parquet的“数据湖”中。 Parquet列存储的 promise 之一是查询将仅读取必要的“列条纹”。

但是,我们看到正在读取嵌套模式结构的意外列。

为了演示,这是使用Scala和Spark 2.0.1 shell的POC:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")

然后,我们将文件读回到DataFrame并投影到列的子集:
// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

当运行时,我们看到预期的输出:
+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

但是...查询计划显示的故事略有不同:

“优化计划”显示:
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

并且“说明”显示:
projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

执行期间生成的INFO日志还确认Orig.F2列被意外读取:
16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

根据Dremel paperParquet documentation,用于复杂嵌套结构的列应独立存储和独立检索。

问题:
  • 此行为是当前Spark查询引擎的限制吗?换句话说,Parquet是否支持最佳地执行此查询,但是Spark的查询计划程序是幼稚的?
  • 还是这是当前Parquet实现的限制?
  • 还是我没有正确使用Spark API?
  • 还是我误解了Dremel / Parquet列存储应该如何工作?

  • 可能相关:Why does the query performance differ with nested columns in Spark SQL?

    最佳答案

    目前这是对Spark查询引擎的限制,相关的JIRA票证在下面,spark仅处理Parquet中简单类型的谓词下推,而不处理嵌套的StructTypes

    https://issues.apache.org/jira/browse/SPARK-17636

    关于apache-spark - 为什么Apache Spark会读取嵌套结构中不必要的Parquet列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40185039/

    相关文章:

    hadoop - Spark on Hive 进度条停留在 10%

    parquet - 无法从命令行获取 Parquet 工具

    twitter - 启动 Spark 流上下文时出错

    apache-spark - Hive 分区、Spark 分区和 Spark 中的连接 - 它们之间的关系

    pandas - 类型错误 : field Customer: Can not merge type <class 'pyspark.sql.types.StringType' > and <class 'pyspark.sql.types.DoubleType' >

    spark-dataframe - 如何在每个执行器节点收集 Spark 数据帧?

    apache-spark - 使用 Spark Dataframe API 计算列中的特定字符

    java - 如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?

    parquet - AWS 胶水作业 : Command failed with error code 1

    hadoop - 使用 Apache Spark Streaming 和 Dataframes 交互式搜索 Parquet 存储的数据