Could not initialize keyed state backend.


PedroMrChaves
Hello,

The state of my RichWindowFunction operator fails to initialize when the job
recovers from a failure. I see the following error in the log file:

java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /data/flink/a88ff1c2c919dede80ecc5c9f045865b/chk-17796/3b9ecbb2-ba40-4ad9-9589-09db61cd15d5 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70)
        at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:423)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

Does anyone know what might be the issue?

Flink Version: 1.3.2
State backend: Filesystem


Regards,
Pedro.




-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Could not initialize keyed state backend.

Stefan Richter
Hi,

Are your checkpoints written to a local filesystem or to a distributed filesystem that is reachable from all task managers? This exception can happen in the first case: imagine your task restarts on a different machine. How could it find a file that was local to another machine?
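
To make the distinction concrete, here is a minimal sketch in plain Java (no Flink dependency) that flags a checkpoint URI whose scheme resolves against the filesystem of whichever machine opens it. The class name, the method name, and the HDFS address are hypothetical, chosen only for illustration; this is not a check that Flink itself performs. In Flink 1.3 the URI in question is the one passed to FsStateBackend or set via state.backend.fs.checkpointdir. Note that the sketch only looks at the scheme: a file:// path on a mount shared by all task managers (for example NFS) would be flagged even though it can work.

```java
import java.net.URI;

public class CheckpointUriCheck {

    /**
     * Returns true if the URI has no scheme or the "file" scheme, i.e. it is
     * resolved against the local filesystem of the machine that opens it.
     */
    public static boolean isMachineLocal(String checkpointUri) {
        String scheme = URI.create(checkpointUri).getScheme();
        return scheme == null || scheme.equalsIgnoreCase("file");
    }

    public static void main(String[] args) {
        String[] candidates = {
            "/data/flink",                            // path from the stack trace above
            "file:///data/flink",                     // same path with an explicit scheme
            "hdfs://namenode:8020/flink/checkpoints"  // hypothetical distributed location
        };
        for (String uri : candidates) {
            String verdict = isMachineLocal(uri) ? "machine-local" : "not machine-local";
            System.out.println(uri + " -> " + verdict);
        }
    }
}
```

If the check reports a machine-local location and the cluster has more than one task manager, pointing the checkpoint directory at storage that every task manager can reach is the usual remedy.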

Best,
Stefan

> On 18.09.2017 at 11:40, PedroMrChaves <[hidden email]> wrote:
>
> The state of my RichWindowFunction operator fails to initialize upon a
> failure. [...]

Re: Could not initialize keyed state backend.

PedroMrChaves
Hello,

I thought that the checkpoints would be propagated to all the machines in
the cluster when using a local filesystem.

Thank you,
Regards.



Re: Could not initialize keyed state backend.

Stefan Richter
Hi,

That is not the case, and it would not make much sense either if you think about restoring from a checkpoint after a machine failure. Is there a section in the Flink documentation that was confusing and led you to this assumption?

Best,
Stefan

> On 18.09.2017 at 15:56, PedroMrChaves <[hidden email]> wrote:
>
> I thought that the checkpoints would be propagated to all the machines in
> the cluster when using a local filesystem.