Hello,
We have a task that fails to restart from a checkpoint with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /home/gluster/flink/checkpoints/fac589c7248186bda2ad7b711f174973/chk-1/a069f85e-4ceb-4fba-9308-fb238f31574f (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1290)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1477)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1333)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1512)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:979)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 common frames omitted

It seems that it tries to restore the job from checkpoint 1 (which was automatically deleted by Flink), even though the latest checkpoint is 1620. I can also see in the logs that it intended to restore from checkpoint 1620:

Found 1 checkpoints in ZooKeeper.
Trying to retrieve checkpoint 1620.
Restoring from latest valid checkpoint: Checkpoint 1620 @ 1511267100332 for fac589c7248186bda2ad7b711f174973.

I have incremental checkpointing enabled, but I read many times that checkpoints do not reference each other, so I'm not sure what could be happening.

Gerard

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
where exactly did you read many times that incremental checkpoints cannot reference files from previous checkpoints, because we would have to correct that information. In fact, this is how incremental checkpoints work.

Now for this case, I would consider it extremely unlikely that a checkpoint 1620 would still reference a checkpoint 1, in particular if the files for that checkpoint are already deleted, which should only happen if it is no longer referenced.

Which version of Flink are you using and what is your distributed filesystem? Is there any way to reproduce the problem?

Best,
Stefan

> On 21.11.2017 at 14:30, gerardg <[hidden email]> wrote:
> [...]
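To illustrate the point that incremental checkpoints reference files from previous checkpoints, and that a file should only be deleted once nothing references it, here is a toy reference-counting sketch in Java. It is not Flink's implementation: the class name ToySharedFileRegistry, its methods, and the file names are all hypothetical and only model the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): NOT Flink's SharedStateRegistry,
// just the reference-counting idea behind incremental checkpoints.
class ToySharedFileRegistry {
    // file name -> number of retained checkpoints that still reference it
    private final Map<String, Integer> refCounts = new HashMap<>();

    // A completed checkpoint registers every file it references,
    // whether it wrote the file itself or reuses it from an older checkpoint.
    void register(List<String> files) {
        for (String file : files) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    // Dropping a checkpoint releases its references and returns the files
    // that no retained checkpoint references any more (safe to delete).
    List<String> release(List<String> files) {
        List<String> deletable = new ArrayList<>();
        for (String file : files) {
            Integer count = refCounts.get(file);
            if (count == null) {
                continue; // never registered, nothing to release
            }
            if (count == 1) {
                refCounts.remove(file);
                deletable.add(file);
            } else {
                refCounts.put(file, count - 1);
            }
        }
        return deletable;
    }

    boolean isReferenced(String file) {
        return refCounts.containsKey(file);
    }

    public static void main(String[] args) {
        ToySharedFileRegistry registry = new ToySharedFileRegistry();
        List<String> chk1 = Arrays.asList("chk-1/a.sst", "chk-1/b.sst");
        // chk-2 reuses a.sst written by chk-1 and adds one new file
        List<String> chk2 = Arrays.asList("chk-1/a.sst", "chk-2/c.sst");
        registry.register(chk1);
        registry.register(chk2);
        // Only b.sst becomes deletable; a.sst is still referenced by chk-2.
        System.out.println(registry.release(chk1)); // prints [chk-1/b.sst]
    }
}
```

In this model, a file written under chk-1 has to survive for as long as any retained checkpoint still lists it, so a missing file at restore time would point to a deletion that bypassed the reference counting.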
> where exactly did you read many times that incremental checkpoints cannot
> reference files from previous checkpoints, because we would have to correct
> that information. In fact, this is how incremental checkpoints work.

My fault, I read it in some other posts in the mailing list but now that I read it carefully it meant savepoints not checkpoints.

> Now for this case, I would consider it extremely unlikely that a
> checkpoint 1620 would still reference a checkpoint 1, in particular if the
> files for that checkpoint are already deleted, which should only happen if
> it is no longer referenced. Which version of Flink are you using and what
> is your distributed filesystem? Is there any way to reproduce the problem?

We are using Flink version 1.3.2 and GlusterFS. There are usually a few checkpoints around at the same time, for example right now:

chk-1 chk-26 chk-27 chk-28 chk-29 chk-30 chk-31

I'm not sure how to reproduce the problem but I'll monitor the folder to see when chk-1 gets deleted and try to make the task fail when that happens.

Gerard
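Monitoring the checkpoint folder for the moment chk-1 disappears could be done with a small polling loop. The following is a minimal sketch, not something used in this thread: the class name CheckpointDirWatcher, the method listCheckpointIds, and the 10-second interval are assumptions. It polls instead of relying on filesystem notifications, since those may not be delivered reliably on a network mount.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: polls a job's checkpoint directory and reports
// when the chk-1 subdirectory disappears.
class CheckpointDirWatcher {

    // Returns the numeric ids of all chk-<n> subdirectories, sorted ascending.
    static List<Long> listCheckpointIds(Path jobCheckpointDir) throws IOException {
        List<Long> ids = new ArrayList<>();
        try (DirectoryStream<Path> stream =
                Files.newDirectoryStream(jobCheckpointDir, "chk-*")) {
            for (Path entry : stream) {
                String suffix = entry.getFileName().toString().substring("chk-".length());
                // Skip regular files and names without a purely numeric suffix.
                if (Files.isDirectory(entry) && suffix.matches("\\d+")) {
                    ids.add(Long.parseLong(suffix));
                }
            }
        }
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: CheckpointDirWatcher <job-checkpoint-dir>");
            return;
        }
        Path dir = Paths.get(args[0]);
        long pollIntervalMs = 10_000; // assumed interval, adjust as needed
        boolean seenChk1 = false;
        while (true) {
            List<Long> ids = listCheckpointIds(dir);
            System.out.println(System.currentTimeMillis() + " " + ids);
            if (ids.contains(1L)) {
                seenChk1 = true;
            } else if (seenChk1) {
                System.out.println("chk-1 is gone");
                break;
            }
            Thread.sleep(pollIntervalMs);
        }
    }
}
```

Sorting the ids numerically also avoids the lexicographic ordering of a plain directory listing, where chk-1222 would appear before chk-326.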
Ok, thanks for trying to reproduce this. If possible, could you also activate trace-level logging for class org.apache.flink.runtime.state.SharedStateRegistry? In case the problem occurs, this would greatly help to understand what was going on.
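Trace-level logging for a single class can usually be enabled with one logger entry. A minimal sketch, assuming the log4j.properties file from the Flink conf directory is the active logging configuration (a logback setup would need the equivalent logger entry instead):

```properties
# conf/log4j.properties (assumed to be the active logging configuration)
log4j.logger.org.apache.flink.runtime.state.SharedStateRegistry=TRACE
```

The change typically takes effect only after the affected JobManager and TaskManager processes are restarted.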
> On 21.11.2017 at 15:16, gerardg <[hidden email]> wrote:
> [...]
I've been monitoring the task and checkpoint 1 never gets deleted. Right now we have:

chk-1 chk-1222 chk-326 chk-329 chk-357 chk-358 chk-8945 chk-8999 chk-9525 chk-9788 chk-9789 chk-9790 chk-9791

I made the task fail and it recovered without problems, so for now I would say that the problem was with the distributed filesystem, or that the chk-1 folder somehow got deleted by something external to Flink. If I see the problem again I will try to get more information.

Thanks,
Gerard

On Tue, Nov 21, 2017 at 4:27 PM, Stefan Richter <[hidden email]> wrote:
> [...]