我有一个 lambda 函数需要将记录放入 Kinesis 流中。我没有收到错误(我知道),但消息似乎永远不会进入流...
我知道流本身是有效的,因为我可以使用 aws cli 应用程序向它推送消息。 我已经验证了传递给 putRecord() 函数的流名称和其他参数。
我正在使用这段代码将记录推送到流中:
const params = {
Data: payload,
PartitionKey: partitionKey,
StreamName: this.streamName,
};
const res = await this.awsKinesis.putRecord(params);
res
是一个大而复杂的对象,但它包含error: null
...
{
"domain": null,
"service": {
"config": {
"credentials": {
"expired": false,
"expireTime": null,
"accessKeyId": "ASIAT5YUDX4OWH5FGFYE",
"sessionToken": "FQoGZXIvYXdzEJr//////////wEaDFGkyxe1r9QSkkhSSyKKAgbrbB6ef77wtuCC4zIH3YB7C0xJPPoql1YtRGaxba5ZDSCwBBRSQ0cBeTPMmtdUqRGshdJjjLosON6QG0FGWdt3TNDrENxqFtxjrQAbCHXfIx3ARtnn6r2agZjXi9cGZhkdpvUMSIUpaC3ZC+E9wLLvkZyQBfTSsv6QdcoaKGqT8tJ9Px7Wp5BSV3Nw//NE0GtJwv0pXiQrb3c6p6GkETtAxBBVVgwJP1WYdF+kh+Gg24DxMPwwy66ayD6E7oZIWB4i7JaqMXHoDjf9D51bpWPUAVCKF9AVn3t4JiKFBVw7lFQC0m91N9HdcKLzGmjpvX4JJNzKwBA/D1TfALDsprrvU1u7r/RlyabzKIHtpeIF",
"envPrefix": "AWS"
},
"credentialProvider": {
"providers": [null, null, null, null]
},
"region": "eu-west-1",
"logger": null,
"apiVersions": {},
"apiVersion": null,
"endpoint": "kinesis.eu-west-1.amazonaws.com",
"httpOptions": {
"timeout": 120000
},
"maxRedirects": 10,
"paramValidation": true,
"sslEnabled": true,
"s3ForcePathStyle": false,
"s3BucketEndpoint": false,
"s3DisableBodySigning": true,
"computeChecksums": true,
"convertResponseTypes": true,
"correctClockSkew": false,
"customUserAgent": null,
"dynamoDbCrc32": true,
"systemClockOffset": 0,
"signatureVersion": "v4",
"signatureCache": true,
"retryDelayOptions": {},
"useAccelerateEndpoint": false
},
"isGlobalEndpoint": false,
"endpoint": {
"protocol": "https:",
"host": "kinesis.eu-west-1.amazonaws.com",
"port": 443,
"hostname": "kinesis.eu-west-1.amazonaws.com",
"pathname": "/",
"path": "/",
"href": "https://kinesis.eu-west-1.amazonaws.com/"
},
"_clientId": 1
},
"operation": "putRecord",
"params": {
"Data": "<< THE MESSAGE >>",
"PartitionKey": "c770e429-52e7-47c4-bcbc-497548ff9dee",
"StreamName": "my-stream"
},
"httpRequest": {
"method": "POST",
"path": "/",
"headers": {
"User-Agent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
},
"body": "",
"endpoint": {
"protocol": "https:",
"host": "kinesis.eu-west-1.amazonaws.com",
"port": 443,
"hostname": "kinesis.eu-west-1.amazonaws.com",
"pathname": "/",
"path": "/",
"href": "https://kinesis.eu-west-1.amazonaws.com/"
},
"region": "eu-west-1",
"_userAgent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
},
"startTime": "2019-01-24T08:25:42.574Z",
"response": {
"request": "~context",
"data": null,
"error": null,
"retryCount": 0,
"redirectCount": 0,
"httpResponse": {
"headers": {},
"streaming": false,
"stream": null
},
"maxRetries": 3,
"maxRedirects": 10
},
"_asm": {
"currentState": "validate",
"states": {
"validate": {
"accept": "build",
"fail": "error"
},
"build": {
"accept": "afterBuild",
"fail": "restart"
},
"afterBuild": {
"accept": "sign",
"fail": "restart"
},
"sign": {
"accept": "send",
"fail": "retry"
},
"retry": {
"accept": "afterRetry",
"fail": "afterRetry"
},
"afterRetry": {
"accept": "sign",
"fail": "error"
},
"send": {
"accept": "validateResponse",
"fail": "retry"
},
"validateResponse": {
"accept": "extractData",
"fail": "extractError"
},
"extractError": {
"accept": "extractData",
"fail": "retry"
},
"extractData": {
"accept": "success",
"fail": "retry"
},
"restart": {
"accept": "build",
"fail": "error"
},
"success": {
"accept": "complete",
"fail": "complete"
},
"error": {
"accept": "complete",
"fail": "complete"
},
"complete": {
"accept": null,
"fail": null
}
}
},
"_haltHandlersOnError": false,
"_events": {
"validate": [null, null, null, null],
"afterBuild": [null, null, null],
"restart": [null],
"sign": [null],
"validateResponse": [null],
"send": [null],
"httpHeaders": [null],
"httpData": [null],
"httpDone": [null],
"retry": [null, null, null, null, null, null],
"afterRetry": [null],
"build": [null],
"extractData": [null, null],
"extractError": [null, null],
"httpError": [null]
}
}
我希望一条消息出现在 Kinesis 流中并触发流触发的 lambda,但这从未发生过。 即使在 Web 控制台中,Kinesis 流也不会在监控选项卡上显示任何事件。 我可能做错了什么?
最佳答案
根据 documentation返回值是 Request
对象。
putRecord(params = {}, callback) ⇒ AWS.Request
您需要调用promise()
关于这个Request
反对以获得 promise ,然后您可以 await
上。
const res = await this.awsKinesis.putRecord(params).promise();
关于aws-sdk - 如何从 NodeJS 应用程序 (lambda) 将记录放入 Kinesis 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54342555/