java - Flink 状态架构迁移

标签 java apache-flink flink-streaming kryo

我在使用 MemoryStateBackend 的独立集群上有一个 flink 流应用程序。 Kryo 的 TaggedFieldSerializer 被用作默认序列化器。

当我更改状态架构并重新部署应用程序时,出现以下异常

Caused by: org.apache.flink.util.StateMigrationException: State migration isn't supported, yet.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:209)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:142)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createValueState(HeapKeyedStateBackend.java:234)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:315)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:312)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:392)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)

如果有人建议我解决这个问题或者我应该使用 FsStateBackend 来解决这个问题,这将非常有帮助。

附注如果我想将 S3 上的 FsStateBackend 用于在独立集群上运行的 Flink 应用程序,则必须进行哪些配置更改。

最佳答案

使用 FsStateBackend 无法解决此问题,因为它还在幕后使用 HeapKeyedStateBackend,这就是引发此异常的原因。

FLIP-22帮助解决状态迁移问题,但尚未实现。

目前我听说过的最佳选择是使用基于 Avro 的序列化器,因为它可以实现无缝处理新旧模式。但这不适合胆小的人。

关于FsStateBackend配置,请参见(写得很好的)文档here .

关于java - Flink 状态架构迁移,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49205094/

相关文章:

apache-flink - 在 Flink 中使用计数器获取 numOfRecordsIn

Java - 结果集 getString() 不一致

java - 如何将 JavaFX 导入 Eclipse?

java - Apache Flink 中的 TimeCharacteristics 和 TimerService

scala - 使用Flink中的RollingSink将用Avro序列化的对象写入HDFS [Scala]

apache-flink - 需要有关从 Flink DataStream Job 迁移到 Flink Stateful Functions 3.1 的建议

java - Android Studio 中的模块图标是什么意思?

java - JSF 复合组件和 selectItems

java - Apache Flink : transforming Broadcast variables fails, 但我无法确定原因

java - Flink Scala ClassNotFoundException : org. apache.flink.api.common.typeinfo.TypeInformation