python - 如何使用 aws CDK 和 Python 部署层并将其附加到 aws lambda 函数

标签 python amazon-web-services aws-lambda aws-cdk

如何使用 aws CDK 将层部署和附加到 aws lambda 函数?

我需要一个简单的 cdk 代码来部署一个层并将其附加到 aws lambda 函数。

最佳答案

以下 aws CDK Python 代码部署一个层并将其附加到 aws lambda 函数。

由 yl.

项目目录结构

--+
  +-app.py
  +-cdk_layers_deploy.py
  +--/functions+
               +-testLambda.py
  +--/layers+
            +-custom_func.py

app.py文件

#!/usr/bin/env python3

import sys

from aws_cdk import (core)
from cdk_layers_deploy import CdkLayersStack

app = core.App()
CdkLayersStack(app, "cdk-layers")

app.synth()

cdk_layers_deploy 文件

from aws_cdk import (
    aws_lambda as _lambda,
    core,
    aws_iam)
from aws_cdk.aws_iam import PolicyStatement
from aws_cdk.aws_lambda import LayerVersion, AssetCode


class CdkLayersStack(core.Stack):

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # 1) deploy lambda functions
        testLambda : _lambda.Function = CdkLayersStack.cdkResourcesCreate(self)

        # 2) attach policy to function
        projectPolicy = CdkLayersStack.createPolicy(self, testLambda)

    # -----------------------------------------------------------------------------------
    @staticmethod
    def createPolicy(this, testLambda:_lambda.Function) -> None:
        projectPolicy:PolicyStatement = PolicyStatement(
            effect=aws_iam.Effect.ALLOW,
            # resources=["*"],
            resources=[testLambda.function_arn],
            actions=[ "dynamodb:Query",
                      "dynamodb:Scan",
                      "dynamodb:GetItem",
                      "dynamodb:PutItem",
                      "dynamodb:UpdateItem",
                      "states:StartExecution",
                      "states:SendTaskSuccess",
                      "states:SendTaskFailure",
                      "cognito-idp:ListUsers",
                      "ses:SendEmail"
                    ]
        )
        return projectPolicy;
    # -----------------------------------------------------------------------------------
    @staticmethod
    def cdkResourcesCreate(self) -> None:
        lambdaFunction:_lambda.Function = _lambda.Function(self, 'testLambda',
                                                           function_name='testLambda',
                                                           handler='testLambda.lambda_handler',
                                                           runtime=_lambda.Runtime.PYTHON_3_7,
                                                           code=_lambda.Code.asset('functions'),
                                                           )
        ac = AssetCode("layers")
        layer  = LayerVersion(self, "l1", code=ac, description="test-layer", layer_version_name='Test-layer-version-name')
        lambdaFunction.add_layers(layer)

        return lambdaFunction

    # -----------------------------------------------------------------------------------

测试Lambda.py

# -------------------------------------------------
# testLambda
# -------------------------------------------------

import custom_func as cf  # this line is errored in pyCharm -> will be fixed on aws when import the layer

def lambda_handler(event, context):
    print(f"EVENT:{event}")
    ret = cf.cust_fun()
    return {
        'statusCode': 200,
        'body': ret
    }

custom_func.py - 图层函数

import datetime

def cust_fun():                           
    print("Hello from the deep layers!!")
    date_time = datetime.datetime.now().isoformat()
    print("dateTime:[%s]\n" % (date_time))

    return 1

关于python - 如何使用 aws CDK 和 Python 部署层并将其附加到 aws lambda 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59716366/

相关文章:

go - 构建一个多可执行的 Go 项目

python - 访问hadoop作业跟踪器ui

python - 查询币安智能链上的账户余额 - 解码bech32失败

amazon-web-services - Route 53 记录集未解析 Elastic Beanstalk 实例

amazon-web-services - AWS Cloudwatch 在动态标签上触发 Codepipeline

aws-lambda - 如何在 Cloudformation 的输出中显示 lambda 函数返回值

python - Python 的任何 'pretty' 数据可视化库?

python appengine授权或授权

amazon-web-services - 如果并行状态中的某个状态失败,如何继续执行 aws 中的步骤函数?

python - Python 中的 AWS Lambda : Import parent package/directory in Lambda function handler