Restart from checkpoint after program failure

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Restart from checkpoint after program failure

chrisr123
This post was updated on .
Hi Folks,
I'm trying to restart my program with restored state from a checkpoint after
a program failure (restart strategies tried but exhausted), but I'm not
picking up the restored state. What am I doing wrong here?  

Summary
I'm using a very simple app on 1 node just to learn checkpointing.
App reads from a socket stream and I deliberately send in some "bad" data to
throw an Exception using netcat (nc) as source. App uses  a simple file URL
as checkpoint backend.

Checkpoint Backend
// specified in program:
env.setStateBackend((StateBackend)new
FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For restart strategy, I specify 3 attempts with 5 second delay between
attempts
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds*1000;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
delayBetweenRestarts));


All the application does is parse each line as key,Integer pair and outputs
accumulated sum to stdout. (See below)
If I start up nc -l 9999 and type values like this it works fine:
key1,5
key1,3
key1,4

However if I type in "junk" the program throws Exception trying to parse
'junk' as an Integer
key1,junk

When the application fails, nc also stops. If I start nc before all 3
restart attempts have been tried, everything is fine and the program
restarts, picking up state where it left off.

So after all the restarts have been tried and failed, I want to restart my
program manually and pick up where I left off. Since I am specifying
checkpoint backend in program , I thought it would just pick it up from
there. Then I tried passing in the backend using the -s parameter to my
program but that does not work either:

flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints




App Source:
public class ComputeSumFaultTolerant {

        public static void main(String[] args) throws Exception {

                // Execution Environment
                StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                ParameterTool parms = ParameterTool.fromArgs(args);
                env.getConfig().setGlobalJobParameters(parms);
                String host = "localhost";
                int port = 9999;
               
                System.out.println("ComputeSumFaultTolerant BEGIN");
       
                // Setup Checkpoint and Retry
                String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
                Utils.configureCheckpoint(env,checkpointBackendURL);
                Utils.configureRestartFixedDelay(env);

                // Get Our Raw Data Stream
                DataStream<Tuple2&lt;String,Long>> eventStream = env
                                .socketTextStream(host, port)
                                .map(new MessageParser())
                                .keyBy(0)
                                .sum(1);
                eventStream.print();
               
                // Execute
                env.execute("ComputeSumFaultTolerant");
        }
       
        private static class MessageParser implements
MapFunction<String,Tuple2&lt;String,Long>> {
                public Tuple2<String,Long> map(String input) throws Exception {
                        String[] tokens = input.toLowerCase().split(",");
                        String key = tokens[0];
                        Long value = Long.valueOf(tokens[1]);
                        return new Tuple2<String,Long>(key,value);
                }
        }
       
       
}

public class Utils

        public static void configureCheckpoint(StreamExecutionEnvironment env,
String checkpointBackend) throws Exception {
                // Set Up Checkpoints
                env.enableCheckpointing(5000L);
               
                // set mode to exactly-once (this is the default)
       
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
               
                // allow only one checkpoint to be in progress at the same time
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

                // make sure 500 ms of progress happen between checkpoints
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);

                // checkpoints have to complete within one minute, or are discarded
                env.getCheckpointConfig().setCheckpointTimeout(10000);

                // allow only one checkpoint to be in progress at the same time
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

                // Checkpoint Back-end
                env.setStateBackend((StateBackend)new FsStateBackend(checkpointBackend));
               
                System.out.println("CHECKPOINT IS EXTERNALIZED");
       
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
               
                System.out.println("External enabled=" +
env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
               
        }
       
        public static void configureRestart(StreamExecutionEnvironment env) throws
Exception {
               
                // Restart Strategy
                // Fixed Delay
                int restartAttempts = 3;
                int restartDelaySeconds = 5;
                long delayBetweenRestarts = restartDelaySeconds*1000;
       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
delayBetweenRestarts));
               
                // Failure Rate Restart
                int failureRate = 3;  
                Time failureInterval = Time.of(5, TimeUnit.MINUTES);
                Time delayInterval = Time.of(5, TimeUnit.SECONDS);
                //
env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate,
failureInterval, delayInterval));
               
                // No Restart
                // env.setRestartStrategy(RestartStrategies.noRestart());
        }

}









--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Restart from checkpoint after program failure

Paul Lam
Hi,

I think you need to specify the directory of a concrete checkpoint instead of the root directory for checkpoints to restore the states. The directory name should be like chk-${id}.

The job id will change if you re-submit the job, so the JobManager is not able to recognize the retained checkpoint of the previous submission, even though you are using the same checkpoint root dir.

