apache-flink - Flink 检查点导致背压

标签 apache-flink flink-streaming

我有一个 Flink 作业以大约 200k qps 的速度处理数据。没有检查点,作业运行良好。 但是,当我尝试添加检查点(间隔 50 分钟)时,它会在第一个任务(即为每个条目添加一个关键字段)中引起背压,数据滞后也会不断增加。 我的两个 Kafka 主题的滞后,上半场启用了检查点,滞后上升得非常快。第二部分(非常低的滞后是禁用检查点,滞后在毫秒内) enter image description here

我正在使用至少一次检查点模式,这应该是异步过程。有人可以建议吗? 我的检查点设置

    env.enableCheckpointing(1800000,
          CheckpointingMode.AT_LEAST_ONCE);
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      env.getCheckpointConfig()
          .enableExternalizedCheckpoints(
              CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      env.getCheckpointConfig()
          .setCheckpointTimeout(10min);
      env.getCheckpointConfig()
          .setFailOnCheckpointingErrors(
              jobConfiguration.getCheckpointConfig().getFailOnCheckpointingErrors());

我的工作有 128 个容器。

检查点时间为 10 分钟,统计数据如下: enter image description here

我正在尝试使用 30 分钟检查点并查看

我试图调整内存使用,但它似乎不起作用。 my settings

但在任务管理器中,它仍然是: enter image description here

最佳答案

TLDR;有时很难分析问题。我有两个幸运的猜测/镜头——如果你正在使用 RocksDB 状态后端,你可以切换到 FsStateBackend——它通常更快,而且 RocksDB 最适合大状态大小,不适合内存(或者如果你真的需要增量检查点功能).其次是摆弄并行性,增加或减少。

我会怀疑@ArvidHeise 写的同样的东西。您的检查点大小并不大,但也不是微不足道的。它可以增加额外的开销,使工作超过勉强跟上流量的阈值,而不是跟不上并导致背压。如果您处于背压之下,延迟将不断累积,因此即使额外开销发生几个 % 的变化也会在毫秒级的端到端延迟与无限增长的值(value)之间产生差异。

如果您不能简单地添加更多资源,则必须分析究竟是什么增加了这种额外开销,以及什么资源是瓶颈。

  1. 是 CPU 吗?检查集群上的 CPU 使用率。如果它是 ~100%,那就是你需要优化的东西。
  2. 是 IO 吗?检查集群上的 IO 使用情况,并将其与您可以实现的最大吞吐量/每秒请求数进行比较。
  3. 如果 CPU 和 IO 使用率都很低,您可能想尝试提高并行度,但是...
  4. 谨记数据偏差。背压可能是由单个任务引起的,在这种情况下很难分析问题,因为它将是单个瓶颈线程(在 IO 或 CPU 上),而不是整个机器。

在弄清楚瓶颈是什么资源之后,下一个问题就是为什么?一旦您看到它,它可能会立即显而易见,或者可能需要深入挖掘,例如检查 GC 日志、附加分析器等。

回答这些问题可以为您提供有关您可以在工作中尝试优化的信息,或者允许您调整配置,或者可以为我们(Flink 开发人员)提供一个额外的数据点,我们可以尝试在 Flink 方面进行优化。

关于apache-flink - Flink 检查点导致背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61311010/

相关文章:

java - apache flink 0.10 如何从无界输入数据流中获取第一次出现的复合键?

java - 使用 Apache Flink 从 Web 获取 JSON 元素

apache-flink - Apache 弗林克 : Merge two DataStreams with a CoFlatMapFunction

apache-flink - 弗林克 : Build is failing when I add gauge

log4j - 如何更改Flink的日志目录

amazon-web-services - Flink 到 dynamo Sink

hadoop - flink streaming job中如何读写HBase

apache-flink - 管理具有大量内存使用的状态 - 从存储中查询

parallel-processing - Apache 弗林克 : How to execute in parallel but keep order of messages?

unit-testing - 为 Flink SQL 添加单元测试