amazon-web-services - Importing JSON files into DynamoDB via AWS Data Pipeline

Tags: amazon-web-services amazon-s3 amazon-dynamodb amazon-data-pipeline

I am trying to figure out how to create an AWS Data Pipeline that takes a JSON file from S3 and imports it into a DynamoDB table. I was able to write some Java code that accomplishes this, but I would like to do it through Data Pipeline instead. I can see there are templates for exporting from DynamoDB to S3 and for importing a backup, but I am struggling to work out how to import a plain JSON file.

Best answer

The documentation includes an example of importing and exporting data with DynamoDB ( http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html ).

The description in the documentation is as follows:

To create the pipeline

Open the AWS Data Pipeline console.

The first screen that you see depends on whether you've created a pipeline in the current region.

If you haven't created a pipeline in this region, the console displays an introductory screen. Choose Get started now.

If you've already created a pipeline in this region, the console displays a page that lists your pipelines for the region. Choose Create new pipeline.

In Name, enter a name for your pipeline.

(Optional) In Description, enter a description for your pipeline.

For Source, select Build using a template, and then select the following template: Import DynamoDB backup data from S3.

Under Parameters, set Input S3 folder to s3://elasticmapreduce/samples/Store/ProductCatalog, which is a sample data source, and set DynamoDB table name to the name of your table.

Under Schedule, choose on pipeline activation.

Under Pipeline Configuration, leave logging enabled. Choose the folder icon under S3 location for logs, select one of your buckets or folders, and then choose Select.

If you prefer, you can disable logging instead.

Under Security/Access, leave IAM roles set to Default.

Click Edit in Architect.

Next, configure the Amazon SNS notification actions that AWS Data Pipeline performs depending on the outcome of the activity.

To configure the success and failure actions

In the right pane, click Activities.

From Add an optional field, select On Success.

From the newly added On Success, select Create new: Action.

From Add an optional field, select On Fail.

From the newly added On Fail, select Create new: Action.

In the right pane, click Others.

For DefaultAction1, do the following:

Change the name to SuccessSnsAlarm.

From Type, select SnsAlarm.

In Topic Arn, enter the ARN of the topic that you created.

Enter a subject and a message.

For DefaultAction2, do the following:

Change the name to FailureSnsAlarm.

From Type, select SnsAlarm.

In Topic Arn, enter the ARN of the topic that you created.

Enter a subject and a message.
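
In the pipeline definition JSON, the success and failure actions configured above correspond to SnsAlarm objects. A minimal sketch of one such object (the topic ARN, subject, and message below are placeholders, not values from this answer):

    {
        "name": "SuccessSnsAlarm",
        "id": "SuccessSnsAlarm",
        "type": "SnsAlarm",
        "topicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
        "subject": "Import succeeded",
        "message": "The DynamoDB import activity completed successfully.",
        "role": "DataPipelineDefaultRole"
    }

The activity then references it with "onSuccess": { "ref": "SuccessSnsAlarm" }, and similarly "onFail" for the failure alarm.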

The public GitHub repository provides some samples that use DynamoDB ( https://github.com/awslabs/data-pipeline-samples ). Here is an example of a pipeline definition:

{
    "objects": [
        {
            "occurrences": "1",
            "period": "1 Day",
            "name": "RunOnce",
            "id": "DefaultSchedule",
            "type": "Schedule",
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "maxActiveInstances": "1"
        },
        {
            "failureAndRerunMode": "CASCADE",
            "schedule": {
                "ref": "DefaultSchedule"
            },
            "resourceRole": "DataPipelineDefaultResourceRole",
            "role": "DataPipelineDefaultRole",
            "pipelineLogUri": "s3://",
            "scheduleType": "cron",
            "name": "Default",
            "id": "Default"
        },
        {
            "maximumRetries": "2",
            "name": "TableBackupActivity",
            "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
            "id": "TableBackupActivity",
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "type": "EmrActivity"
        },
        {
            "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "id": "EmrClusterForBackup",
            "type": "EmrCluster",
            "masterInstanceType": "m1.medium",
            "coreInstanceType": "#{myInstanceType}",
            "coreInstanceCount": "#{myInstanceCount}",
            "terminateAfter": "12 hours"
        }
    ],
    "parameters": [
        {
            "description": "Output S3 folder",
            "id": "myOutputS3Loc",
            "type": "AWS::S3::ObjectKey"
        },
        {
            "default": "0.2",
            "watermark": "Value between 0.1-1.0",
            "description": "DynamoDB Read Throughput Ratio",
            "id": "myDDBReadThroughputRatio",
            "type": "Double"
        },
        {
            "description": "DynamoDB Table Name",
            "id": "myDDBTableName",
            "type": "String"
        },
        {
            "description": "Instance Type",
            "id": "myInstanceType",
            "watermark": "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
            "type": "String",
            "default": "m3.xlarge"
        },
        {
            "description": "Instance Count",
            "watermark": "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
            "id": "myInstanceCount",
            "type": "Integer",
            "default": "1"
        },
        {
            "description": "Burst IOPs",
            "watermark": "Add IOPS to the DDB table by this percent for the duration of the export job",
            "id": "myBurstIOPS",
            "type": "Double",
            "default": "0.0"
        }
    ]
}
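
One caveat for the question as asked: the "Import DynamoDB backup data from S3" template expects input in the format that a previous DynamoDB export produced, not an arbitrary plain JSON document. An alternative for a plain JSON file is to convert it into DynamoDB's typed JSON and load it with `aws dynamodb batch-write-item` or the SDK. A sketch of that conversion, assuming items contain only string, number, and boolean attributes (the table name ProductCatalog and the sample item are hypothetical):

```python
import json

def to_dynamodb_item(obj):
    """Convert a plain dict into DynamoDB typed JSON (S/N/BOOL attributes only)."""
    item = {}
    for key, value in obj.items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            item[key] = {"BOOL": value}
        elif isinstance(value, (int, float)):
            item[key] = {"N": str(value)}  # DynamoDB transmits numbers as strings
        else:
            item[key] = {"S": str(value)}
    return item

def to_batch_write_request(table_name, objects):
    """Build the request shape accepted by `aws dynamodb batch-write-item`
    (the API allows at most 25 put requests per call)."""
    return {
        table_name: [{"PutRequest": {"Item": to_dynamodb_item(o)}} for o in objects]
    }

if __name__ == "__main__":
    request = to_batch_write_request("ProductCatalog", [{"Id": 1, "Title": "Book"}])
    # Save this to a file and run:
    #   aws dynamodb batch-write-item --request-items file://request.json
    print(json.dumps(request, indent=2))
```

For files larger than 25 items, the list would need to be chunked into multiple batch-write calls; unprocessed items returned by the API should be retried.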

On the topic of amazon-web-services - importing JSON files into DynamoDB via AWS Data Pipeline, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36741395/
