amazon-s3 - AWS Glue - 从时间戳字段创建日期分区

标签 amazon-s3 pyspark etl aws-glue

有一个带有时间戳字段的数据框,如下所示:

<表类="s-表"> <头> 时间戳 编号 版本 <正文> 2022-01-01 01:02:00.000 1 2 2022-01-01 05:12:00.000 1 2

我创建了一个使用 ApplyMapping 的 Glue 作业将数据保存到新的 S3 位置。目前,我通过在可视化编辑器中选择这些字段添加了 idversion 分区,并且我的数据使用以下结构保存:id=1/version=2/我想解析时间戳并提取日期值,以便文件系统结构为 id=1/version=2/dt=2022-01-01/ .但是,在可视化编辑器中,我只能选择时间戳,不能对该字段执行任何操作。我猜我需要更改代码,但不确定如何更改。

代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://my-data"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("timestamp", "timestamp", "timestamp", "timestamp"),
        ("id", "string", "id", "string"),
        ("version", "string", "version", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://target-data",
        "partitionKeys": ["id", "version"],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)

job.commit()

最佳答案

使用 Map Class .

将此方法添加到您的脚本中

def AddDate(rec):
    ts = str(rec["timestamp"])
    rec["dt"] = ts[:10]
    return rec

ApplyMapping 步骤之后插入 map 转换。

Mapped_dyF = Map.apply(frame = ApplyMapping_node2, f = AddDate)

更新写入 S3 步骤,注意 framepartitionKeys 的变化。

S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=Mapped_dyF,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://target-data",
        "partitionKeys": ["id", "version", "dt"],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)

关于amazon-s3 - AWS Glue - 从时间戳字段创建日期分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72434323/

相关文章:

javascript - AWS SDK 预签名 URL + 分段上传

ruby-on-rails - 在 S3 上存储系统生成的 PDF

python - 如何将 json 对象列表转换为单个 pyspark 数据帧?

apache-spark - 在spark中,如何将一列聚合到频率图中,其中包含该列中的唯一值及其频率

apache-spark - PySpark PCA : how to convert dataframe rows from multiple columns to a single column DenseVector?

sql - "Data Load"或 "ETL"的工具——从 SQL Server 到 Amazon Redshift

hadoop - 使用 Hive、S3、EMR 和恢复分区加载数据

dns - 无法查看我的由 S3 和 Route 53 托管的静态网站

pentaho - pentaho 数据集成中新插入或更新的行数

c# - 在 SSIS 脚本任务中使用 c# 批量插入到 .CSV 目标文件