Hello guys.
We run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official docker image. Usually, when our jobs crash for any reason, they will resume from the latest checkpoint. This is the expected behavior and has been working fine for years. But we encountered an issue with a job that crashed apparently because it lost connectivity with Zookeeper. The logs for this job can be found here: https://pastebin.com/raw/uH9KDU2L (I redacted boring or private stuff and annotated the relevant parts). From what I can tell, this line was called: ``` // This is the general shutdown path. If a separate more specific shutdown was // already triggered, this will do nothing shutDownAsync( applicationStatus, null, true); ``` https://github.com/apache/flink/blob/6b9cdd41743edd24a929074d62a57b84e7b2dd97/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L243-L246 which seems pretty dangerous because it ends up calling HighAvailabilityServices.closeAndCleanupAllData() https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L225-L239 To me this looks like a dangerous default... why would we want to delete the checkpoint metadata ever unless when explicitly canceling/stopping the job? I think that if/else branch means something like: if the job crashed (i.e. `throwable != null`), then DO NOT wipe out the state. Otherwise, delete it. But in this case... it seems like `throwable` was indeed null, which caused the job to delete the checkpoint data before dying. At this point, I'm just guessing really... I don't really know if this is what happened in this case. Hopefully someone with more kwoledge of how this works give us a hand. Thanks. |
Hi Cristian,
In the log,we can see it went to the method shutDownAsync(applicationStatus,null,true); `` 2020-09-04 17:32:07,950 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null. `` In general shutdown path,default to clean up HaData is normal. So the problem is not why we clean up HaData in general shutdown path,but why it went to the general shutdown path when your cluster fails. I am going to have lunch , and plan to analyze the log in the afternoon. Best, Qingdong Zeng -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
My suspicion is that somewhere in the path were it fails to connect yo zookeeper, the exception is swallowed, so instead of running the shutdown path for when the job fails, the general shutdown path is taken. This was fortunately a job for which we had a savepoint from yesterday. Otherwise we would have been in serios problems. On Fri, Sep 4, 2020, at 9:12 PM, Qingdong Zeng wrote: > Hi Cristian, > > In the log,we can see it went to the method > shutDownAsync(applicationStatus,null,true); > > `` > 2020-09-04 17:32:07,950 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting > StandaloneApplicationClusterEntryPoint down with application status FAILED. > Diagnostics null. > `` > > In general shutdown path,default to clean up HaData is normal. So the > problem is not why we clean up HaData in general shutdown path,but why it > went to the general shutdown path when your cluster fails. > > I am going to have lunch , and plan to analyze the log in the afternoon. > > Best, > Qingdong Zeng > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > |
Hi Cristian, From this code , we could see that the Exception or Error was ignored in dispatcher.shutDownCluster(applicationStatus) . `` org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster return applicationCompletionFuture .handle((r, t) -> { final ApplicationStatus applicationStatus; if (t != null) { final Optional<JobCancellationException> cancellationException = ExceptionUtils.findThrowable(t, JobCancellationException.class); if (cancellationException.isPresent()) { // this means the Flink Job was cancelled applicationStatus = ApplicationStatus.CANCELED; } else if (t instanceof CancellationException) { // this means that the future was cancelled applicationStatus = ApplicationStatus.UNKNOWN; } else { applicationStatus = ApplicationStatus.FAILED; } LOG.warn("Application {}: ", applicationStatus, t); } else { applicationStatus = ApplicationStatus.SUCCEEDED; LOG.info("Application completed SUCCESSFULLY"); } return dispatcher.shutDownCluster(applicationStatus); }) .thenCompose(Function.identity()); `` So when it come to java.util.concurrent.CompletableFuture#whenComplete , there is no throwable, only ApplicationStatus.FAILED , and data was cleaned up. `` clusterComponent.getShutDownFuture().whenComplete( (ApplicationStatus applicationStatus, Throwable throwable) -> { if (throwable != null) { shutDownAsync( ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(throwable), false); } else { // This is the general shutdown path. If a separate more specific shutdown was // already triggered, this will do nothing shutDownAsync( applicationStatus, null, true); } }); } `` If you want to save your checkPoint,you could refer to this document: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html In another way,you could change the code in org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster,when it came to faied,save the data. In fact, I'm wondering why it ignore the Throwable,default to delete Ha Data in any solution. Is there anyone could help me to solve this question? Best, Husky Zeng ----- Chinese,NanJing , Huawei. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
> If you want to save your checkPoint,you could refer to this document
What do you mean? We already persist our savepoints, and we do not delete them explicitly ever. The problem is that Flink deleted the data from zookeeper when it shouldn't have. Is it possible to start a job from a checkpoint using - - fromSavepoint? On Sat, Sep 5, 2020, at 2:05 AM, Husky Zeng wrote: > > Hi Cristian, > > From this code , we could see that the Exception or Error was ignored in > dispatcher.shutDownCluster(applicationStatus) . > > `` > org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster > > return applicationCompletionFuture > .handle((r, t) -> { > final ApplicationStatus applicationStatus; > if (t != null) { > > final Optional<JobCancellationException> cancellationException = > ExceptionUtils.findThrowable(t, JobCancellationException.class); > > if (cancellationException.isPresent()) { > // this means the Flink Job was cancelled > applicationStatus = ApplicationStatus.CANCELED; > } else if (t instanceof CancellationException) { > // this means that the future was cancelled > applicationStatus = ApplicationStatus.UNKNOWN; > } else { > applicationStatus = ApplicationStatus.FAILED; > } > > LOG.warn("Application {}: ", applicationStatus, t); > } else { > applicationStatus = ApplicationStatus.SUCCEEDED; > LOG.info("Application completed SUCCESSFULLY"); > } > return dispatcher.shutDownCluster(applicationStatus); > }) > .thenCompose(Function.identity()); > > `` > > > So when it come to java.util.concurrent.CompletableFuture#whenComplete , > there is no throwable, only ApplicationStatus.FAILED , and data was cleaned > up. > > > `` > clusterComponent.getShutDownFuture().whenComplete( > (ApplicationStatus applicationStatus, Throwable throwable) -> { > if (throwable != null) { > shutDownAsync( > ApplicationStatus.UNKNOWN, > ExceptionUtils.stringifyException(throwable), > false); > } else { > // This is the general shutdown path. If a separate more specific > shutdown was > // already triggered, this will do nothing > shutDownAsync( > applicationStatus, > null, > true); > } > }); > } > > `` > > If you want to save your checkPoint,you could refer to this document: > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html > > In another way,you could change the code in > org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster,when > it came to faied,save the data. > > In fact, I'm wondering why it ignore the Throwable,default to delete Ha Data > in any solution. Is there anyone could help me to solve this question? > > Best, > Husky Zeng > > > > > > ----- > Chinese,NanJing , Huawei. > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > |
I means that checkpoints are usually dropped after the job was terminated by
the user (except if explicitly configured as retained Checkpoints). You could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to save your checkpoint when te cames to failure. When your zookeeper lost connection,the High-Availability system ,which rely on zookeeper was also failure, it leads to your application stop without retry. I hava a question , if your application lost zookeeper connection,how did it delete the data in zookeeper? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
That's an excellent question. I can't explain that. All I know is this:
- the job was upgraded and resumed from a savepoint - After hours of working fine, it failed (like it shows in the logs) - the Metadata was cleaned up, again as shown in the logs - because I run this in Kubernetes, the container was restarted immediately, and because nothing was found in zookeeper it started again from the savepoint I didn't realize this was happening after a couple of hours later. At that point the job had already checkpointed several times, and it was futile to try to start it from a retained checkpoint (assuming there were any). My question is... Is this a bug or not? On Mon, Sep 7, 2020, at 1:53 AM, Husky Zeng wrote: > I means that checkpoints are usually dropped after the job was terminated by > the user (except if explicitly configured as retained Checkpoints). You > could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to save > your checkpoint when te cames to failure. > > When your zookeeper lost connection,the High-Availability system ,which rely > on zookeeper was also failure, it leads to your application stop without > retry. > > I hava a question , if your application lost zookeeper connection,how did > it delete the data in zookeeper? > > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > |
This post was updated on .
Hi Cristian,
》》》》My question is... Is this a bug or not? I don't know if it was designed to be like this deliberately. So I have already submitted an issue ,and wait for somebody to response. https://issues.apache.org/jira/browse/FLINK-19154 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Thanks a lot for reporting this problem here Cristian! I am not super familiar with the involved components, but the behavior you are describing doesn't sound right to me. Which entrypoint are you using? This is logged at the beginning, like this: "2020-09-08 14:45:32,807 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)" Do you know by chance if this problem is reproducible? With the StandaloneSessionClusterEntrypoint I was not able to reproduce the problem. On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <[hidden email]> wrote: Hi Cristian, |
I'm using the standalone script to start the cluster. As far as I can tell, it's not easy to reproduce. We found that zookeeper lost a node around the time this happened, but all of our other 75 Flink jobs which use the same setup, version and zookeeper, didn't have any issues. They didn't even restart. So unfortunately I don't know how to reproduce this. All I know is I can't sleep. I have nightmares were my precious state is deleted. I wake up crying and quickly start manually savepointing all jobs just in case, because I feel the day of reckon is near. Flinkpocalypse! On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
|
AFAIK, the HA data, including Zookeeper meta data and real data on DFS, will only be cleaned up when the Flink cluster reached terminated state. So if you are using a session cluster, the root cluster node on Zk will be cleaned up after you manually stop the session cluster. The job sub directory will be cleaned up when the job finished/canceled/failed. If you are using a job/application cluster, once the only running job finished/failed, all the HA data will be cleaned up. I think you need to check the job restart strategy you have set. For example, the following configuration will make the Flink cluster terminated after 10 attempts. restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 Best, Yang Cristian <[hidden email]> 于2020年9月9日周三 上午12:28写道:
|
> The job sub directory will be cleaned up when the job finished/canceled/failed. What does this mean? Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the time... and yet, the jobs would ALWAYS resume from the last checkpoint. The only cases where I expect Flink to clean up the checkpoint data from ZK is when I explicitly stop or cancel the job (in those cases the job manager takes a savepoint before cleaning up zk and finishing the cluster). Which is not the case here. Flink was on autopilot here and decided to wipe my poor, good checkpoint metadata as the logs show. On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
|
> The job sub directory will be cleaned up when the job finished/canceled/failed. Since we could submit multiple jobs into a Flink session, what i mean is when a job reached to the terminal state, the sub node(e.g. /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1) on the Zookeeper will be cleaned up. But the root directory(/flink/application_xxxx/) still exists. For your current case, it is a different case(perjob cluster). I think we need to figure out why the only running job reached the terminal state. For example, the restart attempts are exhausted. And you could find the following logs in your JobManager log. "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy" Best, Yang Cristian <[hidden email]> 于2020年9月9日周三 上午11:26写道:
|
Hi Cristian, thanks for reporting this issue. It looks indeed like a very critical problem. The problem seems to be that the ApplicationDispatcherBootstrap class produces an exception (that the request job can no longer be found because of a lost ZooKeeper connection) which will be interpreted as a job failure. Due to this interpretation, the cluster will be shut down with a terminal state of FAILED which will cause the HA data to be cleaned up. The exact problem occurs in the JobStatusPollingUtils.getJobResult which is called by ApplicationDispatcherBootstrap.getJobResult(). I think there are two problems here: First of all not every exception bubbling up in the future returned by ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a job failure. Some of them can also indicate a framework failure which should not lead to the clean up of HA data. The other problem is that the polling logic cannot properly handle a temporary connection loss to ZooKeeper which is a normal situation. I am pulling in Aljoscha and Klou who worked on this feature and might be able to propose a solution for these problems. I've also updated the JIRA issue FLINK-19154. Cheers, Till On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <[hidden email]> wrote:
|
Hi all,
I will have a look. Kostas On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann <[hidden email]> wrote: > > Hi Cristian, > > thanks for reporting this issue. It looks indeed like a very critical problem. > > The problem seems to be that the ApplicationDispatcherBootstrap class produces an exception (that the request job can no longer be found because of a lost ZooKeeper connection) which will be interpreted as a job failure. Due to this interpretation, the cluster will be shut down with a terminal state of FAILED which will cause the HA data to be cleaned up. The exact problem occurs in the JobStatusPollingUtils.getJobResult which is called by ApplicationDispatcherBootstrap.getJobResult(). > > I think there are two problems here: First of all not every exception bubbling up in the future returned by ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a job failure. Some of them can also indicate a framework failure which should not lead to the clean up of HA data. The other problem is that the polling logic cannot properly handle a temporary connection loss to ZooKeeper which is a normal situation. > > I am pulling in Aljoscha and Klou who worked on this feature and might be able to propose a solution for these problems. I've also updated the JIRA issue FLINK-19154. > > Cheers, > Till > > On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <[hidden email]> wrote: >> >> > The job sub directory will be cleaned up when the job finished/canceled/failed. >> Since we could submit multiple jobs into a Flink session, what i mean is when a job >> reached to the terminal state, the sub node(e.g. /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1) >> on the Zookeeper will be cleaned up. But the root directory(/flink/application_xxxx/) still exists. >> >> >> For your current case, it is a different case(perjob cluster). I think we need to figure out why the only >> running job reached the terminal state. For example, the restart attempts are exhausted. And you >> could find the following logs in your JobManager log. >> >> "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy" >> >> >> Best, >> Yang >> >> >> >> >> Cristian <[hidden email]> 于2020年9月9日周三 上午11:26写道: >>> >>> > The job sub directory will be cleaned up when the job finished/canceled/failed. >>> >>> What does this mean? >>> >>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the time... and yet, the jobs would ALWAYS resume from the last checkpoint. >>> >>> The only cases where I expect Flink to clean up the checkpoint data from ZK is when I explicitly stop or cancel the job (in those cases the job manager takes a savepoint before cleaning up zk and finishing the cluster). >>> >>> Which is not the case here. Flink was on autopilot here and decided to wipe my poor, good checkpoint metadata as the logs show. >>> >>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote: >>> >>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, will only be cleaned up >>> when the Flink cluster reached terminated state. >>> >>> So if you are using a session cluster, the root cluster node on Zk will be cleaned up after you manually >>> stop the session cluster. The job sub directory will be cleaned up when the job finished/canceled/failed. >>> >>> If you are using a job/application cluster, once the only running job finished/failed, all the HA data will >>> be cleaned up. I think you need to check the job restart strategy you have set. For example, the following >>> configuration will make the Flink cluster terminated after 10 attempts. >>> >>> restart-strategy: fixed-delay >>> restart-strategy.fixed-delay.attempts: 10 >>> >>> >>> Best, >>> Yang >>> >>> Cristian <[hidden email]> 于2020年9月9日周三 上午12:28写道: >>> >>> >>> I'm using the standalone script to start the cluster. >>> >>> As far as I can tell, it's not easy to reproduce. We found that zookeeper lost a node around the time this happened, but all of our other 75 Flink jobs which use the same setup, version and zookeeper, didn't have any issues. They didn't even restart. >>> >>> So unfortunately I don't know how to reproduce this. All I know is I can't sleep. I have nightmares were my precious state is deleted. I wake up crying and quickly start manually savepointing all jobs just in case, because I feel the day of reckon is near. Flinkpocalypse! >>> >>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote: >>> >>> Thanks a lot for reporting this problem here Cristian! >>> >>> I am not super familiar with the involved components, but the behavior you are describing doesn't sound right to me. >>> Which entrypoint are you using? This is logged at the beginning, like this: "2020-09-08 14:45:32,807 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)" >>> >>> Do you know by chance if this problem is reproducible? With the StandaloneSessionClusterEntrypoint I was not able to reproduce the problem. >>> >>> >>> >>> >>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <[hidden email]> wrote: >>> >>> Hi Cristian, >>> >>> >>> I don't know if it was designed to be like this deliberately. >>> >>> So I have already submitted an issue ,and wait for somebody to response. >>> >>> https://issues.apache.org/jira/browse/FLINK-19154 >>> >>> >>> >>> -- >>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >>> |
Great, thanks Klou! Cheers, Till On Mon, Sep 28, 2020 at 5:07 PM Kostas Kloudas <[hidden email]> wrote: Hi all, |
Free forum by Nabble | Edit this page |