Best,
Paul Lam

> 在 2018年10月18日,09:51,chrisr123 <[hidden email]> 写道:
>
> Hi Folks,
> I'm trying to restart my program with restored state from a checkpoint after
> a program failure (restart strategies tried but exhausted), but I'm not
> picking up the restored state. What am I doing wrong here?  
>
> *Summary*
> I'm using a very simple app on 1 node just to learn checkpointing.
> App reads from a socket stream and I deliberately send in some "bad" data to
> throw an Exception using netcat (nc) as source. App uses  a simple file URL
> as checkpoint backend.
>
> *Checkpoint Backend*
> // specified in program:
> env.setStateBackend((StateBackend)new
> FsStateBackend("file:///home/hadoop/flink/checkpoints/"));
>
> For restart strategy, I specify 3 attempts with 5 second delay between
> attempts
> // specified in program:
> int restartAttempts = 3;
> int restartDelaySeconds = 5;
> long delayBetweenRestarts = restartDelaySeconds*1000;
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
> delayBetweenRestarts));
>
> *Checkpoint Backend*
> *App Logic:*
> All the application does is parse each line as key,Integer pair and outputs
> accumulated sum to stdout. (See below)
> If I start up nc -l 9999 and type values like this it works fine:
> key1,5
> key1,3
> key1,4
>
> However if I type in "junk" the program throws Exception trying to parse
> 'junk' as an Integer
> key1,junk
>
> When the application fails, nc also stops. If I start nc before all 3
> restart attempts have been tried, everything is fine and the program
> restarts, picking up state where it left off.
>
> So after all the restarts have been tried and failed, I want to restart my
> program manually and pick up where I left off. Since I am specifying
> checkpoint backend in program , I thought it would just pick it up from
> there. Then I tried passing in the backend using the -s parameter to my
> program but that does not work either:
>
> flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints
>
>
>
>
> *App Source:*
> public class ComputeSumFaultTolerant {
>
> public static void main(String[] args) throws Exception {
>
> // Execution Environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> ParameterTool parms = ParameterTool.fromArgs(args);
> env.getConfig().setGlobalJobParameters(parms);
> String host = "localhost";
> int port = 9999;
>
> System.out.println("ComputeSumFaultTolerant BEGIN");
>
> // Setup Checkpoint and Retry
> String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
> Utils.configureCheckpoint(env,checkpointBackendURL);
> Utils.configureRestartFixedDelay(env);
>
> // Get Our Raw Data Stream
> DataStream<Tuple2<String,Long>> eventStream = env
> .socketTextStream(host, port)
> .map(new MessageParser())
> .keyBy(0)
> .sum(1);
> eventStream.print();
>
> // Execute
> env.execute("ComputeSumFaultTolerant");
> }
>
> private static class MessageParser implements
> MapFunction<String,Tuple2<String,Long>> {
> public Tuple2<String,Long> map(String input) throws Exception {
> String[] tokens = input.toLowerCase().split(",");
> String key = tokens[0];
> Long value = Long.valueOf(tokens[1]);
> return new Tuple2<String,Long>(key,value);
> }
> }
>
>
> }
>
> public class Utils
>
> public static void configureCheckpoint(StreamExecutionEnvironment env,
> String checkpointBackend) throws Exception {
> // Set Up Checkpoints
> env.enableCheckpointing(5000L);
>
> // set mode to exactly-once (this is the default)
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> // make sure 500 ms of progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
>
> // checkpoints have to complete within one minute, or are discarded
> env.getCheckpointConfig().setCheckpointTimeout(10000);
>
> // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> // Checkpoint Back-end
> env.setStateBackend((StateBackend)new FsStateBackend(checkpointBackend));
>
> System.out.println("CHECKPOINT IS EXTERNALIZED");
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> System.out.println("External enabled=" +
> env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
>
> }
>
> public static void configureRestart(StreamExecutionEnvironment env) throws
> Exception {
>
> // Restart Strategy
> // Fixed Delay
> int restartAttempts = 3;
> int restartDelaySeconds = 5;
> long delayBetweenRestarts = restartDelaySeconds*1000;
>
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts,
> delayBetweenRestarts));
>
> // Failure Rate Restart
> int failureRate = 3;  
> Time failureInterval = Time.of(5, TimeUnit.MINUTES);
> Time delayInterval = Time.of(5, TimeUnit.SECONDS);
> //
> env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate,
> failureInterval, delayInterval));
>
> // No Restart
> // env.setRestartStrategy(RestartStrategies.noRestart());
> }
>
> }
>
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/