amazon-kinesis - 如何延迟处理 AWS Kinesis 消息?

标签 amazon-kinesis amazon-kcl

我想将来自 AWS Kinesis 流的消息处理延迟一小时。我已将 KCL 使用者配置为每四分钟读取一批记录,检查每条记录的时间戳,并在任何记录不到一小时的情况下停止处理该批处理,没有检查点。我希望同一个消费者实例每四分钟重新读取相同的消息,直到整个批处理都足够旧可以处理,然后为消费者设置检查点。然而,在实践中,消费者只读取一次消息,这意味着它们会被忽略,并且在准备好处理时再也不会读取。有没有办法配置消费者每次都重新读取上一个检查点的所有消息?

最佳答案

我会喜欢 AWS Kinesis Stream 开箱即用的类似功能(一种可以延迟事件传递的配置)。如果没有它,您可以通过一种方式延迟事件的处理,但代价是浪费计算。

使用 SQS(或 FIFO SQS,如果您关心事件排序)而不是 Kinesis,或使用 Kinesis Stream 上的 AWS Lambda 将事件传输到 SQS。 SQS 支持delaying the delivery of a message最多 15 分钟。由于您需要延迟 60 分钟,因此您可以运行另一个 Lambda(或您自己的 SQS 消费者)来处理消息。在第一次向 Lambda(或您的 SQS 消费者)发送消息时,不处理该消息,而只是设置 visibility timeout消息延迟 45 分钟(加起来需要延迟 60 分钟)。只有在您第二次收到 SQS 消息后才处理它。您可以查看一条消息之前传递了多少次,以决定是要处理还是跳过处理该消息。

关于amazon-kinesis - 如何延迟处理 AWS Kinesis 消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59160627/

相关文章:

go - 如何在多个记录处理器之间平衡运动碎片?

amazon-web-services - 由于超过运动速率限制导致 Cloudformation 失败

pojo - Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO

amazon-web-services - TRIM_HORIZON与最新

amazon-web-services - AWS Kinesis Stream 检查点

java - 有没有办法使用 kinesis 中的样本记录?

streaming - 如何测量 DynamoDB Streams 的传播延迟?

java - 使用 Spring Boot 构建的 AWS Kinesis 消费者的集成测试

amazon-web-services - 同一 Kinesis 流的多个不同使用者

java - 将 Kinesis Client Library (KCL) 日志转储到文件