pyspark - 为什么在胶水 pyspark ETL 作业中无法添加到 Parquet 表中的新列?

标签 pyspark parquet aws-glue

我们一直在探索使用 Glue 将一些 JSON 数据转换为 parquet。我们尝试过的一个场景是向 Parquet 表添加一列。所以分区 1 有 [A] 列,分区 2 有 [A,B] 列。然后我们想编写更多的 Glue ETL 作业来聚合 parquet 表,但新列不可用。使用 glue_context.create_dynamic_frame.from_catalog为了加载动态框架,我们的新列从未出现在架构中。

我们为我们的表格爬虫尝试了几种配置。所有分区使用单一架构,s3 路径使用单一架构,每个分区使用架构。我们总是可以看到 Glue 表数据中的新列,但如果我们使用 pyspark 从 Glue 作业中查询它,它总是为空。当我们下载一些样本并且可以通过 Athena 查询时,该列位于 Parquet 中。

为什么 pyspark 无法使用新列?

最佳答案

结果证明这是一个 Spark 配置问题。来自 the spark docs :

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by

  1. setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.


我们可以通过两种方式启用模式合并。
  • 在 spark session 中设置选项 spark.conf.set("spark.sql.parquet.mergeSchema", "true")
  • 套装mergeSchemaadditional_options 中为真加载动态帧时。

  • source = glueContext.create_dynamic_frame.from_catalog(
       database="db",
       table_name="table",
       additional_options={"mergeSchema": "true"}
    )
    

    之后,新列在框架的架构中可用。

    关于pyspark - 为什么在胶水 pyspark ETL 作业中无法添加到 Parquet 表中的新列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55585066/

    相关文章:

    pandas - Pyspark - 如何回填 DataFrame?

    python - 作为 newAPIHadoopRDD 加载的数据可以转换为 DataFrame 吗?

    apache-kafka - 如何在达到特定大小 (128 Mb) 时将 Kafka 消息提交到 HDFS 接收器

    amazon-web-services - 如何在cloudformation模板中生成并插入ssh公钥定义?

    sql - 在 Spark SQL 中查找表大小(以 MB/GB 为单位)

    python - 似乎无法初始化 Spark 上下文 (pyspark)

    python - 在 PySpark 中使用列对象代替字符串有什么优点

    datetime - Spark的int96时间类型

    apache-kafka - 如何配置 Kafka Connect Worker 将更多消息传输到 HDFS

    aws-glue - AWS 胶水 : Removing quote character from a CSV file while writing