import java.io.Serializable; import java.util.HashMap; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.util.Collector; //Sample class how state need to stored class Product implements Serializable{ /** * */ private static final long serialVersionUID = 1L; private String value; public Product(String value) { super(); this.value = value; } @Override public String toString() { return "Product [value=" + value + "]"; } } //Sample Source Class class TestSource extends RichSourceFunction{ boolean flag=true; public TestSource() { super(); } @Override public void run(SourceContext ctx) throws Exception { while(flag) { Thread.sleep(6000); ctx.collect(new Product(Long.toString(System.currentTimeMillis()))); } } @Override public void cancel() { // TODO Auto-generated method stub this.flag=false; } } //Check Point Class class Checkpoint extends ProcessFunction implements CheckpointedFunction { private ListState checkpointedState; private Product state; @Override public void processElement(Product arg0, ProcessFunction.Context arg1, Collector arg2) throws Exception { // TODO Auto-generated method stub state=arg0; arg2.collect(state.toString()); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // TODO Auto-generated method stub checkpointedState.clear(); checkpointedState.add(state); System.out.println("Checkpointing value"+checkpointedState.get().toString()); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { // TODO Auto-generated method stub ListStateDescriptor stateDescriptor = new ListStateDescriptor<>("check-point-test",TypeInformation.of(new TypeHint(){})); checkpointedState= context.getOperatorStateStore().getListState(stateDescriptor); if(context.isRestored()) { System.out.println("Trying to restore"); System.out.println("Restored value"+checkpointedState.get().toString()); } } } //Test class public class TestCheckPointing { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(6000); FsStateBackend fsStateBackend = new FsStateBackend("file:///F://checkpointing", false); env.setStateBackend(fsStateBackend); // advanced options: // set mode to exactly-once (this is the default) //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); DataStreamSource addSource = env.addSource(new TestSource()); addSource.process(new Checkpoint()); env.execute(); } }