Error "key group must belong to the backend" on restore

Gyula Fóra
Hi all!

Does anyone have a practical idea why I might get this error when migrating a job from Flink 1.2.1 to 1.3.0? Ideas on how to debug it would help as well.

I have several nearly identical jobs (only minor config differences), and all of them migrate successfully except this one. I have seen a similar error when changing the max parallelism, but that's not the case here: I am not changing any parallelism settings.

I know this is a long shot, but you might have encountered something similar.

Thanks,
Gyula

java.lang.IllegalStateException: Could not initialize keyed state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: The key group must belong to the backend
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateData(RocksDBKeyedStateBackend.java:1185)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1100)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1081)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:968)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
	... 6 more

or 

java.lang.IllegalArgumentException: Key Group 56 does not belong to the local range.
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:493)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)

Re: Error "key group must belong to the backend" on restore

Stefan Richter
Hi,

I have seen the first exception in cases where the key did not have a proper, stable hashCode() method, e.g. when the key was an array. The first exception basically means that the backend received a key it does not expect: based on its hash, the key belongs to a key group for which this backend is not responsible. My guess would be that the hash of the key object changed between the time the checkpoint was taken and now.
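To make the mechanism concrete, here is a minimal standalone sketch of how a key is mapped to a key group. It is modeled on the idea behind Flink's `KeyGroupRangeAssignment.assignToKeyGroup` (mix the key's `hashCode()`, then take it modulo the max parallelism), but the mixing function is reproduced from memory and should be treated as an assumption; the class name `KeyGroupDemo` and the max parallelism of 128 are hypothetical. It contrasts a `Long` key, whose hash is derived from its value, with a `long[]` key, whose `hashCode()` is identity-based and therefore generally differs between the run that wrote the checkpoint and the run that restores it.

```java
import java.util.Arrays;

// Hypothetical standalone sketch; mirrors the idea behind Flink's
// KeyGroupRangeAssignment but is NOT the Flink class itself.
public class KeyGroupDemo {

    // Murmur-style mixing of the key's hashCode (assumed to resemble
    // Flink's MathUtils.murmurHash; verify against the Flink sources).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Map to a non-negative int so the modulo below is non-negative.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // key group = mixed hash of key.hashCode() modulo maxParallelism
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical value

        // Long.hashCode() is computed from the value, so this key group
        // is the same in every JVM run.
        System.out.println("Long key 42 -> key group "
                + assignToKeyGroup(42L, maxParallelism));

        // Arrays use identity-based hashCode(): two arrays with equal
        // contents will usually land in different key groups, and the
        // same logical key can move between checkpoint and restore.
        long[] a = {1L, 2L};
        long[] b = {1L, 2L};
        System.out.println("array a -> " + assignToKeyGroup(a, maxParallelism));
        System.out.println("array b -> " + assignToKeyGroup(b, maxParallelism));

        // Hashing the contents instead gives a stable key group.
        System.out.println("contents -> "
                + murmurHash(Arrays.hashCode(a)) % maxParallelism);
    }
}
```

The usual remedy is to key by a type with a value-based hashCode() (for example a `Long`, a `String`, or a wrapper that delegates to `Arrays.hashCode`).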

Best,
Stefan

In reply to Gyula Fóra's message of Thu, Jun 22, 2017, 17:48.

Re: Error "key group must belong to the backend" on restore

Gyula Fóra
Thanks for the tip, Stefan. In this case I have a Long key, so it's unlikely that the hash code has changed. And as I mentioned, I have several jobs with exactly the same topology that run just fine after migration.

It is super weird... Maybe I am blind to some stupid error, so I'll keep looking.

Gyula

In reply to Stefan Richter's message of Thu, Jun 22, 2017, 18:10.

Re: Error "key group must belong to the backend" on restore

Stefan Richter
Hi,

I have now had a closer look at those exceptions, and I would expect to see them when there is suddenly a mismatch between the key-group range assigned to the keyed backend and the key groups covered by the state handle we try to restore, for example when the wrong state handle was sent to restore a task. For debugging, I would suggest logging, on restore both before and after migrating, the key-group range of each keyed backend and the key-group ranges of all the keyed state handles it receives. There should be some difference between the original and the migrated job; we need to track down which of the two changed and how the conversion introduced that change.
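The range check behind both exceptions can be sketched without Flink. The per-subtask range formula below is modeled on Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` (reproduced from memory, so treat it as an assumption), and `Range` is a simplified stand-in for Flink's `KeyGroupRange`. `foreignKeyGroups` plays the role of the suggested debugging log: given the range a subtask owns and the ranges of the state handles it receives, it lists every key group the backend does not own. All names and numbers here (max parallelism 128, parallelism 4) are hypothetical; with them, subtask 1 owns [32, 63], which is where a key group such as 56 would be expected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical debugging sketch: compare a subtask's assigned key-group
// range with the key-group ranges of the state handles it receives.
public class KeyGroupRangeCheck {

    // Inclusive key-group range; simplified stand-in for Flink's KeyGroupRange.
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Range owned by one subtask (assumed to match Flink's formula).
    static Range rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new Range(start, end);
    }

    // Every key group in the handles that the backend does not own; a
    // non-empty result corresponds to "The key group must belong to the backend".
    static List<Integer> foreignKeyGroups(Range backend, List<Range> handles) {
        List<Integer> foreign = new ArrayList<>();
        for (Range handle : handles) {
            for (int kg = handle.start; kg <= handle.end; kg++) {
                if (!backend.contains(kg)) {
                    foreign.add(kg);
                }
            }
        }
        return foreign;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " owns "
                    + rangeForSubtask(maxParallelism, parallelism, i));
        }

        // Hypothetical faulty restore: subtask 1 receives subtask 0's handle.
        Range backend = rangeForSubtask(maxParallelism, parallelism, 1);
        List<Range> handles =
                Collections.singletonList(rangeForSubtask(maxParallelism, parallelism, 0));
        System.out.println("foreign key groups: "
                + foreignKeyGroups(backend, handles).size());
    }
}
```

Running the same comparison against the logged ranges of the original and the migrated job should show which side (backend range or handle range) changed.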

Best,
Stefan  

In reply to Gyula Fóra's message of Thu, Jun 22, 2017, 18:50.

Re: Error "key group must belong to the backend" on restore

Gyula Fóra

Hi,
Thanks for the suggestion, I will definitely try this over the weekend.

I wonder if trying to restore it with parallelism = 1 could magically solve this problem. Maybe that can give us some additional insights.
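A quick way to see why restoring with parallelism 1 might sidestep the error: with a single subtask, the assigned range spans every key group from 0 to maxParallelism - 1, so no restored key group can fall outside it. The sketch below uses the per-subtask range formula modeled on Flink's `KeyGroupRangeAssignment` (reproduced from memory, an assumption); `SingleSubtaskRange` and the max parallelism of 128 are hypothetical. If the job does restore this way, that would only mask the mismatch rather than explain it.

```java
// Hypothetical sketch: with parallelism 1 the only subtask owns every key group.
public class SingleSubtaskRange {

    // Returns {start, end} (inclusive); formula assumed to match Flink's
    // per-subtask key-group range computation.
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical value
        int[] range = rangeForSubtask(maxParallelism, 1, 0);
        // Prints "[0, 127]": every key group is local, so a check such as
        // the one for key group 56 cannot fail.
        System.out.println("[" + range[0] + ", " + range[1] + "]");
    }
}
```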

Cheers
Gyula


In reply to Stefan Richter's message of Fri, Jun 23, 2017, 10:35.