apache-flink - Cannot restore a savepoint from 1.2.1 on 1.4

Tags: apache-flink flink-streaming

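We have deployed a new Flink instance running version 1.4. When we try to restore a savepoint from our old 1.2.1 deployment, every job we try to restore fails with the same error: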

org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1360)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1350)
    ... 10 more

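The error message:

Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!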

This seems wrong, though, since our other deployment is running 1.2.1, not 1.1.

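The upgrading page of the 1.4 documentation has not been updated yet: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html but it seems that parallelism used to be an issue in the past. I tried using the same parallelism as the job the savepoint comes from, but I still get the same error.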

Any hints on what could be causing this and how to fix it?

Thanks!

Best answer

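As of version 1.4.0, Flink no longer supports restoring state that was written through the Checkpointed interface. This also applies to savepoints taken on 1.2.1: the "Flink <= 1.1" in the message refers to the legacy interface, so any function that still implements Checkpointed writes legacy state into the savepoint, regardless of the Flink version it ran on. To do a stateful upgrade, you have to do the following: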

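  1. Take a savepoint of your job running on Flink 1.2.1
  2. Replace Checkpointed with CheckpointedFunction in all stateful functions
  3. Implement the CheckpointedRestoring interface so that the functions can restore their state from the Checkpointed savepoint
  4. Run the modified job on Flink 1.2.1, resuming from the first savepoint, and take a second savepoint
  5. Remove the CheckpointedRestoring interface from all stateful functions
  6. Run the modified job on Flink 1.4.0, resuming from the second savepoint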
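A minimal sketch of steps 2 and 3, assuming a hypothetical `CountingMapper` whose old version implemented `Checkpointed<Long>` to store a running count. It uses the Flink 1.2 interfaces `CheckpointedFunction` and `CheckpointedRestoring` together with `OperatorStateStore.getSerializableListState`; the class name, the counting logic and the state name `"count"` are illustrative assumptions, not part of the original job.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;

// Hypothetical transitional version of a function that used to implement Checkpointed<Long>.
// Run this version on Flink 1.2.1 (step 4) to turn the legacy state into operator list state.
public class CountingMapper implements MapFunction<String, Long>,
        CheckpointedFunction, CheckpointedRestoring<Long> {

    // Number of elements seen by this parallel instance.
    private long count;

    // New-style operator state that replaces the legacy Checkpointed snapshot.
    private transient ListState<Long> checkpointedCount;

    @Override
    public Long map(String value) {
        count++;
        return count;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Write the current count into the operator list state on every checkpoint/savepoint.
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Register (or re-acquire) the operator list state under the illustrative name "count".
        checkpointedCount = context.getOperatorStateStore().getSerializableListState("count");
        if (context.isRestored()) {
            // Restoring from a new-style savepoint: sum the counts redistributed to this instance.
            // When restoring from the legacy savepoint this list is empty, so nothing is added.
            for (Long c : checkpointedCount.get()) {
                count += c;
            }
        }
    }

    // From CheckpointedRestoring: receives the value the old Checkpointed#snapshotState returned.
    // Only needed while resuming from the first (legacy) savepoint on 1.2.1.
    @Override
    public void restoreState(Long state) throws Exception {
        count = state;
    }
}
```

For step 5, drop `CheckpointedRestoring<Long>` from the `implements` clause and delete the `restoreState` method; the rest of the class stays the same. The second savepoint then holds only the `"count"` operator state, which `initializeState` reads back when the job is resumed on 1.4.0.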

Let me know if you run into any other problems while migrating your job.

Original question on Stack Overflow: https://stackoverflow.com/questions/48098737/
