Can I only use checkpoints instead of savepoints in production?

Can I only use checkpoints instead of savepoints in production?

徐涛
Hi All,
I checked the documentation of Flink release 1.6 and found that checkpoints can also be used to resume a program. Since I ran into some problems when using savepoints, I have the following questions:
1. Can I use only checkpoints, and not savepoints, since checkpoints can also be used to resume a program? If I do so, is there any problem?
2. Checkpoints are generated automatically, but savepoints apparently can only be triggered manually, so I would have to write a crontab entry to generate them. Moreover, my Flink program runs on YARN, and only Hadoop and YARN are installed on those machines, so I cannot use the flink savepoint command there, and I am not authorized to install Flink on them.
3. Will checkpoints and savepoints be merged in later releases?
Thanks very much.

Best,
Henry

Re: Can I only use checkpoints instead of savepoints in production?

vino yang
Hi Henry,

A good answer from stackoverflow:

Apache Flink's checkpoints and savepoints are similar in that both are mechanisms for preserving the internal state of a Flink application.

Checkpoints are taken automatically and are used to restart a job automatically in case of a failure.

Savepoints, on the other hand, are taken manually, are always stored externally, and are used for starting a "new" job with previous internal state, e.g. for:

  • Bug fixing
  • Flink version upgrades
  • A/B testing, etc.

Under the hood they are in fact the same mechanism/code path, with some subtle nuances.

About your questions:

1) No problem. The main purpose of a checkpoint is to restart and recover the job automatically when it fails.
2) You can also use the REST API to trigger a savepoint.
3) I don't know, but their usage scenarios and purposes are still different. Maybe Till or Chesnay can answer this question.
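For (2), a minimal sketch of triggering a savepoint through the JobManager's REST API (available in Flink 1.5+); the host, port, job id, and target directory below are placeholders:

```shell
# Build the savepoint-trigger URL (all values are placeholders).
JM="http://jobmanager.example.com:8081"
JOB_ID="457d8f370ef8a50bb462946e1f12b80e"
URL="${JM}/jobs/${JOB_ID}/savepoints"
echo "${URL}"
# On a real cluster, POST to it; the response contains a request id that
# can be polled at /jobs/<job-id>/savepoints/<request-id>:
# curl -X POST -H 'Content-Type: application/json' \
#   -d '{"target-directory":"hdfs:///flink/savepoints","cancel-job":false}' "$URL"
```

Since only the JobManager needs to be reachable over HTTP, this works from machines that have no Flink client installed.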

Thanks, vino.


Re: Can I only use checkpoints instead of savepoints in production?

Andrey Zagrebin
Hi Henry,

In addition to Vino's answer, there are several things to keep in mind about "checkpoints vs. savepoints".

Checkpoints are designed mostly for fault tolerance of a running Flink job and for automatic recovery; that is why, by default, Flink manages their storage itself. It is correct, though, that you can configure checkpoints to be retained (externalized), keep control over their storage, and resume a failed or cancelled job from them.
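As a hedged sketch of the retained-checkpoint workflow (the checkpoint path, job id, and jar name below are placeholders, and retention must have been enabled beforehand via the job's CheckpointConfig):

```shell
# Resume a job from a retained (externalized) checkpoint directory.
CHECKPOINT="hdfs:///flink/checkpoints/457d8f370ef8a50bb462946e1f12b80e/chk-42"
RESUME_CMD="flink run -s ${CHECKPOINT} my-job.jar"
echo "${RESUME_CMD}"
# Run the command itself on a machine with the Flink client installed;
# -s accepts a savepoint path or a retained checkpoint path alike.
```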

However, the checkpoint format may be optimized for any new Flink version and may change between versions. This means that, in general, you might not be able to upgrade the Flink version, or change the structure of the running job, using only checkpoints.

Moreover, it is currently not guaranteed that you can always rescale your job from a checkpoint (change its parallelism), although at the moment it is technically possible in Flink 1.6.0, even for incremental checkpoints.

Savepoints are designed for manual user intervention during maintenance operations; that is why their storage is under the user's control in the first place. They have a more stable internal format, which allows manual migration between Flink or job versions, as well as rescaling.

Cheers,
Andrey



Re: Can I only use checkpoints instead of savepoints in production?

Andrey Zagrebin
This thread is also useful in this context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html




Re: Can I only use checkpoints instead of savepoints in production?

Averell
Hi Vino,

Regarding the statement "Checkpoints are taken automatically and are used for automatic restarting of the job in case of a failure": I do not quite understand the definition of a failure, or how to simulate one while testing my application. Possible scenarios that I can think of:
   (1) the Flink application is killed
   (2) the cluster crashes
   (3) one of the servers in the cluster crashes
   (4) an unhandled exception is raised when abnormal data is received
   ...

Could you please help explain?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Can I only use checkpoints instead of savepoints in production?

vino yang
Hi Averell,

Checkpoints are triggered automatically and periodically, according to the checkpoint interval set by the user. I believe you have no doubt about that.

There are many reasons for a job failure; the technical definition is that the job does not enter its final termination state normally. Here is a document [1] with the transition map of job statuses, where you can see how a Flink job's status is converted. For example: a subtask fails or throws an exception; a subtask throws an exception while taking a checkpoint (though this does not necessarily lead to a job failure); the connection between a TaskManager and the JobManager times out; a TaskManager goes down; the JobManager leader switches; and more.

So in these scenarios (including the ones you enumerated yourself) you can simulate the failure and recovery of a job.

More specifically, job recovery is based on the child nodes of ZooKeeper's "/jobgraph" path: if a job does not enter a termination state normally, its child node will not be cleaned up.
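As an illustration (HA mode only; the ZooKeeper root path and cluster id below are placeholders that depend on high-availability.zookeeper.path.root and your cluster configuration):

```shell
# List the submitted-job-graph nodes that drive job recovery.
ZK_PATH="/flink/default/jobgraphs"
LS_CMD="zkCli.sh ls ${ZK_PATH}"
echo "${LS_CMD}"
# Run the command against your ZooKeeper quorum, e.g.:
# zkCli.sh -server zk-host:2181 ls /flink/default/jobgraphs
```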


Thanks, vino.





Re: Can I only use checkpoints instead of savepoints in production?

Averell
Thank you Vino.

I sometimes get an error message like the one below. It looks like my executors got overloaded. Here I have another question: is there an existing mechanism that allows the job to be restored automatically?

Thanks and best regards,
Averell






Re: Can I only use checkpoints instead of savepoints in production?

vino yang
Hi Averell,

What is the error message? It seems you forgot to post it.
As far as I know, if you enable checkpointing, the job will automatically resume when it fails.
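A minimal sketch of the configuration this relies on: automatic restarts need a restart strategy in addition to checkpointing (with checkpointing enabled, Flink falls back to a fixed-delay strategy by default). In flink-conf.yaml, for example (the values are illustrative):

```yaml
# flink-conf.yaml — restart behaviour (example values)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```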

Thanks, vino.







Re: Can I only use checkpoints instead of savepoints in production?

Averell
Thank you Vino.

I put the message in a tag, and I don't know why it was not shown in the email thread. I paste the error message below in this email.

Anyway, it seems that was an issue with how checkpointing was enabled. Now I am able to turn it on properly, and my job is restored automatically.
I am testing my scenarios now. I found some issues, and I think it would be better to ask about them in a separate thread.

Thanks and regards,
Averell

=====
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 457d8f370ef8a50bb462946e1f12b80e)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:661)
......
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1535279282999_0032_01_000013 timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: Can I only use checkpoints instead of savepoints in production?

vino yang
Hi Averell,

This problem is caused by a heartbeat timeout between the JobManager and a TaskManager. You can narrow it down as follows:
1) Check the network status of the node at the time, e.g. whether its connections to other systems were equally problematic;
2) Check the TaskManager log for a more specific cause;
3) Check the load on the node during the timeout period;
4) Confirm whether something like a full GC caused the JVM process to stall at the time.

Also, I don't know whether you are using the default timeout; if so, you can increase it appropriately.
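For reference, the relevant settings live in flink-conf.yaml; the values below are the documented defaults for Flink 1.5+:

```yaml
# flink-conf.yaml — heartbeat settings (defaults shown)
heartbeat.interval: 10000   # ms between heartbeat requests
heartbeat.timeout: 50000    # ms without a heartbeat before the peer is marked lost
```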

Thanks, vino.





Re: Can I only use checkpoints instead of savepoints in production?

Averell
Hi Vino,

Could you please tell me where I can find the JobManager and TaskManager logs? I'm running on an AWS EMR cluster using YARN.

Thanks and best regards,
Averell




Re: Can I only use checkpoints instead of savepoints in production?

vino yang
Hi Averell,

I have not used AWS products, but if EMR is similar to plain YARN, or if you can access YARN's web UI: look at the YARN ApplicationMaster log to find the JobManager log; the other container logs are the TaskManager logs.
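On YARN (EMR included), a sketch of fetching the aggregated logs once the application has finished; the application id below is a placeholder, and log aggregation must be enabled on the cluster:

```shell
# Fetch aggregated JobManager/TaskManager logs for a YARN application.
APP_ID="application_1535279282999_0032"
LOG_CMD="yarn logs -applicationId ${APP_ID}"
echo "${LOG_CMD}"
# The ApplicationMaster container holds the JobManager log; the remaining
# containers hold the TaskManager logs. While the job is running, the same
# logs are reachable through the YARN ResourceManager web UI.
```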

Thanks, vino.





Re: Can I only use checkpoints instead of savepoints in production?

Averell
Thank you, Vino.
I found it, http://<clusterIP>:8088/

Regards,
Averell


