data loss after implementing checkpoint

data loss after implementing checkpoint

Aftab Ansari

Hi,
I am new to Flink and am facing an issue implementing checkpointing.

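Checkpoint-related code:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

long checkpointInterval = 5000; // milliseconds

StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
// specify the state backend (second argument: asynchronous snapshots)
//env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
// enable checkpointing every 5 s with exactly-once semantics
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
// leave at least 500 ms between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);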

When I run the code, I can see flink-state being written on my local machine. But when I stop the job, wait for a few minutes and restart it, it does not pick up from where it left off; it starts from the point at which I restarted the job.

Could you point out what I am doing wrong? I am testing it locally from IntelliJ IDEA. Below is what I see on localhost. Any help would be appreciated. Thanks
[image: Inline images 1]
Br,
--

Aftab Ansari                   

Analytics Developer

Rovio Entertainment Ltd.

Keilaranta 7, FIN - 02150 Espoo, Finland

Mobile: + 358 (0)46 923 3060

www.rovio.com


Re: data loss after implementing checkpoint

Kien Truong

Hi,

I think you need to create a savepoint and restore from there.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html


Checkpoints are for automatic recovery within the lifetime of a job; they're deleted when you stop the job manually.
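A toy, JDK-only Java sketch may make the difference concrete. It is not Flink code: `ToyRun` and all of its methods are hypothetical, and it assumes a job's progress can be reduced to a single number. Checkpoints belong to one run and are discarded when that run is stopped by hand, while a savepoint is triggered by the user and has to be handed to the next run explicitly.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: ToyRun is a hypothetical class, NOT part of Flink's API.
// "offset" stands in for the progress a job has made (e.g. records consumed).
final class ToyRun {
    private final Deque<Long> checkpoints = new ArrayDeque<>(); // owned by this run only
    private final long startOffset;
    private long offset;

    // A new run starts from 0 unless a savepoint value is handed in explicitly.
    ToyRun(Long restoreFromSavepoint) {
        this.startOffset = (restoreFromSavepoint == null) ? 0L : restoreFromSavepoint;
        this.offset = startOffset;
    }

    void process(long records) { offset += records; }

    // Periodic snapshot taken automatically by the system.
    void checkpoint() { checkpoints.push(offset); }

    // A failure inside the same run rolls back to the latest checkpoint.
    void failAndRecover() {
        offset = checkpoints.isEmpty() ? startOffset : checkpoints.peek();
    }

    // User-triggered snapshot; the caller keeps the returned value for later.
    long triggerSavepoint() { return offset; }

    // Stopping the run by hand discards its checkpoints.
    void stop() { checkpoints.clear(); }

    long offset() { return offset; }

    boolean hasCheckpoints() { return !checkpoints.isEmpty(); }
}
```

For example, a run that processes 100 records, checkpoints, processes 20 more and then fails is back at 100 after `failAndRecover()`. Once it is stopped, `new ToyRun(null)` starts at 0, whereas `new ToyRun(savepoint)` starts where the savepoint was taken.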

Regards,

Kien


On 7/10/17 7:20 PM, Aftab Ansari wrote:

> [...]



Re: data loss after implementing checkpoint

Nico Kruber
In reply to this post by Aftab Ansari
Hi Aftab,
looks like what you want is either an externalized checkpoint with
RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].

Ordinary checkpoints are deleted when the job is cancelled and only serve as a
fault tolerance layer in case something goes wrong, e.g. machines fail, so
that the job can be restarted automatically based on the restart policy.
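The retention rule can be written down as a small decision function. This is a hypothetical toy model, not Flink's implementation: only the constant names `DELETE_ON_CANCELLATION` and `RETAIN_ON_CANCELLATION` are borrowed from Flink's `CheckpointConfig.ExternalizedCheckpointCleanup`, while `ORDINARY`, `ToyJobEnd` and `ToyRetentionRules` are made up for illustration. It covers only the two cases discussed here: a failure followed by an automatic restart, and a manual cancel.

```java
// Toy model only: constant names partly mirror Flink's
// CheckpointConfig.ExternalizedCheckpointCleanup, but this is NOT Flink code.
enum ToyRetention { ORDINARY, DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

// The two ways a job run can end in this model.
enum ToyJobEnd { FAILED_AND_RESTARTED, CANCELLED }

final class ToyRetentionRules {
    private ToyRetentionRules() {}

    // Is the latest checkpoint still there to resume from once the job ends this way?
    static boolean checkpointAvailable(ToyRetention retention, ToyJobEnd end) {
        switch (end) {
            case FAILED_AND_RESTARTED:
                // Fault tolerance: the restarted job recovers from its latest checkpoint.
                return true;
            case CANCELLED:
                // Only externalized checkpoints with RETAIN_ON_CANCELLATION survive a cancel.
                return retention == ToyRetention.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("unknown end: " + end);
        }
    }
}
```

In a real job the retention mode is chosen by calling `enableExternalizedCheckpoints(...)` on the environment's `CheckpointConfig`.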


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:

> [...]



Re: data loss after implementing checkpoint

Nico Kruber
(back to list)

state.checkpoints.dir is a configuration parameter which you set in the Flink
configuration itself (see [1]). It is used for the checkpoint metadata only
(for both the RocksDB and Fs backends), while the checkpoint data itself is
stored in the directory given to the state backend.
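The check behind the error can be sketched as follows. This is a hypothetical re-creation with a plain `Map` standing in for the Flink configuration, not Flink's actual code; `ToyCheckpointDirs` and `resolve` are made-up names. It shows why a state backend directory alone is not enough once externalized checkpoints are enabled: the metadata location comes from `state.checkpoints.dir`, the data location from the backend.

```java
import java.util.Map;

// Toy model only: a hypothetical re-creation of the check behind the
// "no checkpoint directory has been configured" error. NOT Flink's code.
final class ToyCheckpointDirs {
    static final String CHECKPOINTS_DIR_KEY = "state.checkpoints.dir";

    final String metadataDir; // where externalized checkpoint metadata would go
    final String dataDir;     // where the state backend writes checkpoint data

    private ToyCheckpointDirs(String metadataDir, String dataDir) {
        this.metadataDir = metadataDir;
        this.dataDir = dataDir;
    }

    static ToyCheckpointDirs resolve(Map<String, String> clusterConfig,
                                     String stateBackendDir,
                                     boolean externalized) {
        String metadataDir = clusterConfig.get(CHECKPOINTS_DIR_KEY);
        if (externalized && metadataDir == null) {
            // The state backend's directory does not count as the metadata directory.
            throw new IllegalStateException(
                "externalized checkpoints need '" + CHECKPOINTS_DIR_KEY + "' to be set");
        }
        return new ToyCheckpointDirs(metadataDir, stateBackendDir);
    }
}
```

With an empty configuration, `resolve(config, "file:///Users/aftabansari/flink-state/", true)` throws. Once `state.checkpoints.dir` is present (in a real deployment, in the Flink configuration file `flink-conf.yaml`), the metadata and data directories resolve separately.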


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

On Tuesday, 11 July 2017 10:32:25 CEST Aftab Ansari wrote:

> Thanks Nico,
> I am trying to go for externalized checkpoints, but the code below throws
> this error: "Caused by: java.lang.IllegalStateException: CheckpointConfig says
> to persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir'."
>
>  StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
>  //specify backend
>  //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));
>  env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
>  //enable checkpoint
>  env.getCheckpointConfig().enableExternalizedCheckpoints(
>      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> After setting the state backend, do I need to configure another directory
> for checkpoints? How can I set this configuration in the main method, as I
> did for the state backend?
>
> BR,
>
> On 10 July 2017 at 17:06, Nico Kruber wrote:
> > [...]



Re: data loss after implementing checkpoint

Sridhar Chellappa
In reply to this post by Nico Kruber
A follow-up question on this. I have a complex event processor implemented using the CEP library (1.3.0). The CEP library runs a variety of rules that are configured (enabled/disabled) via REST APIs.

Now, if my application crashes and recovers (or is cancelled and restarted), will my configuration (as to which rules are enabled) still hold, or do I have to persist that information in a backend?


On Mon, Jul 10, 2017 at 7:36 PM, Nico Kruber wrote:
> [...]



Re: data loss after implementing checkpoint

Sridhar Chellappa
> The CEP library runs ...
Correction: the CEP implemented using the CEP library runs ...

On Wed, Jul 19, 2017 at 10:08 AM, Sridhar Chellappa wrote:
> [...]




Re: data loss after implementing checkpoint

Nico Kruber
In reply to this post by Sridhar Chellappa
Hi Sridhar,
sorry for not getting back to you earlier; to be honest, I'm no expert in this
field either.

I don't see this enabling/disabling of rules in the CEP library overview at
[1]. How do you do this?

You'll probably have to create a stateful operator [2] to store this state in
Flink. Maybe Kostas (cc'd) can shed some more light on this topic or knows of
another workaround.
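Here is a minimal, JDK-only sketch of what such a stateful piece could look like, assuming the enabled rules can be identified by string ids. `RuleSwitchboard` and the rule ids are hypothetical. The snapshot/restore pair is only loosely modelled on the shape of Flink's `ListCheckpointed` interface and would still need to be wired into a real operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: RuleSwitchboard is hypothetical. It keeps "which rules are
// enabled" as state that is copied into every checkpoint and read back on
// recovery, instead of living only in memory.
final class RuleSwitchboard {
    private final Set<String> enabled = new TreeSet<>(); // in-memory working copy

    // Applied when a REST call enables or disables a rule.
    void enable(String ruleId) { enabled.add(ruleId); }

    void disable(String ruleId) { enabled.remove(ruleId); }

    boolean isEnabled(String ruleId) { return enabled.contains(ruleId); }

    // Called when a checkpoint is taken: return a copy of the state to persist.
    List<String> snapshotState() { return new ArrayList<>(enabled); }

    // Called on recovery with the list from the last completed checkpoint.
    void restoreState(List<String> snapshot) {
        enabled.clear();
        enabled.addAll(snapshot);
    }
}
```

The snapshot is a copy, so REST updates that arrive after a checkpoint has been taken cannot modify the state that was already persisted.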


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Wednesday, 19 July 2017 06:38:17 CEST Sridhar Chellappa wrote:

> [...]



Re: data loss after implementing checkpoint

Stephan Ewen
Maybe to clear up some confusion here:

  - Flink recovers from the latest checkpoint after a failure.

  - If you stop/cancel a Flink job and submit the job again, it does not automatically pick up the latest checkpoint. Flink does not know that the second program is a continuation of the first program.

  - If you want the second program to resume from where the first one stopped, you need to start it with the option to resume from a checkpoint/savepoint and pass the path to that checkpoint/savepoint.
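The second and third points can be illustrated with a toy model in which every submission receives a fresh job id. `ToyCluster`, `Job`, `retainCheckpoint` and the `/toy-checkpoints/<id>` path are all hypothetical and are not Flink's API or directory layout. In this model, retained state is only found again when its path is passed to the new submission.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model only: ToyCluster is hypothetical, NOT Flink's API. Every submission
// is a new job with a new id; nothing links it to an earlier job automatically.
final class ToyCluster {
    // path -> saved offset; stands in for retained checkpoint/savepoint directories
    private final Map<String, Long> retained = new HashMap<>();

    final class Job {
        final String id = UUID.randomUUID().toString(); // fresh id on every submit
        long offset;

        Job(long startOffset) { this.offset = startOffset; }

        // Persist the current offset and return the path a later submit could use.
        String retainCheckpoint() {
            String path = "/toy-checkpoints/" + id;
            retained.put(path, offset);
            return path;
        }
    }

    // restorePath == null means "start from scratch", like a plain resubmit.
    Job submit(String restorePath) {
        if (restorePath == null) {
            return new Job(0L);
        }
        Long saved = retained.get(restorePath);
        if (saved == null) {
            throw new IllegalArgumentException("nothing retained at " + restorePath);
        }
        return new Job(saved);
    }
}
```

A plain `submit(null)` after the first job ends starts from 0 even though retained state exists, because nothing ties the new job to the old one; `submit(path)` resumes from the saved offset.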

Stephan


On Mon, Jul 31, 2017 at 5:27 PM, Nico Kruber wrote:
> [...]



Re: data loss after implementing checkpoint

Kostas Kloudas
Hi Sridhar,

Stephan already covered the correct sequence of actions for your second program
to know its correct starting point.

As far as the active/inactive rules are concerned, as Nico pointed out, you have to
somehow store which rules are active and which are not in Flink's state, so that this
is included in each checkpoint. Otherwise, upon recovery your program will not be able
to know which rules to apply and which to ignore.
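To illustrate the consequence described above, here is a hypothetical rule evaluator (not the Flink CEP API) in which events are plain integers and each rule is an `IntPredicate`. The rule names and thresholds are made up. Only rules in the restored active set are applied; if that set were lost on recovery (modelled here as an empty set), the program would no longer know which rules to apply.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntPredicate;

// Toy model only: hypothetical rule engine, NOT the Flink CEP API. "active" is
// the set of rule ids that must be restored from checkpointed state.
final class ToyRuleEngine {
    private final Map<String, IntPredicate> rules = new LinkedHashMap<>();

    ToyRuleEngine() {
        rules.put("large-amount", amount -> amount > 1_000);
        rules.put("negative-amount", amount -> amount < 0);
    }

    // Returns the ids of active rules that fire for this event, in declaration order.
    List<String> evaluate(int event, Set<String> active) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, IntPredicate> rule : rules.entrySet()) {
            if (active.contains(rule.getKey()) && rule.getValue().test(event)) {
                fired.add(rule.getKey());
            }
        }
        return fired;
    }
}
```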

Hope this helps,
Kostas

On Jul 31, 2017, at 10:27 PM, Stephan Ewen wrote:
> [...]