amazon-redshift - How can an AWS Glue job load multiple tables into Redshift?

Tags: amazon-redshift, aws-glue

Is it possible to load multiple tables into Redshift using a single AWS Glue job?

These are the steps I followed:

  • Crawled the JSON files from S3; the data was converted into Data Catalog tables.
  • I created a job to load the Data Catalog tables into Redshift, but it only lets me load one table per job. In the job properties (under "Add job"), the run option I chose was: "A proposed script generated by AWS Glue".

  • I'm not familiar with Python and I'm new to AWS Glue, but I have several tables to upload.

    Here is the sample script:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [TempDir, JOB_NAME]
    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "sampledb", table_name = "abs", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
    ## @type: ApplyMapping
    ## @args: [mapping = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1"]
    ## @return: applymapping1
    ## @inputs: [frame = datasource0]
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1")
    ## @type: ResolveChoice
    ## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
    ## @return: resolvechoice2
    ## @inputs: [frame = applymapping1]
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
    ## @type: DropNullFields
    ## @args: [transformation_ctx = "dropnullfields3"]
    ## @return: dropnullfields3
    ## @inputs: [frame = resolvechoice2]
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    ## @type: DataSink
    ## @args: [catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
    ## @return: datasink4
    ## @inputs: [frame = dropnullfields3]
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
    job.commit()
    

    AWS Glue database: sampledb

    Table name in AWS Glue: abs

    Redshift database: dbmla

    Please provide an example of how to upload them. Thanks!

    Best Answer

    According to the AWS Glue FAQ, you can modify the generated code and run the job:

    Q: How can I customize the ETL code generated by AWS Glue?

    AWS Glue’s ETL script recommendation system generates Scala or Python code. It leverages Glue’s custom ETL library to simplify access to data sources as well as manage job execution. You can find more details about the library in our documentation. You can write ETL code using AWS Glue’s custom library or write arbitrary code in Scala or Python by using inline editing via the AWS Glue Console script editor, downloading the auto-generated code, and editing it in your own IDE. You can also start with one of the many samples hosted in our Github repository and customize that code.



    So, try adding code blocks for the other tables to the same script, as shown below:
    # Second table (example catalog table "abs2"). The variable names and
    # transformation_ctx values continue the generated script's numbering so
    # that every name in the script stays unique.
    datasource5 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs2", transformation_ctx = "datasource5")
    applymapping6 = ApplyMapping.apply(frame = datasource5, mappings = [...], transformation_ctx = "applymapping6")  # fill in this table's column mappings
    resolvechoice7 = ResolveChoice.apply(frame = applymapping6, choice = "make_cols", transformation_ctx = "resolvechoice7")
    dropnullfields8 = DropNullFields.apply(frame = resolvechoice7, transformation_ctx = "dropnullfields8")
    datasink9 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields8, catalog_connection = "redshift", connection_options = {"dbtable": "abs2", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink9")
    
    # Third table (example catalog table "abs3")
    datasource10 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs3", transformation_ctx = "datasource10")
    applymapping11 = ApplyMapping.apply(frame = datasource10, mappings = [...], transformation_ctx = "applymapping11")
    resolvechoice12 = ResolveChoice.apply(frame = applymapping11, choice = "make_cols", transformation_ctx = "resolvechoice12")
    dropnullfields13 = DropNullFields.apply(frame = resolvechoice12, transformation_ctx = "dropnullfields13")
    datasink14 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields13, catalog_connection = "redshift", connection_options = {"dbtable": "abs3", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink14")
    
    # Fourth table (example catalog table "abs4")
    datasource15 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs4", transformation_ctx = "datasource15")
    applymapping16 = ApplyMapping.apply(frame = datasource15, mappings = [...], transformation_ctx = "applymapping16")
    resolvechoice17 = ResolveChoice.apply(frame = applymapping16, choice = "make_cols", transformation_ctx = "resolvechoice17")
    dropnullfields18 = DropNullFields.apply(frame = resolvechoice17, transformation_ctx = "dropnullfields18")
    datasink19 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields18, catalog_connection = "redshift", connection_options = {"dbtable": "abs4", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink19")
    
    job.commit()
    

    Change the variable names accordingly so that each one is unique. Thanks.
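
    If all the tables follow the same pattern, the copy-pasted blocks can also be collapsed into a loop. The sketch below is not from the original answer; it reuses the imports, glueContext, job, and args set up by the generated script above, the table names are placeholders, and it assumes each Data Catalog table can be written to a Redshift table of the same name (add a per-table ApplyMapping step if columns need renaming or casting):

    # Minimal sketch: load several catalog tables in one Glue job.
    # "abs", "abs2", "abs3" are placeholder table names - replace with your own.
    tables = ["abs", "abs2", "abs3"]
    
    for table in tables:
        datasource = glueContext.create_dynamic_frame.from_catalog(
            database = "sampledb",
            table_name = table,
            transformation_ctx = "datasource_" + table)
        # Insert an ApplyMapping.apply(...) step here if this table's
        # columns need to be renamed or cast.
        resolvechoice = ResolveChoice.apply(
            frame = datasource,
            choice = "make_cols",
            transformation_ctx = "resolvechoice_" + table)
        dropnullfields = DropNullFields.apply(
            frame = resolvechoice,
            transformation_ctx = "dropnullfields_" + table)
        glueContext.write_dynamic_frame.from_jdbc_conf(
            frame = dropnullfields,
            catalog_connection = "redshift",
            connection_options = {"dbtable": table, "database": "dbmla"},
            redshift_tmp_dir = args["TempDir"],
            transformation_ctx = "datasink_" + table)
    
    job.commit()

    Because each transformation_ctx is derived from the table name, every context stays unique per table, which satisfies the same uniqueness requirement the answer points out for the copy-pasted version.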

    A similar question on this topic (how an AWS Glue job can upload multiple tables into Redshift) can be found on Stack Overflow: https://stackoverflow.com/questions/50459840/
