Hi Folks,
I'm trying to restart my program with restored state from a checkpoint after a program failure (restart strategies tried but exhausted), but I'm not picking up the restored state. What am I doing wrong here?

Summary
I'm using a very simple app on 1 node just to learn checkpointing. The app reads from a socket stream, and I deliberately send in some "bad" data, using netcat (nc) as the source, to make it throw an Exception. The app uses a simple file URL as the checkpoint backend.

Checkpoint Backend
    // specified in program:
    env.setStateBackend((StateBackend)new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For the restart strategy, I specify 3 attempts with a 5 second delay between attempts:
    // specified in program:
    int restartAttempts = 3;
    int restartDelaySeconds = 5;
    long delayBetweenRestarts = restartDelaySeconds*1000;
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

App Logic
All the application does is parse each line as a key,Integer pair and output the accumulated sum to stdout (see below). If I start up nc -l 9999 and type values like this, it works fine:
    key1,5
    key1,3
    key1,4

However, if I type in "junk", the program throws an Exception trying to parse 'junk' as an Integer:
    key1,junk

When the application fails, nc also stops. If I start nc again before all 3 restart attempts have been tried, everything is fine and the program restarts, picking up state where it left off.

So after all the restarts have been tried and failed, I want to restart my program manually and pick up where I left off. Since I am specifying the checkpoint backend in the program, I thought it would just pick it up from there.
Then I tried passing in the backend using the -s parameter to my program, but that does not work either:

    flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints

App Source:

    // ComputeSumFaultTolerant.java
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ComputeSumFaultTolerant {

        public static void main(String[] args) throws Exception {

            // Execution Environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            ParameterTool parms = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(parms);
            String host = "localhost";
            int port = 9999;

            System.out.println("ComputeSumFaultTolerant BEGIN");

            // Setup Checkpoint and Retry
            String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
            Utils.configureCheckpoint(env, checkpointBackendURL);
            Utils.configureRestartFixedDelay(env);

            // Get Our Raw Data Stream: parse each line, key by field 0, keep a running sum of field 1
            DataStream<Tuple2<String,Long>> eventStream = env
                .socketTextStream(host, port)
                .map(new MessageParser())
                .keyBy(0)
                .sum(1);
            eventStream.print();

            // Execute
            env.execute("ComputeSumFaultTolerant");
        }

        // Parses "key,value"; throws NumberFormatException if the value is not a number
        private static class MessageParser implements MapFunction<String,Tuple2<String,Long>> {
            public Tuple2<String,Long> map(String input) throws Exception {
                String[] tokens = input.toLowerCase().split(",");
                String key = tokens[0];
                Long value = Long.valueOf(tokens[1]);
                return new Tuple2<String,Long>(key, value);
            }
        }
    }

    // Utils.java
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class Utils {

        public static void configureCheckpoint(StreamExecutionEnvironment env, String checkpointBackend) throws Exception {
            // Set Up Checkpoints: take one every 5 seconds
            env.enableCheckpointing(5000L);

            // set mode to exactly-once (this is the default)
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // allow only one checkpoint to be in progress at the same time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

            // make sure 500 ms of progress happen between checkpoints
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);

            // checkpoints have to complete within 10 seconds, or are discarded
            env.getCheckpointConfig().setCheckpointTimeout(10000);

            // Checkpoint Back-end
            env.setStateBackend((StateBackend)new FsStateBackend(checkpointBackend));

            System.out.println("CHECKPOINT IS EXTERNALIZED");
            env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            System.out.println("External enabled=" + env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
        }

        public static void configureRestartFixedDelay(StreamExecutionEnvironment env) throws Exception {

            // Restart Strategy
            // Fixed Delay
            int restartAttempts = 3;
            int restartDelaySeconds = 5;
            long delayBetweenRestarts = restartDelaySeconds*1000;
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

            // Failure Rate Restart
            int failureRate = 3;
            Time failureInterval = Time.of(5, TimeUnit.MINUTES);
            Time delayInterval = Time.of(5, TimeUnit.SECONDS);
            // env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate, failureInterval, delayInterval));

            // No Restart
            // env.setRestartStrategy(RestartStrategies.noRestart());
        }
    }

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
I think you need to specify the directory of a concrete checkpoint, rather than the root directory for checkpoints, to restore the state. The directory name should look like chk-${id}. The job ID changes every time you re-submit the job, so the JobManager cannot recognize the retained checkpoint of the previous submission, even though you are using the same checkpoint root directory.

Best,
Paul Lam

> On Oct 18, 2018, at 09:51, chrisr123 <[hidden email]> wrote:
>
> Hi Folks,
> I'm trying to restart my program with restored state from a checkpoint after
> a program failure (restart strategies tried but exhausted), but I'm not
> picking up the restored state. What am I doing wrong here?
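To make the suggestion above concrete, here is a rough sketch in plain Java (no Flink dependency) that looks under a checkpoint root for the most recently written chk-N directory and prints a command that points -s at it. Several things here are assumptions rather than anything stated in the thread: the class name LatestCheckpoint is made up for illustration, the layout <root>/<job-id>/chk-<n>/_metadata is what I would expect retained checkpoints to look like on disk, and the printed command assumes the flink run form of the CLI, where -s has to come before the jar because anything after the jar is passed to the program as its own arguments. Treat it as a starting point, not a verified recipe.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
public class LatestCheckpoint {

    // Returns the chk-N directory under root whose _metadata file was written last.
    // Assumed layout: <root>/<job-id>/chk-<n>/_metadata
    static Optional<Path> latestCheckpoint(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Optional.empty();
        }
        Path best = null;
        FileTime bestTime = null;
        // Depth 2 reaches <root>/<job-id>/chk-<n> without descending into state files.
        try (Stream<Path> paths = Files.walk(root, 2)) {
            for (Path dir : (Iterable<Path>) paths::iterator) {
                Path name = dir.getFileName();
                Path metadata = dir.resolve("_metadata");
                // Skip anything that is not a chk-N directory with a metadata file.
                if (name == null
                        || !name.toString().matches("chk-\\d+")
                        || !Files.isRegularFile(metadata)) {
                    continue;
                }
                FileTime written = Files.getLastModifiedTime(metadata);
                if (bestTime == null || written.compareTo(bestTime) > 0) {
                    best = dir;
                    bestTime = written;
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) throws IOException {
        // Default root taken from the original post; pass another path as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "/home/hadoop/flink/checkpoints");
        Optional<Path> latest = latestCheckpoint(root);
        if (latest.isPresent()) {
            // <class> and <jar> are placeholders, as in the original post.
            System.out.println("flink run -s " + latest.get().toUri() + " -c <class> <jar>");
        } else {
            System.out.println("No retained checkpoint found under " + root);
        }
    }
}
```

The helper picks the newest checkpoint by the modification time of its metadata file rather than by the highest N, since checkpoint numbering starts over for every job ID and the root directory may hold checkpoints from several submissions.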