amazon-web-services - 使用 Go SDK 检查 AWS Data Pipeline 的状态

标签 amazon-web-services go amazon-data-pipeline data-pipeline

情况:我有 2 个按需运行的数据管道。在流水线 A 完成之前,流水线 B 无法运行。我正在尝试在单个脚本/程序中自动运行两个管道,但我不确定如何在 Go 中执行所有这些操作。

我有一些激活数据管道的 Go 代码:

func awsActivatePipeline(pipelineID, region string) (*datapipeline.ActivatePipelineOutput, error) {
    svc := datapipeline.New(session.New(&aws.Config{Region: aws.String(region)}))
    input := &datapipeline.ActivatePipelineInput{
        PipelineId: aws.String(pipelineID),
    }
    result, err := svc.ActivatePipeline(input)
    if err != nil {
        fmt.Println("error activating pipeline: ", err)
    }
    fmt.Println(result)
    return result, nil
}

激活后,我希望能够监控该管道并确定它何时完成,以便我可以运行第二个管道。类似于 list-runs CLI 命令,但我不确定对应的 Go 函数是什么。

$ aws datapipeline list-runs --region us-west-2 --pipeline-id df-EXAMPLE
       Name                                                Scheduled Start      Status                 
       ID                                                  Started              Ended              
---------------------------------------------------------------------------------------------------
   1.  EC2ResourceObj                                      2017-09-12T17:49:55  FINISHED               
       @EC2ResourceObj_2017-09-12T17:49:55                 2017-09-12T17:49:58  2017-09-12T17:56:52

   2.  Installation                                        2017-09-12T17:49:55  FINISHED               
       @Installation_@ShellCommandActivityObj_2017-09-12T  2017-09-12T17:49:57  2017-09-12T17:54:09

   3.  S3OutputLocation                                    2017-09-12T17:49:55  FINISHED               
       @S3OutputLocation_2017-09-12T17:49:55               2017-09-12T17:49:58  2017-09-12T17:54:50

   4.  ShellCommandActivityObj                             2017-09-12T17:49:55  FINISHED               
       @ShellCommandActivityObj_2017-09-12T17:49:55        2017-09-12T17:49:57  2017-09-12T17:54:49

因此,一旦所有操作都标记为“已完成”,我想激活我的第二个管道。实现此目标的最佳方法是什么?

最佳答案

仅供引用,以防其他人遇到此问题,这就是我解决此问题的方法:

Golang AWS API 调用来描述数据管道的对象/操作,如果所有对象都完成则返回 true

    func awsDescribeObjects(pipelineID, region string, objects []string) bool {
        var r Object
        var s []string
        var f bool
        svc := datapipeline.New(session.New(&aws.Config{Region: aws.String(region)}))
        input := &datapipeline.DescribeObjectsInput{
            PipelineId: aws.String(pipelineID),
            ObjectIds:  aws.StringSlice(objects),
        }
        result, err := svc.DescribeObjects(input)
        if err != nil {
            fmt.Println("error describing pipeline objects: ", err)
            f = false
            return f
        }
        //fmt.Println("original result: ", result)
        result2 := re.ReplaceAllString(result.String(), `"$1"$2`) //add "" around keys
        result3 := re1.ReplaceAllString(result2, `$3$2`)          //remove key and string/ref value from fields struct
        result4 := strings.Replace(result3, "@", "", -1)          //remove @ from keys and values
        result5 := re2.ReplaceAllString(result4, `$1$3$5$7$9`)    //remove "" from timestamps
        result6 := re3.ReplaceAllString(result5, `$1,`)           // remove {} from fields struct
        json.Unmarshal([]byte(result6), &r)
        // fmt.Printf("R: %+v\n", r)
        p := r.PipelineObjects
        // fmt.Printf("P: %+v\n", p)
        for i := range p {
            for m := range p[i].Fields {
                fmt.Printf("%v STATUS: %v\n", p[i].Name, p[i].Fields[m].Status)
                s = append(s, p[i].Fields[m].Status)
                if p[i].Fields[m].Status != "FINISHED" {
                    f = false
                } else {
                    f = true
                }
            }
            // fmt.Println("bool: ", f)
        }
        return f
    }

我的主要功能

    func main() {
        if *action == "describe" {
            obj := strings.Split(*object, ",")

            for i := 0; i <= 20; i++ {
                f := awsDescribeObjects(*pipeline, *region, obj)
                fmt.Printf("%v - Status Check %v - Finished?: %v\n", time.Now(), i, f)
                if f == true {
                    fmt.Println("FINISHED describing pipeline complete")
                    break
                }
                time.Sleep(5 * time.Minute)
                if i == 20 {
                    fmt.Println("TIME OUT - describe pipeline timed out, max time reached")
                    os.Exit(1)
                }
            }
        }
    }

带有 go 可执行文件的 Shell 脚本:

#PIPELINE 1
echo "Starting Pipeline 1..."
echo ./runpipeline.linux -region $REGION1 -pipeline-id $PIPELINEID1 -action activate
echo sleep 1m
echo ./runpipeline.linux -region $REGION1 -pipeline-id $PIPELINEID1 -action describe -object ShellCommandActivityObj
echo "Pipeline 1 complete"
#PIPELINE 2
echo "Starting Pipeline 2..."
echo ./runpipeline.linux -region $REGION2 -pipeline-id $PIPELINEID2 -action activate
echo sleep 1m
echo ./runpipeline.linux -region $REGION2 -pipeline-id $PIPELINEID2 -action describe -object ShellCommandActivityObj,CliActivity
echo "Pipeline 2 complete"
echo "FINISHED"

关于amazon-web-services - 使用 Go SDK 检查 AWS Data Pipeline 的状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46184057/

相关文章:

amazon-web-services - AWS Data Pipeline - 尝试重新运行失败的事件时出错

amazon-web-services - AWS EC2 负载均衡器 - 具体实例是否只是 "not available"?

amazon-web-services - 通过 SQS sam yaml 订阅 SNS

amazon-web-services - 如何从AWS ELB记录HTTP "referer"和 "user-agent"?

amazon-web-services - Amazon SQS Multi-Tenancy 和 HIPAA 合规性

json - 如何在golang中对特殊的json字符串进行base64编码?

windows - 如何使用Go创建的exe创建Windows服务?

encryption - 从 PHP 到 Go 的 Mcrypt

amazon-web-services - 将数据从 Excel 工作表导入到 DynamoDB 表

amazon-web-services - 按需运行的 AWS Data Pipeline 定价