- How large is the state that you are checkpointing?
[CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as shown below.
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
streamEnv.enableCheckpointing(10000);
In terms of the state stored, the KS1 stream carries a payload of about 100K events/second, while KS2 has about 1 event every 10 minutes. Basically, the operators perform flatMaps on 8 fields of a tuple (all fields are primitives). If you look at the state sizes in the dashboard, they are in the KB range.
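For a rough sense of scale, here is a back-of-the-envelope sketch in plain Java (no Flink dependency) of how many events each stream contributes per 10-second checkpoint interval, using the rates quoted above. The class and method names are hypothetical and only for illustration. It says nothing about the actual state size, which depends on what the operators keep.

```java
// Hypothetical helper: rough number of events arriving between two checkpoints.
// Rates and interval are the figures quoted in this mail, not measured values.
class CheckpointLoadEstimate {

    // eventsPerPeriod events arrive every periodMillis; returns the expected
    // number of events within one checkpoint interval of intervalMillis.
    static double eventsPerInterval(long eventsPerPeriod, long periodMillis, long intervalMillis) {
        return eventsPerPeriod * (double) intervalMillis / periodMillis;
    }

    public static void main(String[] args) {
        long checkpointIntervalMillis = 10_000L; // enableCheckpointing(10000)

        // KS1: about 100K events per second
        double ks1 = eventsPerInterval(100_000L, 1_000L, checkpointIntervalMillis);

        // KS2: about 1 event per 10 minutes (600,000 ms)
        double ks2 = eventsPerInterval(1L, 600_000L, checkpointIntervalMillis);

        System.out.println("KS1 events per checkpoint interval: " + ks1);
        System.out.println("KS2 events per checkpoint interval: " + ks2);
    }
}
```

So KS1 sees on the order of a million events between checkpoints, while KS2 usually sees none at all in a given interval.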
- Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
[CVP] - There is no back pressure, at least according to the back pressure sampling in the Flink dashboard. 100K events/second is a low load compared with Flink's benchmarks. I did not quite follow the distinction between barrier travel time and state snapshot time. I have attached the Task Manager log (DEBUG level) in case it is of interest.
I have attached the checkpoint times from the dashboard as a .png. If you look at checkpoint IDs 28, 29 and 30, you will see that each of these checkpoints takes more than a minute. Before these checkpoints, the KS2 stream did not have any events. As soon as one event (its size should be on the order of bytes) was generated, the checkpoints became slow, and every checkpoint thereafter took a minute more.
This log was collected from a standalone Flink cluster with 1 JobManager and 2 TaskManagers (TMs). One TM was running this application with checkpointing enabled (parallelism=1).
Please let me know if you need further info.