Unable to recover from savepoint and checkpoint

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Unable to recover from savepoint and checkpoint

Puneet Kinra-2
Hi 

I am stuck on a simple program involving checkpointing. The Flink version I am using is 1.10.0.

Here I have created DummySource for testing

DummySource
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
KeyedProcessFunction (registers the timer and updates the status to true so that the timer is triggered only once)


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx, Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer @:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
Main App

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

/**
 * Job entry point: wires {@code BeaconSource} into {@code TimeProcessTrigger},
 * keyed by the first tuple field, with checkpointing every 30 seconds.
 */
public class App
{
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30000);
        // single call is enough; the original set the same default parallelism twice
        env.setParallelism(1);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // make sure at least 10 seconds of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // allow job recovery fallback to checkpoint when there is a more recent savepoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource = env.addSource(new BeaconSource())
                .name("AMQSource");
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // key by tuple position 0 (the Long field)
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);
        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();
        env.execute();
    }
}
--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: Unable to recover from savepoint and checkpoint

Puneet Kinra-2
Sorry for the missed information 

On recovery the value is coming back as false instead of true. state.backend has been configured in flink-conf.yaml, along with
the paths for checkpoints and savepoints.

On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <[hidden email]> wrote:
Hi 

Stuck with the simple program regarding the checkpointing Flink version I am using 1.10.0

Here I have created DummySource for testing

DummySource
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
KeyedProcessFunction (to register the timer and update the status to true so that only one-time trigger should)


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx, Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer @:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
Main App

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

/**
 * Job entry point: wires {@code BeaconSource} into {@code TimeProcessTrigger},
 * keyed by the first tuple field, with checkpointing every 30 seconds.
 */
public class App
{
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30000);
        // single call is enough; the original set the same default parallelism twice
        env.setParallelism(1);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // make sure at least 10 seconds of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // allow job recovery fallback to checkpoint when there is a more recent savepoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource = env.addSource(new BeaconSource())
                .name("AMQSource");
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // key by tuple position 0 (the Long field)
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);
        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();
        env.execute();
    }
}
--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]




--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: Unable to recover from savepoint and checkpoint

Gary Yao-5
Hi Puneet,

Can you describe how you validated that the state is not restored properly? Specifically, how did you introduce faults to the cluster?

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <[hidden email]> wrote:
Sorry for the missed information 

On recovery the value is coming as false instead of true, state.backend has been configured in flink-conf.yaml  along the
the path for checkpointing and savepoint.

On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <[hidden email]> wrote:
Hi 

Stuck with the simple program regarding the checkpointing Flink version I am using 1.10.0

Here I have created DummySource for testing

DummySource
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
KeyedProcessFunction (to register the timer and update the status to true so that only one-time trigger should)


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx, Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer @:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
Main App

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

/**
 * Job entry point: wires {@code BeaconSource} into {@code TimeProcessTrigger},
 * keyed by the first tuple field, with checkpointing every 30 seconds.
 */
public class App
{
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30000);
        // single call is enough; the original set the same default parallelism twice
        env.setParallelism(1);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // make sure at least 10 seconds of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // allow job recovery fallback to checkpoint when there is a more recent savepoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource = env.addSource(new BeaconSource())
                .name("AMQSource");
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // key by tuple position 0 (the Long field)
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);
        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();
        env.execute();
    }
}
--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]




--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: Unable to recover from savepoint and checkpoint

Puneet Kinra-2
I killed the task manager and job manager forcefully with the kill -9 command, and while recovering
I am checking the flag returned by the isRestored method in the initializeState function.
Anyway, I figured out the issue and fixed it. Thanks for the support.

On Tue, Mar 3, 2020 at 7:24 PM Gary Yao <[hidden email]> wrote:
Hi Puneet,

Can you describe how you validated that the state is not restored properly? Specifically, how did you introduce faults to the cluster?

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <[hidden email]> wrote:
Sorry for the missed information 

On recovery the value is coming as false instead of true, state.backend has been configured in flink-conf.yaml  along the
the path for checkpointing and savepoint.

On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <[hidden email]> wrote:
Hi 

Stuck with the simple program regarding the checkpointing Flink version I am using 1.10.0

Here I have created DummySource for testing

DummySource
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(30000L);
arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
}
}

}


---------------------------------------------------------------------------------------
KeyedProcessFunction (to register the timer and update the status to true so that only one-time trigger should)


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=60000L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<Boolean>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx, Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer @:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-------------------------------------------------
Main App

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

/**
 * Job entry point: wires {@code BeaconSource} into {@code TimeProcessTrigger},
 * keyed by the first tuple field, with checkpointing every 30 seconds.
 */
public class App
{
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30000);
        // single call is enough; the original set the same default parallelism twice
        env.setParallelism(1);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // make sure at least 10 seconds of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // allow job recovery fallback to checkpoint when there is a more recent savepoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource = env.addSource(new BeaconSource())
                .name("AMQSource");
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // key by tuple position 0 (the Long field)
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);
        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();
        env.execute();
    }
}
--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]




--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]




--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]