Externalised checkpoint keeps ValueState after a crash of a Flink cluster


min.tan

Hi,

I have a question about keeping a ValueState after a Flink 1.7.2 cluster crashes.

My Flink job is simple:

1) read dummy events (each event only has a string Id) from a Kafka source;
2) count the input events and save the count as a ValueState;
3) set up an externalized checkpoint that runs every second;
4) fire a number of events (e.g. 2K with four threads in parallel) and manually kill the task manager or the job manager (kill processId) before all the input has been processed;
5) restart from the checkpoint stored in a local file directory.

 

When the task manager is killed and a replacement task manager starts, the checkpoint works well: the second run picks up the events left over from the first and produces the correct total count of events (e.g. 8K).

When the job manager is killed and a replacement job manager starts, the checkpoint does not work well. The second run still picks up the events left over from the first, BUT it does not produce the correct total count (e.g. it reports 8K minus the count done before the crash).
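To make the arithmetic of the two outcomes concrete, here is a toy model in plain Java. It is not Flink code and not the poster's job; the names `RecoveryModel`, `Snapshot` and `runToEnd` are hypothetical. It assumes the job's only state is the source read position and the counter, and that a checkpoint captures both at the same moment. Restoring both yields the full total, while restoring the read position with an empty counter reproduces the "total minus the count before the crash" pattern reported above. It does not claim to explain why the counter was empty in the poster's run.

```java
// Toy model of checkpoint-based recovery for a counting job; NOT the Flink API.
// Assumption: the only state is the source read position (offset) and the counter,
// and a checkpoint captures both at the same moment.
public class RecoveryModel {

    // Hypothetical snapshot type: offset and count taken by the same checkpoint.
    record Snapshot(long offset, long count) {}

    // Reads events from startOffset up to total, incrementing the counter once per event.
    static long runToEnd(long startOffset, long startCount, long total) {
        long count = startCount;
        for (long offset = startOffset; offset < total; offset++) {
            count++; // stands in for the per-event ValueState update
        }
        return count;
    }

    public static void main(String[] args) {
        long total = 8000; // e.g. 2K events from each of four threads
        // Hypothetical last completed checkpoint: 3000 events read and counted.
        Snapshot last = new Snapshot(3000, 3000);

        // Offset and counter restored together: the final count is the full total.
        long bothRestored = runToEnd(last.offset(), last.count(), total);

        // Offset restored but counter starting from empty state:
        // the count held by the checkpoint is missing from the result.
        long counterLost = runToEnd(last.offset(), 0, total);

        System.out.println("both restored: " + bothRestored); // 8000
        System.out.println("counter lost:  " + counterLost);  // 5000
    }
}
```

The model only tracks two numbers on purpose: in a consistent recovery they must come from the same snapshot, so any mismatch between them shows up directly in the final count.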

 

The command I use to start the recovery job is:

bin/flink run -s file:///Users/min/Applications/flink1.7.2/log/c449fad6fbaff3daad6bc526b8a74d18

 

Any idea what I could have done wrong?

I expected an externalized checkpoint to preserve any ValueState after a crash of the Flink cluster.

Thank you very much in advance for your help.

Regards,

Min



Check out our new brand campaign: www.ubs.com/together
E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails.
The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/global/en/legalinfo2/privacy.html

Re: Externalised checkpoint keeps ValueState after a crash of a Flink cluster

Congxian Qiu
Hi Min,

A completed checkpoint contains a snapshot of all state, and when a job is recovered from a checkpoint, all of that state is restored from it. From what you describe, I guess that when the job manager was killed there was an ongoing checkpoint that had not yet completed. Maybe the doc[1] can be helpful.
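The point about incomplete checkpoints can be pictured with another toy model in plain Java. The names `LatestCompleted`, `Checkpoint` and `latestCompleted` are hypothetical and are not Flink's internal classes; the sketch only assumes, as described above, that recovery considers completed checkpoints and ignores one that was still in progress when the process was killed.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Toy model of choosing a checkpoint to recover from; names are hypothetical,
// not Flink's internal classes. Only completed checkpoints are candidates.
public class LatestCompleted {

    // A checkpoint attempt: its id and whether it finished before the crash.
    record Checkpoint(long id, boolean completed) {}

    // Returns the completed checkpoint with the highest id, or empty if none completed.
    static Optional<Checkpoint> latestCompleted(List<Checkpoint> attempts) {
        return attempts.stream()
                .filter(Checkpoint::completed)
                .max(Comparator.comparingLong(Checkpoint::id));
    }

    public static void main(String[] args) {
        // Checkpoint 7 was still in progress when the process was killed.
        List<Checkpoint> attempts = List.of(
                new Checkpoint(5, true),
                new Checkpoint(6, true),
                new Checkpoint(7, false));

        latestCompleted(attempts).ifPresentOrElse(
                cp -> System.out.println("recover from checkpoint " + cp.id()), // checkpoint 6
                () -> System.out.println("no completed checkpoint to recover from"));
    }
}
```

In this model the in-progress checkpoint 7 has no influence on what is restored; the job resumes from checkpoint 6.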


Best, Congxian
On Mar 5, 2019, 04:16 +0800, [hidden email], wrote:
