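Hi,

I am new to Flink and I am having trouble implementing checkpointing. Here is the checkpoint-related code:

long checkpointInterval = 5000;

StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
// specify the state backend
//env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
// enable checkpointing
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

When I run the code, I can see flink-state being written on my local machine. But when I stop the job, wait a few minutes and restart it, it does not pick up from where it left off; it starts over from the moment I restarted the job.

Could you point out what I am doing wrong? I am testing it locally from IntelliJ IDEA. Below is what I see on localhost:
[image: Inline images 1]
Any help would be appreciated. Thanks

Br,
--
Aftab Ansari
Analytics Developer
Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
Mobile: + 358 (0)46 923 3060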
Hi, I think you need to create a savepoint and restore from there. https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
Checkpoints are meant for automatic recovery during the lifetime of a job; they are deleted when you stop the job manually.

Regards,
Kien

On 7/10/17 7:20 PM, Aftab Ansari wrote:
In reply to this post by Aftab Ansari
Hi Aftab,
looks like what you want is either an externalized checkpoint with RETAIN_ON_CANCELLATION mode [1] or a savepoint [2]. Ordinary checkpoints are deleted when the job is cancelled and only serve as a fault-tolerance layer in case something goes wrong, e.g. machines fail, so that the job can be restarted automatically according to the restart strategy.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:
> Hi,
> I am new to flink. I am facing issue implementing checkpoint.
>
> checkpoint related code:
>
> long checkpointInterval = 5000;
>
> StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> //specify backend
> //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
> env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> //enable checkpoint
> env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>
> When I run the code, I can see flink-state being written in my local
> machine. but when I stop the job , wait for a few minutes and restart the
> job, it does not pick up from the time it left but it starts from when I
> started the job.
>
> Could you point out what i am doing wrong. I am testing it locally from
> ideaIntellij. below is what i see from localhost. Any help would be
> appreciated. Thanks
> [image: Inline images 1]
> Br,
state.checkpoints.dir is a configuration parameter that you set in the Flink configuration itself, i.e. flink-conf.yaml (see [1]). It is used only for the metadata of externalized checkpoints (for both the RocksDB and the Fs state backend), while the checkpoint data itself is stored in the directory given to the state backend.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

On Tuesday, 11 July 2017 10:32:25 CEST Aftab Ansari wrote:
> Thanks nico,
> I am trying to go for externalized checkpoint. But the below codes throws
> error: "Caused by: java.lang.IllegalStateException: CheckpointConfig says
> to persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir"
>
> StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> //specify backend
> //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
> env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> //enable checkpoint
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> after stateBackend, do I need to configure another dir for
> checkpoints? How can I set this configuration in main method like I
> did for stateBackend ?
>
> BR,
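Nico's answer points to the cluster configuration. A minimal flink-conf.yaml entry might look like the fragment below; the directory path is a hypothetical example, and it only needs to be a location the JobManager can write to (a file:// path is enough on a single machine, a shared file system such as hdfs:// for a real cluster).

```yaml
# flink-conf.yaml (sketch): target directory for externalized checkpoint metadata.
# The path is a hypothetical example. The checkpoint data itself still goes to the
# directory configured on the state backend (the FsStateBackend in this thread).
state.checkpoints.dir: file:///Users/aftabansari/flink-checkpoints-metadata
```

When the job is started from the IDE instead of being submitted to a cluster, flink-conf.yaml is probably not picked up. One option worth trying (an assumption, not verified against 1.3) is to put the same key into an org.apache.flink.configuration.Configuration and pass it to StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration).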
In reply to this post by Nico Kruber
A follow-up question on this: I have a Complex Event Processor implemented using the CEP library (1.3.0). The CEP library runs a variety of rules that are configured (enabled/disabled) via REST APIs. Now, if my application crashes and recovers (or is cancelled and restarted), will my configuration (i.e. which rules are enabled) still hold, or do I have to persist this information in a backend?

On Mon, Jul 10, 2017 at 7:36 PM, Nico Kruber <[hidden email]> wrote:
> Hi Aftab,
Regarding "The CEP library runs ...": correction, the Complex Event Processor implemented using the CEP library runs ...

On Wed, Jul 19, 2017 at 10:08 AM, Sridhar Chellappa <[hidden email]> wrote:
In reply to this post by Sridhar Chellappa
Hi Sridhar,
sorry for not getting back to you earlier; to be honest, I'm no expert in this field either. I don't see this enabling/disabling of rules in the CEP library overview at [1]. How do you do this? You'll probably have to create a stateful operator [2] to store this state in Flink. Maybe Kostas (cc'd) can shed some more light on this topic or knows another workaround.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Wednesday, 19 July 2017 06:38:17 CEST Sridhar Chellappa wrote:
> A follow up question on this. I have a Complex Event processor implemented
> using the CEP library (1.3.0). The CEP library runs a variety of rules that
> are configured (enable/disable rule) VIA REST APIs.
>
> Now, if my application crashes and recovers (or is cancelled and
> restarted), will my configuration(as to which rules are enabled) still
> hold? or do I have to persist the info into a backend?
Maybe to clear up some confusion here:
- Flink recovers from the latest checkpoint after a failure.
- If you stop/cancel a Flink job and submit the job again, it does not automatically pick up the latest checkpoint. Flink does not know that the second program is a continuation of the first program.
- If you want the second program to resume where the first one left off, you need to start it with the option to resume from a checkpoint/savepoint and pass the path to that checkpoint/savepoint:

Stephan

On Mon, Jul 31, 2017 at 5:27 PM, Nico Kruber <[hidden email]> wrote:
> Hi Sridhar,
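Stephan's last point, restarting a job from an explicit checkpoint or savepoint path, can be sketched with the Flink 1.3 command-line client as follows. The job id, directories and jar path are placeholders, not values from this thread; the commands follow the 1.3 savepoint and checkpoint documentation linked earlier in the thread.

```shell
# Trigger a savepoint for a running job (the job id is shown by `bin/flink list`
# or in the web UI). The target directory is optional if state.savepoints.dir
# is set in flink-conf.yaml.
bin/flink savepoint <jobId> [targetDirectory]

# Alternatively, take a savepoint and cancel the job in one step.
bin/flink cancel -s [targetDirectory] <jobId>

# Resubmit the job and resume from that savepoint. The same -s flag also accepts
# the metadata path of an externalized checkpoint retained with
# RETAIN_ON_CANCELLATION.
bin/flink run -s <savepointOrCheckpointPath> <path/to/job.jar> [jobArgs]
```

This applies to jobs submitted to a Flink cluster; a job started directly from the IDE's main method does not go through bin/flink.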
Hi Sridhar,
Stephan already covered the sequence of actions your second program needs in order to know its correct starting point. As for the active/inactive rules: as Nico pointed out, you have to store which rules are active and which are not in the state backend, so that this information is part of every checkpoint. Otherwise, upon recovery your program will not know which rules to apply and which to ignore.

Hope this helps,
Kostas
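To make Nico's and Kostas's suggestion concrete, here is a minimal, JDK-only sketch of keeping the enabled/disabled rule set as checkpointable state. CheckpointedRules is a hypothetical stand-in that mirrors the two methods of Flink 1.3's ListCheckpointed interface (snapshotState and restoreState); in a real job the class would instead implement org.apache.flink.streaming.api.checkpoint.ListCheckpointed inside the operator that receives the REST updates. The rule ids are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in mirroring the two methods of Flink 1.3's
// ListCheckpointed<T>; it is NOT the Flink interface itself.
interface CheckpointedRules {
    List<String> snapshotState(long checkpointId, long timestamp);

    void restoreState(List<String> state);
}

// Holds which (hypothetical) CEP rule ids are currently enabled.
public class RuleSwitchboard implements CheckpointedRules {
    private final Set<String> enabledRules = new HashSet<>();

    // Called when a REST update enables or disables a rule.
    public void enable(String ruleId) { enabledRules.add(ruleId); }

    public void disable(String ruleId) { enabledRules.remove(ruleId); }

    public boolean isEnabled(String ruleId) { return enabledRules.contains(ruleId); }

    // Invoked on every checkpoint: return a copy of the current rule set,
    // sorted only to make the snapshot deterministic.
    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        List<String> snapshot = new ArrayList<>(enabledRules);
        Collections.sort(snapshot);
        return snapshot;
    }

    // Invoked on recovery: rebuild the rule set from the last snapshot.
    @Override
    public void restoreState(List<String> state) {
        enabledRules.clear();
        enabledRules.addAll(state);
    }

    public static void main(String[] args) {
        RuleSwitchboard running = new RuleSwitchboard();
        running.enable("high-value-purchase");
        running.enable("login-burst");
        running.disable("login-burst");
        List<String> checkpoint = running.snapshotState(1L, System.currentTimeMillis());

        // Simulate a restart: a fresh instance restored from the snapshot.
        RuleSwitchboard recovered = new RuleSwitchboard();
        recovered.restoreState(checkpoint);
        System.out.println(recovered.isEnabled("high-value-purchase")); // true
        System.out.println(recovered.isEnabled("login-burst"));         // false
    }
}
```

Whatever snapshotState returns is what ends up in the checkpoint or savepoint, so a recovered job, or one resubmitted from a savepoint, sees the same rule set. One caveat worth checking in the Flink state documentation Nico linked: list-style operator state like this is redistributed across parallel instances when the job is rescaled, so a rule set that every parallel instance needs in full may call for a different redistribution scheme.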