我正在使用 RocksDB 作为我的 statebackend 运行 Flink 图。对于图中的连接运算符之一,我得到以下异常。 (实际组#s 当然因运行而异)。
java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
我的运营商是不是太如下
Source1 -----> Map1A ---> KeyBy--->\___ >
\----> Map1B ---> KeyBy--->-----> Join1AB ---->
\____>
Source2 ----->------------KeyBy---> -----------------> Join2,1AB ---->
在 Join2,1AB 运算符中抛出错误,该运算符将 (a) Join1AB 的结果与(键控的)source2 连接起来。
任何想法可能导致这种情况?我在下面有完整的堆栈跟踪,我知道这仍然非常模糊 - 但非常感谢任何指向正确方向的指针。
Caused by: java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:664)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:521)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:417)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
[CIRCULAR REFERENCE:java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.]
编辑 :如果我将状态后端更改为文件系统(即 FsStateBackend),则会得到以下堆栈跟踪。与键组索引有关的东西。
java.lang.IllegalArgumentException: Key group index out of range of key group range [43, 86).
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
<snip user code stack trace>
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:77)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
最佳答案
问题是我的数据对象 (POJO) 有一个可变的哈希码。具体来说,哈希码包含枚举。例如,如果我有一个汽车流,其中哈希码由汽车年份和汽车类型(枚举)组成,如下所示。
Car {
private final CarType carType;
private final int carYear
public long hashCode() {
int result = 17;
result = 31 * result + carYear;
result = 31 * result + carType.hasCode(); <---- This is mutable!
}
}
枚举的 hashCode 本质上是 Object.hashCode() (它依赖于内存地址)。随后,一台机器(或进程)上的 hashCode 将与另一台机器(或进程)上的 hashCode 不同。这也解释了为什么我只在分布式环境中运行而不是在本地运行时遇到这个问题。
为了解决这个问题,我将 hashCode() 更改为不可变的。执行 String.hashCode() 的性能很差,所以我可能需要优化它。但是下面的 Car 定义将解决这个问题。
Car {
private final CarType carType;
private final int carYear
public long hashCode() {
int result = 17;
result = 31 * result + carYear;
result = 31 * result + carType.name().hasCode(); <---- This is IMMUTABLE!
}
}
关于apache-flink - Flink 错误 - key 组不在 KeyGroupRange 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49140654/