amazon-web-services - Why does my AWS Glue job use only one executor and the driver?

Tags: amazon-web-services pyspark aws-glue

In my script, I convert all the DynamicFrames in PySpark to DataFrames and perform groupBy and join operations. Then in the Metrics view I see that no matter how many DPUs I set, only one executor is active.

The job fails after about 2 hours with:

Diagnostics: Container [pid=8417,containerID=container_1532458272694_0001_01_000001] is running beyond physical memory limits. Current usage: 5.5 GB of 5.5 GB physical memory used; 7.7 GB of 27.5 GB virtual memory used. Killing container.

I have about 2 billion rows of data. My DPU setting is 80.

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import count

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Each transform gets its own transformation_ctx so job bookmarks track them separately.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "in_json", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "out_json", transformation_ctx = "datasource1")

applymapping0 = ApplyMapping.apply(frame = datasource0, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")

df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))

# After the aggregation each frame only carries fieldA and its count column,
# so fieldA is the only common key the two frames can be joined on.
result_joined = df1.join(df2, "fieldA")

result = DynamicFrame.fromDF(result_joined, glueContext, "result")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://test-bucket"}, format = "json", transformation_ctx = "datasink2")
job.commit()

Am I missing something?

Best Answer

Try repartitioning your DataFrames. You can repartition based on a column, to an arbitrary number of partitions, or both.

Something like this:

df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))

# Repartition both sides by the join key so matching rows are co-located
# and the join work is spread across all executors instead of one.
df1_r = df1.repartition("fieldA")
df2_r = df2.repartition("fieldA")

result_joined = df1_r.join(df2_r, "fieldA")
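
For completeness, here is a minimal, self-contained sketch of the other two variants mentioned above: repartitioning to an arbitrary number of partitions, and combining a partition count with a column. The toy DataFrame and the count of 400 are illustrative assumptions, not tuned values from the question.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for one of the aggregated DataFrames in the question.
df1 = spark.createDataFrame([(1, 10), (2, 20)], ["fieldA", "total_number_1"])

# Variant 1: repartition to an arbitrary number of partitions.
df1_n = df1.repartition(400)

# Variant 2: a partition count and a column together, so rows with the
# same fieldA value hash to the same partition before a join.
df1_nc = df1.repartition(400, "fieldA")

print(df1_n.rdd.getNumPartitions())   # 400
print(df1_nc.rdd.getNumPartitions())  # 400

A column-only repartition, as in the answer's snippet, defaults to spark.sql.shuffle.partitions partitions; passing an explicit count as well gives direct control over the parallelism of the subsequent join.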

Regarding amazon-web-services - Why does my AWS Glue job use only one executor and the driver?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51508721/
