python - EMR Spark 附加到 parquet 文件的步骤是覆盖 parquet 文件

标签 python apache-spark amazon-emr parquet

使用 Python 3.6 在 Amazon EMR 集群(1 个主节点、2 个节点)上运行 Spark 2.4.2

我正在读取 Amazon s3 中的对象,以 parquet 格式压缩它们,并将它们添加(附加)到现有的 parquet 数据存储中。当我在 pyspark shell 中运行代码时,我能够读取/压缩对象并将新的 parquet 文件添加到现有的 parquet 文件中,并且当我对 parquet 数据运行查询时,它显示所有数据都在 Parquet 文件夹。但是,当我在 EMR 集群上的某个步骤中运行代码时,现有的 parquet 文件将被新文件覆盖。相同的查询将显示只有新数据,并且包含 parquet 数据的 s3 文件夹只有新数据。

该步骤的关键代码如下:

    spark = SparkSession.builder \
                        .appName("myApp") \
                        .getOrCreate()

    df_p = spark.read \
                .format('parquet') \
                .load(parquet_folder)

    the_schema = df_p.schema

    df2 = spark.read \
               .format('com.databricks.spark.xml') \
               .options(rowTag='ApplicationSubmission', \
                        path=input_folder) \
               .schema(the_schema) \
               .load(input_folder+'/*.xml')

    df2.coalesce(10) \
       .write \
       .option('compression', 'snappy') \
       .option('path', parquet_folder) \
       .format('parquet') \
       .mode('append') \
       .saveAsTable(table_name, mode='append')

我希望这会将 input_folder 中的数据附加到 parquet_folder 中的现有数据,但在 EMR 步骤中执行时会被覆盖。我尝试过在 .saveAsTable 中不使用 mode='append' (在 pyspark shell 中不需要)。

建议?

最佳答案

我不知道为什么你的方法不起作用,但我使用 .parquet(path) 得到了更好的结果而不是.saveAsTable(...) 。我不知道这种行为的原因,但我没有看到saveAsTable之前用于保存数据对象,因为它在 Hive 元存储中创建了一个表(不是“物理”数据对象)。

如果您的步骤通过 Apache Livy 运行,它们的行为可能与在 shell 上的行为不同。如果您确实使用 Livy,您可以在 Zeppelin 笔记本上测试您的代码,在您的代码单元上指示您应该使用 %livy-pyspark 运行它们。执行者。

关于python - EMR Spark 附加到 parquet 文件的步骤是覆盖 parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56970780/

相关文章:

python - Python 中 numpy 数组上交换行的表示法

scala - Spark 将 RDD 拆分为 block 并连接

pyspark - pandas udf showString 简单示例错误

python - 在 OS/X 上的 python 中查找可用磁盘空间

python - 如何对tf.nn.embedding_lookup进行逆向操作?

Python selenium - 如何重复选择多个页面上的按钮 - 元素不可见

json - 使用 Scala 将 DataSet 转换为 Spark Json 数组

apache-spark - 如何将JavaPairRDD转换为HashMap

amazon-web-services - EMR 命令运行程序如何提交作业

java - AWS EMR - 从 java 代码获取主节点 ip