aws-sdk - 如何从 NodeJS 应用程序 (lambda) 将记录放入 Kinesis 流

标签 aws-sdk amazon-kinesis

我有一个 lambda 函数需要将记录放入 Kinesis 流中。我没有收到错误(我知道),但消息似乎永远不会进入流...

我知道流本身是有效的,因为我可以使用 aws cli 应用程序向它推送消息。 我已经验证了传递给 putRecord() 函数的流名称和其他参数。

我正在使用这段代码将记录推送到流中:

const params = {
      Data: payload,
      PartitionKey: partitionKey,
      StreamName: this.streamName,
    };
    const res = await this.awsKinesis.putRecord(params);

res 是一个大而复杂的对象,但它包含error: null...

{
    "domain": null,
    "service": {
        "config": {
            "credentials": {
                "expired": false,
                "expireTime": null,
                "accessKeyId": "ASIAT5YUDX4OWH5FGFYE",
                "sessionToken": "FQoGZXIvYXdzEJr//////////wEaDFGkyxe1r9QSkkhSSyKKAgbrbB6ef77wtuCC4zIH3YB7C0xJPPoql1YtRGaxba5ZDSCwBBRSQ0cBeTPMmtdUqRGshdJjjLosON6QG0FGWdt3TNDrENxqFtxjrQAbCHXfIx3ARtnn6r2agZjXi9cGZhkdpvUMSIUpaC3ZC+E9wLLvkZyQBfTSsv6QdcoaKGqT8tJ9Px7Wp5BSV3Nw//NE0GtJwv0pXiQrb3c6p6GkETtAxBBVVgwJP1WYdF+kh+Gg24DxMPwwy66ayD6E7oZIWB4i7JaqMXHoDjf9D51bpWPUAVCKF9AVn3t4JiKFBVw7lFQC0m91N9HdcKLzGmjpvX4JJNzKwBA/D1TfALDsprrvU1u7r/RlyabzKIHtpeIF",
                "envPrefix": "AWS"
            },
            "credentialProvider": {
                "providers": [null, null, null, null]
            },
            "region": "eu-west-1",
            "logger": null,
            "apiVersions": {},
            "apiVersion": null,
            "endpoint": "kinesis.eu-west-1.amazonaws.com",
            "httpOptions": {
                "timeout": 120000
            },
            "maxRedirects": 10,
            "paramValidation": true,
            "sslEnabled": true,
            "s3ForcePathStyle": false,
            "s3BucketEndpoint": false,
            "s3DisableBodySigning": true,
            "computeChecksums": true,
            "convertResponseTypes": true,
            "correctClockSkew": false,
            "customUserAgent": null,
            "dynamoDbCrc32": true,
            "systemClockOffset": 0,
            "signatureVersion": "v4",
            "signatureCache": true,
            "retryDelayOptions": {},
            "useAccelerateEndpoint": false
        },
        "isGlobalEndpoint": false,
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "_clientId": 1
    },
    "operation": "putRecord",
    "params": {
        "Data": "<< THE MESSAGE >>",
        "PartitionKey": "c770e429-52e7-47c4-bcbc-497548ff9dee",
        "StreamName": "my-stream"
    },
    "httpRequest": {
        "method": "POST",
        "path": "/",
        "headers": {
            "User-Agent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
        },
        "body": "",
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "region": "eu-west-1",
        "_userAgent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
    },
    "startTime": "2019-01-24T08:25:42.574Z",
    "response": {
        "request": "~context",
        "data": null,
        "error": null,
        "retryCount": 0,
        "redirectCount": 0,
        "httpResponse": {
            "headers": {},
            "streaming": false,
            "stream": null
        },
        "maxRetries": 3,
        "maxRedirects": 10
    },
    "_asm": {
        "currentState": "validate",
        "states": {
            "validate": {
                "accept": "build",
                "fail": "error"
            },
            "build": {
                "accept": "afterBuild",
                "fail": "restart"
            },
            "afterBuild": {
                "accept": "sign",
                "fail": "restart"
            },
            "sign": {
                "accept": "send",
                "fail": "retry"
            },
            "retry": {
                "accept": "afterRetry",
                "fail": "afterRetry"
            },
            "afterRetry": {
                "accept": "sign",
                "fail": "error"
            },
            "send": {
                "accept": "validateResponse",
                "fail": "retry"
            },
            "validateResponse": {
                "accept": "extractData",
                "fail": "extractError"
            },
            "extractError": {
                "accept": "extractData",
                "fail": "retry"
            },
            "extractData": {
                "accept": "success",
                "fail": "retry"
            },
            "restart": {
                "accept": "build",
                "fail": "error"
            },
            "success": {
                "accept": "complete",
                "fail": "complete"
            },
            "error": {
                "accept": "complete",
                "fail": "complete"
            },
            "complete": {
                "accept": null,
                "fail": null
            }
        }
    },
    "_haltHandlersOnError": false,
    "_events": {
        "validate": [null, null, null, null],
        "afterBuild": [null, null, null],
        "restart": [null],
        "sign": [null],
        "validateResponse": [null],
        "send": [null],
        "httpHeaders": [null],
        "httpData": [null],
        "httpDone": [null],
        "retry": [null, null, null, null, null, null],
        "afterRetry": [null],
        "build": [null],
        "extractData": [null, null],
        "extractError": [null, null],
        "httpError": [null]
    }
}

我希望一条消息出现在 Kinesis 流中并触发流触发的 lambda,但这从未发生过。 即使在 Web 控制台中,Kinesis 流也不会在监控选项卡上显示任何事件。 我可能做错了什么?

最佳答案

根据 documentation返回值是 Request对象。

putRecord(params = {}, callback) ⇒ AWS.Request

您需要调用promise()关于这个Request反对以获得 promise ,然后您可以 await上。

const res = await this.awsKinesis.putRecord(params).promise();

关于aws-sdk - 如何从 NodeJS 应用程序 (lambda) 将记录放入 Kinesis 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54342555/

相关文章:

aws-sdk - 使用 AWS-SDK 将数据发送到 CloudWatch

amazon-web-services - 有没有办法知道 AWS AppConfig 中更新的配置?

javascript - 如何为 Amazon SES sendRawEmail 创建邮件正文

amazon-web-services - 自动电线运动流到运动消防水管?

amazon-web-services - 将Apache Camel集成到Amazon Kinesis Streaming Service

amazon-web-services - 仅针对特定分区键读取 Kinesis Stream 上的数据记录

node.js - DynamoDB createTable 如果不存在

java - AWS EC2 实例 : Launch Multiple Instance

amazon-web-services - 如何将 Spring Boot 应用程序中的数据输入 Amazon Kinesis?

amazon-redshift - 从 MySQL 到 Redshift 的近实时 ETL