apache-spark - AWS Glue - 写入文件需要很长时间

标签 apache-spark pyspark aws-glue aws-glue-spark aws-glue3.0

您好,我在 AWS Glue 中有一个 ETL 作业需要很长时间才能编写。它从 S3 读取数据并执行一些转换(下面没有全部列出,但转换似乎不是问题所在),然后最后将数据帧写入 S3。但是,这个写入操作似乎需要很长时间。 一个大约 20 MB 的文件需要大约 30 分钟,即使我使用的是 10 个工作人员(工作人员类型 G.1X)。我已经使用 print 语句查看需要时间的内容,这似乎是将文件写入 S3 的最后一个操作。在使用相同类型的设置之前,我没有遇到过这个问题。

我使用的是 Glue 3.0 版、Python 3 版和 Spark 3.1 版。

源中的文件数量接近 50 000 个文件,分布在许多文件夹中,每天都会自动生成新文件。近似平均文件大小约为 10 KB

对这个问题有什么建议吗?

#Glue context & spark session
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
#Solves the issues with old datetime in the new version of Spark
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.sql.legacy.parquet.int96RebaseModeInRead', 'CORRECTED'), 
    ('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'CORRECTED'), 
    ('spark.sql.legacy.parquet.datetimeRebaseModeInRead', 'CORRECTED'), 
    ('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'CORRECTED')
    ])
conf = SparkConf().set('spark.sql.legacy.parquet.datetimeRebaseModeInRead','CORRECTED')
sc = SparkSession.builder.config(conf=spark_conf).enableHiveSupport().getOrCreate()
#sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#Source(/s) - create dynamic frame
dy = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [
            "s3://.../files/abc/"
        ],
        "recurse": True,
        "groupFiles": "inPartition"
    },
    transformation_ctx="dy",
)

df = dy.toDF()

#Transformation(/s)
df_ready = df\
    .sort(['ID', 'timestamp'], descending=True)\
    .withColumn("timestamp_prev", 
                lag(df.timestamp)
                .over(Window()
                      .partitionBy("ID").orderBy('timestamp')))

df_ready.repartition(1).write.mode('overwrite').parquet("s3a://.../thisismywritefolder/df_ready/")

最佳答案

你最后是重新分区,这样Glue就不能并行写入了。如果删除重新分区,您应该会看到写入速度有所提高。

关于apache-spark - AWS Glue - 写入文件需要很长时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71658134/

相关文章:

python - 将日期从整数转换为日期格式

scala - 生成用于查找的单行数据框

python - 高效的字符串后缀检测

date - 由于时间戳记长度,从Spark到Elasticsearch写入日期时出错

python - AWS Glue完成后,如何执行SQL脚本或存储过程?

node.js - 使用 Node.JS 调用 AWSglue 的 lambda 函数没有 console.log 的原因是什么?

apache-spark - Spark UDAF : java. lang.InternalError:类名格式错误

apache-spark - 如何从 pyspark 连接到 Teradata?

apache-spark - 如何根据基于 Pyspark 中另一列的表达式的评估有条件地替换列中的值?

python - 将pyspark偏移滞后动态值检索到其他数据帧