Hi,
My setup:

1. Flink 1.4
2. Running the example in an IDE
3. Exactly-once semantics enabled
4. Window aggregation
5. Checkpointing enabled at a 10-second interval
6. RocksDB as the state backend

Workflow:

Kafka Source -> map -> keyBy -> Window (60 sec) -> ApplyFunction -> aggregated record to Kafka

Issue:

I am having trouble with checkpointing. Even if the job has only read a few records from Kafka and the window has not yet been evaluated, a checkpoint is triggered and completes successfully. If I stop the job after 30 seconds (by which time the Kafka offsets have been checkpointed) and then restart it, all in-flight messages for the window are lost. Flink does not restore them from the state backend.

Attaching code: CheckpointTest1.java <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/CheckpointTest1.java>

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi sohimankotia,

Have you enabled externalized checkpoints[1]? Externalized checkpoints are retained after job cancellation.

Best,
Hequn

On Tue, Oct 16, 2018 at 11:47 PM sohimankotia <[hidden email]> wrote:
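For reference, enabling externalized checkpoints as Hequn suggests looks roughly like the sketch below in Flink's DataStream API of that era (the checkpoint interval matches the one described in the thread; the class name is made up for illustration):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds, as described in the original post
        env.enableCheckpointing(10_000);

        // Enable externalized checkpoints which are retained after job
        // cancellation, so they can be used to manually restart the job
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```

The checkpoint directory itself is configured separately via `state.checkpoints.dir` (as the follow-up message in this thread shows).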
Hi Hequn,
I tried with the following:

Configuration conf = new Configuration();
conf.setString("state.checkpoints.dir", "file:///home/sohanvir/Desktop/flink/checkpoints2");
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setParallelism(1);
env.enableCheckpointing(20 * SECOND);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("file:///home/sohanvir/Desktop/flink/checkpoints"));

The issue still persists. Any ideas?
Hi,
Do you mean that you stop your job manually and then start it again? Checkpoints are intended for recovery from failures: 1) they are not persisted across separate job runs (unless you configure them to be externalized), and 2) they are not automatically picked up when starting a job.

For your case, when you stop a job and later want to start it with the state from the previous run, you should use savepoints. For a more thorough explanation of these concepts, please have a look here[1].

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

On 17/10/2018 05:37, sohimankotia wrote:
Thanks. That solved the problem.