How to gracefully handle job recovery failures

Li Peng-2
Hey folks, we have a cluster with HA mode enabled, and recently, after a ZooKeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 2.12) crashed and got stuck in a crash loop with the following error:

2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00000000000000000000000000000000.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00000000000000000000000000000000.
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 common frames omitted
Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /00000000000000000000000000000000. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
... 7 common frames omitted
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
... 8 common frames omitted

We have an idea of why the file might be gone and are addressing it, but my question is: how can I configure Flink so that a missing job file doesn't trap the cluster in an endless restart loop? Is there a setting to treat this as a completely fresh deployment if the recovery file is missing?

Thanks!
Li




Re: How to gracefully handle job recovery failures

Roman Khachatryan
Hi Li,

The missing file is a serialized job graph and the job recovery can't
proceed without it.
Unfortunately, the cluster can't proceed if one of the jobs can't recover.
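
To illustrate the all-or-nothing behavior described above, here is a minimal toy model in Python. It is not Flink code: `recover_jobs`, `JobRecoveryError`, and the two dictionaries are hypothetical stand-ins for the ZooKeeper pointers and the files in the HA storage directory.

```python
class JobRecoveryError(Exception):
    """Raised when a pointer refers to a job graph file that no longer exists."""


def recover_jobs(pointers, storage):
    """Toy model of job recovery.

    pointers: job id -> file path (stands in for the pointers kept in ZooKeeper)
    storage:  file path -> serialized job graph (stands in for the HA storage dir)
    """
    recovered = {}
    for job_id, path in pointers.items():
        if path not in storage:
            # One dangling pointer aborts the whole recovery, not just this job.
            raise JobRecoveryError(f"Could not recover job {job_id}: missing {path}")
        recovered[job_id] = storage[path]
    return recovered


# Two registered jobs, but only one job graph file is still present.
pointers = {"job-a": "recovery/graphA", "job-b": "recovery/graphB"}
storage = {"recovery/graphB": b"serialized job graph"}

try:
    recover_jobs(pointers, storage)
    outcome = "recovered"
except JobRecoveryError:
    outcome = "failed"
```

In this model, recovery of `job-b` only succeeds once the dangling entry for `job-a` has been removed from `pointers`.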

Regards,
Roman

Re: How to gracefully handle job recovery failures

Li Peng-2
Hi Roman,

Is there a way to abandon job recovery after a few tries? What I mean is that I fixed this problem by restarting the cluster without trying to recover the job. Is there a setting that emulates what I did, so that I don't need to intervene manually if this happens again?

Thanks,
Li

Re: How to gracefully handle job recovery failures

Roman Khachatryan
Hi Li,

If I understand correctly, you want the cluster to proceed with recovery,
skipping the non-recoverable jobs (but still recovering the others).
The only way I can think of is to remove the corresponding nodes in
ZooKeeper, which is not very safe.

I'm pulling in Robert and Till who might know better.

Regards,
Roman


Re: How to gracefully handle job recovery failures

Till Rohrmann
Hi Li,

Roman is right about Flink's behavior and what you can do about it. The idea behind the current behavior is the following: if Flink cannot recover a job, it is very hard for it to tell whether the cause is an intermittent problem or a permanent one. No matter how often you retry, you can always end up giving up too early. Giving up would effectively mean that Flink can forget about jobs during a recovery, which we believe would be very surprising behavior, so we decided that this situation requires the user to intervene. By forcing the user to make a decision, we make the problem very explicit and require her to think about the situation. I hope this makes sense.

So in your case, what you have to do is remove the relevant ZooKeeper znode, which contains the pointer to the submitted job graph file. That way, Flink will no longer try to recover this job. I agree that this is a bit cumbersome, and it would definitely help to offer a small tool for this kind of cleanup task.
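
As a rough sketch of that manual cleanup, the Python helper below only builds the znode path and prints the commands one might then type into ZooKeeper's `zkCli.sh`; it does not talk to ZooKeeper itself. The helper names are hypothetical. The layout `<root>/<cluster-id>/jobgraphs/<job-id>` and the defaults `/flink`, `/default`, and `/jobgraphs` are assumptions based on the `high-availability.zookeeper.path.root`, `high-availability.cluster-id`, and `high-availability.zookeeper.path.jobgraphs` settings, and `deleteall` assumes a ZooKeeper 3.5+ CLI. Check your own configuration and inspect the node with `ls` and `get` before deleting anything.

```python
def job_graph_znode(job_id, root="/flink", cluster_id="/default", jobgraphs="/jobgraphs"):
    """Build the assumed znode path of a job graph pointer.

    The defaults mirror what are assumed to be Flink's default HA settings;
    pass your own values if your configuration overrides them.
    """
    parts = [p.strip("/") for p in (root, cluster_id, jobgraphs, job_id)]
    return "/" + "/".join(p for p in parts if p)


def cleanup_commands(job_id, **ha_settings):
    """Return zkCli.sh commands to inspect and then remove the pointer."""
    znode = job_graph_znode(job_id, **ha_settings)
    parent = znode.rsplit("/", 1)[0]
    return [
        f"ls {parent}",        # list the job graphs ZooKeeper knows about
        f"get {znode}",        # look at the pointer before touching it
        f"deleteall {znode}",  # remove the pointer and any child nodes
    ]


# Job id taken from the stack trace in the original post.
job_id = "00000000000000000000000000000000"
for command in cleanup_commands(job_id):
    print(command)
```

Since this removes Flink's only record of the job, it is probably safest to run the commands while the JobManager is stopped, and to resubmit the job afterwards if it is still needed.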

Cheers,
Till

Re: How to gracefully handle job recovery failures

Li Peng-2
Understood, thanks all!

-Li
