我会主动提出几点:
我们有一个相当直接的滑动窗口应用程序。键控流按特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。我们每 30 秒计算一次窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反射(reflect)该窗口中的事件,以便旧事件过期并且不占用内存。
有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都很完美。当一个 IP 在 24 小时内登录 20 万次时,事情就开始变得棘手了。在这一点上,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但是对于这种用户行为,检查点开始需要 5 分钟,然后是 10,然后是 15,然后是 30,然后是 40,等等。
令人惊讶的是,应用程序可以在这种情况下顺利运行一段时间。也许10或12个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有处理新事件等。
在这一点上,我已经尝试了一些事情:
(1) 没有做太多事情。
(2) 这似乎有所帮助,但随后又出现了比我们之前看到的更大的流量高峰,从而扼杀了任何好处
(3) 目前还不清楚这是否有帮助。我认为与您从 Yelp 或 Airbnb 中想象的相比,我们的应用程序内存占用相当小,它们都将 Flink 集群用于大规模应用程序,因此我无法想象我的状态真的有问题。
我会说我希望我们不必深刻改变应用程序输出的期望。这个滑动窗口是一个非常有值(value)的数据。
编辑:有人问我的状态看起来像一个 ValueState[FooState]
case class FooState(
entityType: String,
entityID: String,
events: List[BarStateEvent],
tableName: String,
baseFeatureName: String,
)
case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
编辑:
我想强调用户 David Anderson 在评论中所说的:
One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events.
这是必不可少的。对于试图走这条路的其他人,我找不到一个可行的解决方案,它不会将事件分成某个时间片。我的最终解决方案涉及将事件分成 30 秒的批次,然后按照大卫的建议将它们写入 map 状态。这似乎可以解决问题。对于我们的高负载时期,检查点保持在 3mb 并且它们总是在一秒钟内完成。
最佳答案
如果您有一个 24 小时长的滑动窗口,并且它会滑动 30 秒,那么每次登录都会分配给 2880 个单独窗口中的每一个。没错,Flink 的滑动窗口就是做副本的。在本例中为 24 * 60 * 2 份。
如果您只是计算登录事件,那么在窗口关闭之前无需实际缓冲登录事件。您可以改为使用 ReduceFunction
执行 incremental aggregation .
我的猜测是您没有利用这种优化,因此当您有一个热键(IP 地址)时,处理该热键的实例的数据量不成比例,并且需要很长时间来检查点。
另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地研究以尝试了解原因。
一种可能的补救方法是使用 ProcessFunction
实现您自己的滑动窗口。 .通过这样做,您可以避免维护 2880 个单独的窗口,并使用更有效的数据结构。
编辑(基于更新的问题):
我认为问题是这样的:使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每个状态访问和更新都必须通过 ser/de。这意味着您的 List[BarStateEvent]
正在反序列化,然后在每次修改时重新序列化。对于列表中包含 20 万个事件的 IP 地址,这将非常昂贵。
你应该做的是使用 ListState
或 MapState
.这些状态类型针对 RocksDB 进行了优化。 RocksDB 状态后端可以附加到 ListState
无需反序列化列表。并与 MapState
,映射中的每个键/值对都是一个单独的 RocksDB 对象,允许高效的查找和修改。
有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在 Flink docs 中有一个做类似事情的例子(但有翻滚的窗口)。 .
或者,如果您的状态可以放入内存,则可以使用 FsStateBackend。那么你的所有状态都将是 JVM 堆上的对象,而 ser/de 只会在检查点和恢复期间发挥作用。
关于apache-flink - 如何阻止高负载导致级联 Flink 检查点故障,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60591388/