algorithm - 基于时间跨度的 AWS Kinesis 流聚合

标签 algorithm amazon-web-services aggregate-functions amazon-kinesis

我目前有一个 Kinesis 流,其中填充了以下形式的 JSON 消息:

{"datetime": "2017-09-29T20:12:01.755z", "payload":"4"}
{"datetime": "2017-09-29T20:12:07.755z", "payload":"5"}
{"datetime": "2017-09-29T20:12:09.755z", "payload":"12"}
etc...

我在这里试图完成的是根据时间 block 聚合数据。在这种情况下,我想对 10 分钟跨度的平均值进行分组。例如,从12:00 > 12:10,我想对payload值进行平均并保存为12:10的值。

例如,上面的数据会产生:

Datetime: 2017-09-29T20:12:10.00z
Average: 7

我正在考虑的方法是在服务级别使用缓存,然后使用某种方式来跟踪时间。如果消息进入下一个 10 分钟时间跨度,我会平均缓存数据,将其存储到数据库中,然后删除该缓存值。

目前,我的服务每分钟会收到 20,000 条消息,预计 future 会收到更多消息。我对如何实现它以确保我从 Kinesis 获得那 10 分钟时间段内的所有值有点困惑。那些更熟悉 Kinesis 和 AWS 的人,是否有一种简单的方法来解决这个问题?

这样做的原因是为了缩短对大时间跨度(例如 1 年)数据的查询时间。我不想获取数百万个值,而是一些聚合值。

编辑:

我必须同时跟踪许多不同的平均值。例如,上面的 JSON 可能只属于一个“集合”,例如 10 分钟时间跨度内每个城市的平均温度。这需要我跟踪每个城市每个时间跨度的平均值。

Toronto (12:01 - 12:10): average_temp
New York (12:01 - 12:10): average_temp
Toronto (12:11 - 12:20): average_temp
New York (12:11 - 12:20): average_temp
etc...

这可能适用于全局任何城市。如果新温度到达,例如多伦多,并且它属于 12:01 - 12:10 时间跨度,我必须重新计算并存储该平均值。

最佳答案

这就是我要做的。感谢您提出有趣的问题。

Kinesis Streams --> Lambda(事件插入器)--> DynamoDB(流)--> Lambda(计数和值增量器)--> DynamoDB(流)--> 平均值(更新器)

DynamoDB 表结构:

{ 
Timestamp: 1506794597
Count: 3
TotalValue: 21
Average: 7
Event{timestamp}-{guid}: { event }
}

timestamp -- timestamp of the actual event
guid -- avoid any collision on a timestamp that occurred at same time
Event{timestamp}-{guid} -- This should be removed by (count and value incrementor)

如果该时间戳的第四条记录到达,

获取接近 10 分钟的时间跨度,增加计数,增加总值。永远不要读取值和增量,除非您使用强一致性(读取非常昂贵),否则会导致错误。而是使用原子增量 执行增量操作。

http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters

从上表创建 DynamoDB 流,监听另一个 lambda,现在计算平均值并更新值。

计算平均值时,不要从表中读取数据。相反,数据将在流中可用,您只需要计算平均值并更新它。 (覆盖之前的平均值)。

这将适用于任何规模且具有高可用性。

希望对您有所帮助。

编辑 1:

由于 OP 不熟悉 AWS 服务,

Lambda 文档:

https://aws.amazon.com/lambda/

DynamoDB 文档:

https://aws.amazon.com/dynamodb/

用于该解决方案的 AWS 云服务。

关于algorithm - 基于时间跨度的 AWS Kinesis 流聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46505247/

相关文章:

python - botocore.exceptions.ClientError : An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

mysql - 在 MySQL 中获取行直到满足特定条件

javascript - 406(力扣)。按高度重建队列

amazon-web-services - 如何覆盖 AWS Batch 作业中的 docker 镜像?

amazon-web-services - 如何在没有提示的情况下在没有交互的情况下自动在 Amazon AWS EC2 上运行 aws configure?

mysql - SQL 仅选择列上具有最大值的行

mysql - SQL 从表中选择 MAX 和 MIN 值

algorithm - 需要帮助迭代数组,检索两种可能性,不重复,用于 Poker AI

java - 有没有更好的方法来找到最小堆中节点之间的最短路径

algorithm - 领导者选举算法,重点