我目前正在向 aws kinesis 流发送一系列 xml 消息,我一直在不同的项目中使用它,所以我非常有信心这部分有效。然后我编写了一个 lambda 来处理从 kinesis 流到 kinesis firehose 的事件:
import os
import boto3
import base64
firehose = boto3.client('firehose')
def lambda_handler(event, context):
deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']
# Send record directly to firehose
for record in event['Records']:
data = record['kinesis']['data']
response = firehose.put_record(
DeliveryStreamName=deliveryStreamName,
Record={'Data': data}
)
print(response)
我已将 kinesis 流设置为 lamdba 触发器,并将批处理大小设置为 1,起始位置为 LATEST。
对于 kinesis firehose,我有以下配置:
Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled
我发送了 162 个事件,并从 s3 中读取它们,我设法获得的最多 160 个事件,通常更少。我什至试图等待几个小时,以防在重试时发生奇怪的事情。
任何人都有使用 kinesis-> lamdba -> firehose 的经验,并看到数据丢失的问题?
最佳答案
从我在这里看到的情况来看,当您将数据发布到 Kinesis Stream(而不是 FireHose)时,很可能会丢失项目。
由于您使用的是 put_record
写入 FireHose 时,它将抛出异常,在这种情况下将重试 lambda。 (检查该级别是否存在故障是有意义的)。
因此,考虑到我可能会假设记录在到达 Kinesis 流之前丢失了。
如果您使用 put_records
将项目发送到 Kinesis 流方法,这并不能保证所有记录都将发送到流中(由于超出写入吞吐量或内部错误),某些记录可能无法发送。在这种情况下,失败的记录子集应该由您的代码重新发送(这是 Java example,抱歉我找不到 Python 的)。
关于amazon-web-services - 有没有人在使用 AWS kinesis 流、lambda 和 firehose 时遇到过数据丢失的情况?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44909875/