使用 Python 3.6 在 Amazon EMR 集群(1 个主节点、2 个节点)上运行 Spark 2.4.2
我正在读取 Amazon s3 中的对象,以 parquet 格式压缩它们,并将它们添加(附加)到现有的 parquet 数据存储中。当我在 pyspark shell 中运行代码时,我能够读取/压缩对象并将新的 parquet 文件添加到现有的 parquet 文件中,并且当我对 parquet 数据运行查询时,它显示所有数据都在 Parquet 文件夹。但是,当我在 EMR 集群上的某个步骤中运行代码时,现有的 parquet 文件将被新文件覆盖。相同的查询将显示只有新数据,并且包含 parquet 数据的 s3 文件夹只有新数据。
该步骤的关键代码如下:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我希望这会将 input_folder
中的数据附加到 parquet_folder
中的现有数据,但在 EMR 步骤中执行时会被覆盖。我尝试过在 .saveAsTable
中不使用 mode='append'
(在 pyspark shell 中不需要)。
建议?
最佳答案
我不知道为什么你的方法不起作用,但我使用 .parquet(path)
得到了更好的结果而不是.saveAsTable(...)
。我不知道这种行为的原因,但我没有看到saveAsTable
之前用于保存数据对象,因为它在 Hive 元存储中创建了一个表(不是“物理”数据对象)。
如果您的步骤通过 Apache Livy 运行,它们的行为可能与在 shell 上的行为不同。如果您确实使用 Livy,您可以在 Zeppelin 笔记本上测试您的代码,在您的代码单元上指示您应该使用 %livy-pyspark
运行它们。执行者。
关于python - EMR Spark 附加到 parquet 文件的步骤是覆盖 parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56970780/