amazon-web-services - 在 AWS Glue 中删除具有空值的行的问题

标签 amazon-web-services apache-spark pyspark amazon-redshift aws-glue

当前,AWS Glue 作业读取 S3 集合并将其写入 AWS Redshift 时遇到问题,我们在其中有一列带有 null值(value)观。

这项工作应该相当简单,并且大部分代码都是由 Glue 接口(interface)自动生成的,但是由于我们在 Redshift 中没有空列,这些列在我们的数据集中有时为空,我们无法完成这项工作。

代码的精简版本如下所示,代码在 Python 中,环境是 PySpark。

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_1", table_name = "table_1", transformation_ctx = "datasource0")

resolvedDDF = datasource0.resolveChoice(specs = [
  ('price_current','cast:double'),
  ('price_discount','cast:double'),
])

applymapping = ApplyMapping.apply(frame = resolvedDDF, mappings = [
  ("id", "string", "id", "string"), 
  ("status", "string", "status", "string"), 
  ("price_current", "double", "price_current", "double"), 
  ("price_discount", "double", "price_discount", "double"), 
  ("created_at", "string", "created_at", "string"), 
  ("updated_at", "string", "updated_at", "string"), 
], transformation_ctx = "applymapping")

droppedDF = applymapping.toDF().dropna(subset=('created_at', 'price_current'))

newDynamicDF = DynamicFrame.fromDF(droppedDF, glueContext, "newframe")

dropnullfields = DropNullFields.apply(frame = newDynamicDF, transformation_ctx = "dropnullfields")

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields, catalog_connection = "RedshiftDataStaging", connection_options = {"dbtable": "dbtable_1", "database": "database_1"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

我们对 price_current 有一个非空约束和 created_at Redshift 中的表,并且由于我们系统中的一些早期错误,一些记录在没有所需数据的情况下到达了 S3 存储桶。我们只想删除这些行,因为它们只占要处理的整体数据的一小部分。

尽管 dropna代码我们仍然从 Redshift 得到以下错误。
Error (code 1213) while loading data into Redshift: "Missing data for not-null field"
Table name: "PUBLIC".table_1
Column name: created_at
Column type: timestampt(0)
Raw field value: @NULL@

最佳答案

如果您不想删除它们,可以传递默认值

df= dropnullfields.toDF()

df = df.na.fill({'price_current': 0.0, 'created_at': ' '})

dyf = DynamicFrame.fromDF(df,'glue_context_1')

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dyf, catalog_connection = "RedshiftDataStaging", connection_options = {"dbtable": "dbtable_1", "database": "database_1"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

如果您想删除 ,请使用以下代码代替 df.na.fill
df = df.na.drop(subset=["price_current", "created_at"])

关于amazon-web-services - 在 AWS Glue 中删除具有空值的行的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54713823/

相关文章:

python - 我如何在飞艇中使用 pyspark?

python - 如何对包含 R 函数的 pyspark RDD 进行分区

amazon-web-services - 通过传入自定义 header 使 Cloudfront 的缓存数据无效

java - 如何在 Spark 中按字段对数据进行分组?

apache-spark - 无法导入 org.apache.spark.sql.cassandra.CassandraSQLContext

apache-spark - Spark Streaming 调整每批大小的记录数不起作用?

amazon-web-services - AWS Cloudformation 有条件地添加资源属性

ios - 亚马逊 MWS 产品 API 返回 401 错误 "Access denied"

amazon-web-services - AWS 安全组和 IAM 角色

apache-spark - 在Hive-S3表的情况下pyspark命令行错误