apache-flink - 如何阻止高负载导致级联 Flink 检查点故障

标签 apache-flink amazon-kinesis amazon-kinesis-analytics

我会主动提出几点:

  • 我是 Flink 的新手(现在使用它大约一个月了)
  • 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。从各方面来看,这并没有真正限制 Flink 的多功能性或容错选项,但无论如何我都会说出来。

  • 我们有一个相当直接的滑动窗口应用程序。键控流按特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。我们每 30 秒计算一次窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反射(reflect)该窗口中的事件,以便旧事件过期并且不占用内存。

    有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都很完美。当一个 IP 在 24 小时内登录 20 万次时,事情就开始变得棘手了。在这一点上,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但是对于这种用户行为,检查点开始需要 5 分钟,然后是 10,然后是 15,然后是 30,然后是 40,等等。

    令人惊讶的是,应用程序可以在这种情况下顺利运行一段时间。也许10或12个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有处理新事件等。

    在这一点上,我已经尝试了一些事情:
  • 向问题 throw 更多金属(自动缩放也已打开)
  • 对 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints 大惊小怪 https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  • 重构以减少我们存储的状态的占用空间

  • (1) 没有做太多事情。
    (2) 这似乎有所帮助,但随后又出现了比我们之前看到的更大的流量高峰,从而扼杀了任何好处
    (3) 目前还不清楚这是否有帮助。我认为与您从 Yelp 或 Airbnb 中想象的相比,我们的应用程序内存占用相当小,它们都将 Flink 集群用于大规模应用程序,因此我无法想象我的状态真的有问题。

    我会说我希望我们不必深刻改变应用程序输出的期望。这个滑动窗口是一个非常有值(value)的数据。

    编辑:有人问我的状态看起来像一个 ValueState[FooState]
    case class FooState(
                             entityType: String,
                             entityID: String,
                             events: List[BarStateEvent],
                             tableName: String,
                             baseFeatureName: String,
                           )
    
    case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
    

    编辑:
    我想强调用户 David Anderson 在评论中所说的:

    One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events.



    这是必不可少的。对于试图走这条路的其他人,我找不到一个可行的解决方案,它不会将事件分成某个时间片。我的最终解决方案涉及将事件分成 30 秒的批次,然后按照大卫的建议将它们写入 map 状态。这似乎可以解决问题。对于我们的高负载时期,检查点保持在 3mb 并且它们总是在一秒钟内完成。

    最佳答案

    如果您有一个 24 小时长的滑动窗口,并且它会滑动 30 秒,那么每次登录都会分配给 2880 个单独窗口中的每一个。没错,Flink 的滑动窗口就是做副本的。在本例中为 24 * 60 * 2 份。
    如果您只是计算登录事件,那么在窗口关闭之前无需实际缓冲登录事件。您可以改为使用 ReduceFunction 执行 incremental aggregation .
    我的猜测是您没有利用这种优化,因此当您有一个热键(IP 地址)时,处理该热键的实例的数据量不成比例,并且需要很长时间来检查点。
    另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地研究以尝试了解原因。
    一种可能的补救方法是使用 ProcessFunction 实现您自己的滑动窗口。 .通过这样做,您可以避免维护 2880 个单独的窗口,并使用更有效的数据结构。
    编辑(基于更新的问题):
    我认为问题是这样的:使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每个状态访问和更新都必须通过 ser/de。这意味着您的 List[BarStateEvent]正在反序列化,然后在每次修改时重新序列化。对于列表中包含 20 万个事件的 IP 地址,这将非常昂贵。
    你应该做的是使用 ListStateMapState .这些状态类型针对 RocksDB 进行了优化。 RocksDB 状态后端可以附加到 ListState无需反序列化列表。并与 MapState ,映射中的每个键/值对都是一个单独的 RocksDB 对象,允许高效的查找和修改。
    有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在 Flink docs 中有一个做类似事情的例子(但有翻滚的窗口)。 .
    或者,如果您的状态可以放入内存,则可以使用 FsStateBackend。那么你的所有状态都将是 JVM 堆上的对象,而 ser/de 只会在检查点和恢复期间发挥作用。

    关于apache-flink - 如何阻止高负载导致级联 Flink 检查点故障,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60591388/

    相关文章:

    apache-flink - 为什么我的 Flink 窗口使用这么多状态?

    jmx - 如何通过 JMX 远程连接 Flink?

    amazon-web-services - 将 AWS Lambda 数据推送到 Kinesis Stream

    java - 如何对基于 KinesisRecord 的 DoFn 进行单元测试?

    java - 在处理后,在 Flink 中将元素传回输入流?

    scala - 如何处理 Flink 的 Table API 窗口中的延迟元素?

    java - 无法使用 Flink Table API 打印 CSV 文件

    c# - 使用 .NET SDK 发布到 Amazon Web Services Kinesis 时出错

    amazon-web-services - 从HDFS将数据写入Kinesis流

    aws-cloudformation - 使用 cloudformation 启动 Kinesis 数据分析应用程序