Hi all!

I am wondering if anyone has any practical idea why I might get this error when migrating a job from 1.2.1 to 1.3.0? Ideas on debugging would help as well. I have several almost identical jobs (minor config differences), and all of them succeed except for this single job. I have seen a similar error when trying to change max parallelism, but that's not the case here: I am not changing any parallelism setting. I know this is a long shot, but you might have encountered something similar.

Thanks,
Gyula

java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: The key group must belong to the backend
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateData(RocksDBKeyedStateBackend.java:1185)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1100)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1081)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:968)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
... 6 more

or

java.lang.IllegalArgumentException: Key Group 56 does not belong to the local range.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:493)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
|
Hi,
I have seen the first exception in cases where the key had no proper and stable hashCode method, e.g. when the key was an array. What the first exception basically means is that the backend received a key it does not expect: the key's hash maps it to a key group for which the backend is not responsible. My guess would be that the hash of the object has changed between the time the checkpoint was taken and now. Best, Stefan
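
To illustrate the point about stable hash codes, here is a minimal, self-contained sketch. It is not Flink's code: `scramble` is a hypothetical stand-in for whatever hash mixing the runtime applies, and the max parallelism of 128 is an assumed value. The only idea it shows is that the key group is derived from `key.hashCode()`, so a key type whose hash code depends on object identity (such as a plain array) can land in a different key group after a restart, while a `Long` key cannot.

```java
import java.util.Arrays;

public class KeyGroupSketch {

    // Hypothetical mixing step; a stand-in for the runtime's own hash function.
    public static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    // Maps a key's hash code to a key group in [0, maxParallelism).
    public static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(scramble(keyHash), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration

        // Long.hashCode() depends only on the value, so the key group is stable.
        Long a = 42L;
        Long b = 42L;
        System.out.println("Long keys agree: "
                + (keyGroupFor(a.hashCode(), maxParallelism)
                        == keyGroupFor(b.hashCode(), maxParallelism)));

        // Arrays inherit Object.hashCode(), which is identity based: two arrays
        // with equal contents usually hash differently, and the value is not
        // stable across JVM runs.
        long[] x = {1L, 2L};
        long[] y = {1L, 2L};
        System.out.println("Array identity hashes agree: "
                + (x.hashCode() == y.hashCode()));

        // A content-based hash is stable and could serve as a key instead.
        System.out.println("Array content hashes agree: "
                + (Arrays.hashCode(x) == Arrays.hashCode(y)));
    }
}
```

If the key really is a `Long`, as in the job discussed here, this failure mode should not apply.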
|
Thanks Stefan for the tip. In this case I have a Long key, so it's unlikely that the hash code has changed. And as I mentioned, I have several jobs with the exact same topology which run just fine after migration. It is super weird... Maybe I am blind to some stupid error, so I'll keep looking. Gyula Stefan Richter <[hidden email]> wrote (on Thu, Jun 22, 2017, 18:10):
|
Hi,
I had a closer look at those exceptions now, and I would expect to see this in the case where there is suddenly a mismatch between the key-group range assigned to the keyed backend and the key-groups covered by the state handle we try to restore, for example when the wrong state handle was sent to restore a task. What I would suggest for debugging: before and after migrating, on restore, log the key-group ranges of the keyed backends and the key-group ranges of all the keyed state handles they receive for the restore. There should be some change between the original and the migrated one, and we need to track down which of the two changed and how that change was introduced by the conversion. Best, Stefan
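
The kind of check described above can be sketched as follows. This is a standalone illustration, not Flink's implementation: it assumes key groups are split into contiguous, inclusive ranges per subtask, and the class name, method names, max parallelism of 128, and parallelism of 4 are all hypothetical. It shows how one might compute the range a subtask's backend should own and compare it against the range a state handle claims to cover.

```java
public class KeyGroupRangeSketch {

    // Inclusive [start, end] key-group range for one subtask, assuming the
    // key groups are divided into contiguous blocks across subtasks.
    public static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // True if the key group lies inside the inclusive range.
    public static boolean contains(int[] range, int keyGroup) {
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    // True if two inclusive ranges share at least one key group.
    public static boolean intersects(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 4;      // assumed
        int keyGroup = 56;        // the key group named in the second exception

        for (int subtask = 0; subtask < parallelism; subtask++) {
            int[] backendRange = rangeFor(maxParallelism, parallelism, subtask);
            System.out.println("subtask " + subtask
                    + " owns [" + backendRange[0] + ", " + backendRange[1] + "]"
                    + ", contains key group " + keyGroup + ": "
                    + contains(backendRange, keyGroup));
        }

        // A handle whose range does not intersect the backend's range would be
        // the "wrong state handle" case.
        int[] backend = rangeFor(maxParallelism, parallelism, 1);
        int[] handle = rangeFor(maxParallelism, parallelism, 0);
        System.out.println("handle matches backend: " + intersects(backend, handle));
    }
}
```

Logging the two ranges side by side for every subtask, once on the original job and once on the migrated one, should make it visible which side changed.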
|
Hi, I wonder if trying to restore it with parallelism = 1 could magically solve this problem. Maybe that can give us some additional insights. Cheers On Fri, Jun 23, 2017, 10:35 Stefan Richter <[hidden email]> wrote:
|