Savepoint failed with error "Checkpoint expired before completing"


Gagan Agrawal
Hi,
We have a Flink job (Flink version 1.6.1) which unions 2 streams and passes them through a custom KeyedProcessFunction backed by a RocksDB state store, finally producing another stream into Kafka. The current checkpoint size is around ~100 GB; checkpoints are saved to S3 at a 5-minute interval with incremental checkpointing enabled, and they mostly finish in less than 1 minute. We are running this job on YARN with the following parameters:

-yn 10  (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.
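
A minimal sketch of this checkpointing setup, assuming the standard Flink 1.6 DataStream API (the S3 path is an illustrative placeholder and the actual stream wiring is only outlined in comments):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB state backend with incremental checkpoints, written to S3 (placeholder path)
            env.setStateBackend(new RocksDBStateBackend("s3://<bucket>/flink/checkpoints", true));

            // trigger a checkpoint every 5 minutes
            env.enableCheckpointing(5 * 60 * 1000);

            // ... union the two input streams, keyBy(...), apply the custom KeyedProcessFunction,
            // add the Kafka sink, then call env.execute("my-job");
        }
    }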

While trying to take a savepoint of this job, it runs for ~10 minutes and then throws the following error. It looks like the default checkpoint timeout of 10 minutes is causing this. What is the recommended way to take a savepoint for such a job? Should we increase the default checkpoint timeout of 10 minutes? Also, our state size is currently 100 GB but is expected to grow up to 1 TB. Is Flink a good fit for use cases with state of that size? And how much time is a savepoint expected to take with such state size and parallelism on YARN? Any other recommendation would be of great help.

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

Re: Savepoint failed with error "Checkpoint expired before completing"

徐涛
Hi Gagan,
        I have run into the checkpoint timeout error too.
        In my case it was not due to a big checkpoint size, but due to a slow sink that caused high backpressure on the upstream operators, so the barriers could take a long time to reach the sink.
        Please check whether that is the case you are hitting.

Best
Henry


Re: Savepoint failed with error "Checkpoint expired before completing"

Gagan Agrawal
Hi Henry,
Thanks for your response. However, we don't face this issue during normal runs since we use incremental checkpoints. We only face this problem when we try to take a savepoint (which tries to save the entire state in one go).

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

Yun Tang
Hi Gagan

A savepoint generally takes more time than a usual incremental checkpoint; you could try to increase the checkpoint timeout [1], e.g.
   env.getCheckpointConfig().setCheckpointTimeout(900000);

If you just want to resume the previous job without changing the state backend, I think you could also try to resume from a retained checkpoint without triggering a savepoint [2].
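
A rough sketch of both options, assuming the standard CheckpointConfig API (the 15-minute timeout, bucket and paths are only examples):

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();

    // [1] give checkpoints and savepoints more time to complete (here 15 minutes)
    checkpointConfig.setCheckpointTimeout(15 * 60 * 1000);

    // [2] retain completed checkpoints when the job is cancelled, so the job can later be
    // resumed from the last checkpoint instead of a savepoint
    checkpointConfig.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Resuming from a retained checkpoint then works like resuming from a savepoint, e.g. bin/flink run -s s3://<bucket>/flink/checkpoints/<job-id>/chk-<n>/_metadata ... (the path is a placeholder).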



Best
Yun Tang



Re: Savepoint failed with error "Checkpoint expired before completing"

Gagan Agrawal
Thanks, Yun, for your inputs. Yes, increasing the checkpoint timeout helped and we are able to take savepoints now. In our case we wanted to increase parallelism, so I believe a savepoint is the only option, as a checkpoint doesn't support code/parallelism changes.
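
For reference, triggering a savepoint against a YARN session looks roughly like this (job id, YARN application id and target directory are placeholders):

    # trigger a savepoint for a job running in a YARN session
    bin/flink savepoint <job-id> s3://<bucket>/flink/savepoints -yid <yarn-application-id>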

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

Yun Tang
Haha, actually externalized checkpoints also support parallelism changes; you could read my email posted on the dev mailing list.
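
For example, a job can be resumed from a retained checkpoint with a different parallelism roughly like this (the checkpoint path, parallelism and jar name are placeholders):

    # resume from a retained checkpoint and rescale the job to parallelism 20
    bin/flink run -s s3://<bucket>/flink/checkpoints/<job-id>/chk-<n>/_metadata -p 20 my-job.jar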

Best
Yun Tang


Re: Savepoint failed with error "Checkpoint expired before completing"

Gagan Agrawal
Great, thanks for sharing that info.

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

Steven Wu
FYI, here is the JIRA to support a timeout in the savepoint REST API.
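
For context, the savepoint REST endpoints (available since Flink 1.5, as far as I know) are used roughly like this; host, port, ids and paths are placeholders:

    # trigger an asynchronous savepoint via the JobManager REST API
    curl -X POST http://<jobmanager>:8081/jobs/<job-id>/savepoints \
         -H "Content-Type: application/json" \
         -d '{"target-directory": "s3://<bucket>/flink/savepoints", "cancel-job": false}'

    # poll the returned trigger id to check whether the savepoint completed
    curl http://<jobmanager>:8081/jobs/<job-id>/savepoints/<trigger-id>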


Re: Savepoint failed with error "Checkpoint expired before completing"

Gagan Agrawal
Good to know, Steven. It would be a useful feature to have separate timeout configs for both.

Gagan
