kubernetes - 在 kubernetes 上持续部署有状态的 apache flink 应用程序

标签 kubernetes apache-flink flink-streaming

我想在 kubernetes 上运行 apache flink (1.11.1) 流应用程序。使用文件系统状态后端保存到 s3。到 s3 的检查点工作正常

args:
  - "standalone-job"
    - "-s"
    - "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata"
    - "--job-classname"
    - "com.abc.def.MY_JOB"
    - "--kafka-broker"
    - "KAFKA_HOST:9092"

所以我面临的问题是:

  • 我必须手动选择以前的状态目录。有没有可能让它变得更好?
  • 作业递增 chk 目录但不使用检查点。意味着我在第一次看到一个事件时抛出一个新事件并将其存储到 ListState<String>每当我通过 Gitlab 部署更新版本的应用程序时,它都会再次引发此事件。
  • 当我已经将 state.backend 定义到文件系统时,为什么我必须在我的代码中显式启用检查点? env.enableCheckpointing(Duration.ofSeconds(60).toMillis());env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);

最佳答案

  • 您可能对 Ververica Platform: Community Edition 更满意,这将抽象级别提高到您不必在此级别处理细节的程度。它有一个专为 CI/CD 设计的 API。
  • 我不确定我是否理解您的第二点,但您的作业在恢复期间倒带和重新处理一些数据是正常的。 Flink 不保证 exactly once 处理,而是 exactly once 语义:每个事件都会影响 Flink 管理的状态 exactly once。这是通过回滚到最近检查点中的偏移量,并将所有其他状态回滚到使用完这些偏移量之前的所有数据后的状态来完成的。
  • 有一个状态后端是必要的,因为它可以在作业运行时存储作业的工作状态。如果不启用检查点,则工作状态不会被检查点,也无法恢复。但是,从 Flink 1.11 开始,您可以通过配置文件启用检查点,使用
execution.checkpointing.interval: 60000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

关于kubernetes - 在 kubernetes 上持续部署有状态的 apache flink 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63278123/

相关文章:

kubernetes - 如何从 Kubernetes 中的 secret 挂载单个文件?

docker - 如何在Docker容器和Kubernetes容器中获取应用程序指标

kubernetes - GCP:Kubernetes引擎可分配资源

java - pom.xml 中的依赖项在 flink kafka 连接器示例中不起作用

scala - Flink 通用 Avro 解串器 : override getProducedType

apache-flink - 什么可能导致 Apache Flink 作业中的屏障对齐持续时间过长?

apache-flink - Flink 任务槽在设置算子并行度大于默认并行度时分布不均匀

apache-flink - Flink State 可以替代外部数据库吗

kubernetes - 如何允许Kubernetes Job访问主机上的文件

java - 如何获取 Flink 中一条记录的 Kafka 时间戳?