Window State is not being store on check-pointing

Window State is not being store on check-pointing

sohimankotia
Hi,

I am using the following setup:

1. Flink 1.4
2. Running the example in an IDE
3. Exactly-once semantics enabled
4. Window aggregation
5. Checkpointing enabled at a 10-second interval
6. RocksDB as the state backend


Workflow:

Kafka Source -> map -> keyBy -> Window(60 Sec) -> ApplyFunction ->
Aggregated Record to Kafka

Issue:

I am having trouble with checkpointing. If the job has read a few records
from Kafka and the window has not been evaluated yet, a checkpoint is still
triggered and completes successfully.
If I stop the job after 30 seconds (by which time the Kafka checkpoint has
completed) and restart it, all in-flight messages for the window are lost.
Flink does not restore them from the state backend.

The code is attached.


CheckpointTest1.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/CheckpointTest1.java>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Window State is not being store on check-pointing

Hequn Cheng
Hi sohimankotia,

Have you enabled externalized checkpoints [1]?
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
Best, Hequn


In reply to sohimankotia's message of Tue, Oct 16, 2018 at 11:47 PM.

Re: Window State is not being store on check-pointing

sohimankotia
Hi Hequn,

I tried the following:

    Configuration conf = new Configuration();
    conf.setString("state.checkpoints.dir", "file:///home/sohanvir/Desktop/flink/checkpoints2");
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setParallelism(1);
    env.enableCheckpointing(20 * SECOND);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));

The issue still persists.

Any ideas?





Re: Window State is not being store on check-pointing

Dawid Wysakowicz-2
Hi,

Do you mean that you stop your job manually and then start it again?
Checkpoints are meant for recovering from failures. They are 1) not
automatically persisted across separate job runs (unless you configure them
to be externalized) and 2) not automatically picked up when you start your
job. In your case, where you stop a job and then want to start it with the
state from the previous run, you should use savepoints.

For a more thorough explanation of these concepts, please have a look here [1].
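
The behaviour described above can be illustrated with a toy model in plain Java. This is only a sketch of the idea and does not use the Flink API: the class ToyWindowJob and all of its methods are hypothetical names made up for illustration. It shows that a snapshot of the in-flight window contents can exist, yet a plain restart still begins empty unless the new run is explicitly pointed at that snapshot.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: plain Java, not Flink API. All names here are hypothetical. */
class ToyWindowJob {
    // Records buffered for a window that has not fired yet.
    private final List<String> windowBuffer;

    /** Fresh start: no previous state is picked up. */
    ToyWindowJob() {
        this.windowBuffer = new ArrayList<>();
    }

    /** Explicit restore: the caller hands in a snapshot, similar in spirit to resuming from a savepoint. */
    ToyWindowJob(List<String> snapshot) {
        this.windowBuffer = new ArrayList<>(snapshot);
    }

    void process(String record) {
        windowBuffer.add(record);
    }

    /** Copies the in-flight window contents, standing in for a checkpoint or savepoint. */
    List<String> snapshot() {
        return new ArrayList<>(windowBuffer);
    }

    int bufferedCount() {
        return windowBuffer.size();
    }

    public static void main(String[] args) {
        ToyWindowJob firstRun = new ToyWindowJob();
        firstRun.process("a");
        firstRun.process("b");
        List<String> snapshot = firstRun.snapshot();

        // Manual stop and plain restart: the snapshot exists but is not used.
        ToyWindowJob plainRestart = new ToyWindowJob();
        System.out.println("plain restart buffered: " + plainRestart.bufferedCount());

        // Restart that explicitly points at the snapshot.
        ToyWindowJob restored = new ToyWindowJob(snapshot);
        System.out.println("restored buffered: " + restored.bufferedCount());
    }
}
```

In the toy model, the second constructor plays the role of the explicit restore step: without it, the buffered records from the first run are simply not there, which matches the symptom reported in this thread.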

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

In reply to sohimankotia's message of 17/10/2018 05:37.



Re: Window State is not being store on check-pointing

sohimankotia