Hi
I am stuck with a simple program regarding checkpointing. The Flink version I am using is 1.10.0. Here I have created a DummySource for testing:

DummySource

package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Test source that emits the fixed record (100000L, "AMQSource") every 30 seconds until it is
 * cancelled.
 */
public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {

    private static final long serialVersionUID = 1L;

    /** Pause before each emitted record, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 30000L;

    /**
     * Loop flag for {@link #run}. It is cleared by {@link #cancel()}, which Flink invokes from a
     * different thread than {@code run}; {@code volatile} guarantees the emitting loop sees it.
     */
    private volatile boolean isRunning = true;

    public BeaconSource() {
        super();
    }

    /** Requests the emit loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * Emits one record per EMIT_INTERVAL_MS while the source has not been cancelled.
     *
     * @param ctx the source context records are emitted to
     * @throws Exception if the sleep is interrupted or emission fails
     */
    @Override
    public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(EMIT_INTERVAL_MS);
            if (!isRunning) {
                // cancel() was called while sleeping: do not emit after cancellation.
                break;
            }
            // Emit under the checkpoint lock so that emission and state snapshots cannot
            // interleave, as required by the SourceFunction contract.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new Tuple2<Long, String>(100000L, "AMQSource"));
            }
        }
    }
}

---------------------------------------------------------------------------------------

KeyedProcessFunction (registers the timer and updates the status to true so that it triggers only once per key)

package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import scala.collection.mutable.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
 * Registers one processing-time timer per key, one minute after the first element seen for that
 * key. A per-key {@code ValueState<Boolean>} flag records that the timer was registered. The flag
 * is never cleared, so later elements for the same key do not register further timers.
 */
public class TimeProcessTrigger extends KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {

    private static final long serialVersionUID = 1L;

    /** Delay between a key's first element and its timer, in milliseconds. */
    private static final long ONE_MINUTE = 60000L;

    /** Per-key flag: {@code null} until a timer has been registered, then {@code true}. */
    private transient ValueState<Boolean> contacthistory;

    /** Obtains the keyed "contact-history" flag state from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<Boolean>(
                        "contact-history", // the state name
                        Boolean.class); // type information
        this.contacthistory = getRuntimeContext().getState(descriptor);
    }

    /**
     * On the first element of a key, sets the flag and registers a timer ONE_MINUTE in the future
     * (processing time); on later elements only logs that the timer already exists.
     */
    @Override
    public void processElement(
            Tuple2<Long, String> input, Context ctx, Collector<String> collect) throws Exception {
        Boolean value = this.contacthistory.value();
        System.out.println(value);
        if (value == null) {
            long regTimer = ctx.timerService().currentProcessingTime() + ONE_MINUTE;
            System.out.println("Updating the flag and registering the timer @:" + regTimer);
            this.contacthistory.update(true);
            ctx.timerService().registerProcessingTimeTimer(regTimer);
        } else {
            System.out.println("Timer has already register for this key");
        }
    }

    /** Logs the key whose timer fired; the flag state is deliberately left set. */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        super.onTimer(timestamp, ctx, out);
        System.out.println("Timer has fired for the key" + ctx.getCurrentKey());
    }
}
-------------------------------------------------

Main App

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
// NOTE(review): this package differs from the declared package com.nudge.stateful, which already
// contains BeaconSource and TimeProcessTrigger; confirm it exists, otherwise remove this import.
import com.indiabulls.nudge.stateful.*;

/** Builds and runs the checkpointed BeaconSource -> TimeProcessTrigger -> print pipeline. */
public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(1);

        // Trigger a checkpoint every 30 s.
        env.enableCheckpointing(30000);
        // Exactly-once mode (this is the default; set explicitly for clarity).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 10 s of progress between the end of one checkpoint and the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // Discard checkpoints that do not complete within one minute.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint to be in progress at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain externalized checkpoints after job cancellation.
        // NOTE(review): when the JobManager itself is killed (no HA setup), the job is resumed
        // from a retained checkpoint only if it is resubmitted with `flink run -s <checkpoint-path>`.
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Allow job recovery to fall back to a checkpoint even when a more recent savepoint exists.
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        SingleOutputStreamOperator<Tuple2<Long, String>> amqSource =
                env.addSource(new BeaconSource()).name("AMQSource");

        // Key by tuple field 0 (the Long value).
        KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = amqSource.keyBy(0);

        SingleOutputStreamOperator<String> processedStream =
                keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
        processedStream.print();

        env.execute();
    }
}
|
Sorry for the missing information. On recovery, the value comes back as false instead of true. state.backend has been configured in flink-conf.yaml, along with the paths for checkpoints and savepoints. On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <[hidden email]> wrote:
|
Hi Puneet, Can you describe how you validated that the state is not restored properly? Specifically, how did you introduce faults to the cluster? Best, Gary On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <[hidden email]> wrote:
|
I killed the TaskManager and JobManager forcefully with the kill -9 command, and during recovery I checked the flag returned by the isRestored() method in the initializeState() function. Anyway, I figured out the issue and fixed it. Thanks for the support. On Tue, Mar 3, 2020 at 7:24 PM Gary Yao <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |