Hi,

We have a Flink job (Flink version 1.6.1) which unions 2 streams and passes them through a custom KeyedProcessFunction with a RocksDB state store, which finally creates another stream into Kafka. The current checkpoint size is around 100 GB, and checkpoints are saved to S3 at a 5-minute interval with incremental checkpointing enabled. Checkpoints mostly finish in less than 1 minute. We are running this job on YARN with the following parameters:

-yn 10 (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.

While trying to take a savepoint of this job, it runs for ~10 minutes and then throws the following error. It looks like the default checkpoint timeout of 10 minutes is causing this. What is the recommended way to take a savepoint for such a job? Should we increase the default checkpoint timeout of 10 minutes? Also, our state size is currently 100 GB, but it is expected to grow up to 1 TB. Is Flink a good fit for use cases with that much state? And how long is a savepoint expected to take with such a state size and parallelism on YARN? Any other recommendations would be of great help.

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
    at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
    at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
|
Hi Gagan,
I have run into the checkpoint timeout error too. In my case it was not caused by a large checkpoint, but by a slow sink that put heavy backpressure on the upstream operators, so the checkpoint barrier took a long time to reach the sink. Please check whether that is what you are seeing.

Best
Henry

> On Oct 30, 2018, at 6:07 PM, Gagan Agrawal <[hidden email]> wrote:
|
Hi Henry,

Thanks for your response. However, we don't face this issue during normal runs, as we have incremental checkpoints. We face this problem only when we try to take a savepoint (which tries to save the entire state in one go).

Gagan

On Wed, Oct 31, 2018 at 11:41 AM 徐涛 <[hidden email]> wrote:
|
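Because a savepoint writes the full state rather than an increment, the 10-minute expiry gives a rough upper bound on the write throughput the job achieved. The following is a back-of-the-envelope sketch only. It assumes the full state is about the 100 GB quoted for the checkpoint, that it is spread evenly over the 10 parallel subtasks, and that throughput stays constant as the state grows. The class and method names are hypothetical.

```java
public class SavepointTimeEstimate { // hypothetical name, for illustration only

    public static final double MB = 1024.0 * 1024.0;
    public static final double GB = 1024.0 * MB;

    /**
     * Upper bound on aggregate write throughput (bytes/s), given that
     * stateBytes was NOT fully written within elapsedSeconds.
     */
    public static double throughputUpperBound(double stateBytes, double elapsedSeconds) {
        return stateBytes / elapsedSeconds;
    }

    /** Seconds needed to write stateBytes at a constant aggregate throughput (bytes/s). */
    public static double secondsToWrite(double stateBytes, double bytesPerSecond) {
        return stateBytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        double stateBytes = 100 * GB;      // from the thread: ~100 GB of state (assumed to be the full state)
        int parallelism = 10;              // from the thread
        double timeoutSeconds = 10 * 60;   // from the thread: savepoint expired after 10 minutes

        // The savepoint did not finish in time, so real throughput was below this bound.
        double aggregateBound = throughputUpperBound(stateBytes, timeoutSeconds);
        double perSubtaskBound = aggregateBound / parallelism;

        // Assumption: same parallelism and throughput once the state reaches 1 TB.
        double secondsFor1Tb = secondsToWrite(1024 * GB, aggregateBound);

        System.out.printf("Aggregate throughput was below %.1f MB/s%n", aggregateBound / MB);
        System.out.printf("Per-subtask throughput was below %.1f MB/s%n", perSubtaskBound / MB);
        System.out.printf("1 TB would need more than %.1f minutes%n", secondsFor1Tb / 60);
    }
}
```

With the thread's numbers, the bound is roughly 171 MB/s in aggregate (about 17 MB/s per subtask), and at that rate a 1 TB savepoint would need more than about 102 minutes at the same parallelism. Backpressure of the kind Henry describes could stretch this further.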
Hi Gagan
A savepoint generally takes more time than a usual incremental checkpoint, so you could try increasing the checkpoint timeout [1]
Best
Yun Tang
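The timeout mentioned here can be raised on the job's `CheckpointConfig`. Below is a minimal configuration sketch, assuming the job builds its own `StreamExecutionEnvironment` and has the Flink streaming dependency on its classpath. The 60-minute value is an illustrative assumption, not a figure from this thread, and the class name is hypothetical. As the stack trace in the original post shows, in this Flink version a savepoint expires under the same timeout as a regular checkpoint.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch { // hypothetical class name
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes, as described in the thread.
        env.enableCheckpointing(5 * 60 * 1000L);

        // Raise the timeout above the 10-minute default. 60 minutes is an
        // assumed value for illustration; size it to how long a full
        // savepoint of the job's state actually takes.
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000L);

        // ... define the sources, the KeyedProcessFunction and the Kafka sink here,
        // then call env.execute(...).
    }
}
```

Note that a longer timeout also applies to regular checkpoints, so a stuck checkpoint would be detected later.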
From: Gagan Agrawal <[hidden email]>
Sent: Wednesday, October 31, 2018 19:03
To: [hidden email]
Cc: [hidden email]
Subject: Re: Savepoint failed with error "Checkpoint expired before completing"
|
Thanks, Yun, for your input. Yes, increasing the checkpoint timeout helps, and we are able to take savepoints now. In our case we wanted to increase parallelism, so I believe a savepoint is the only option, as a checkpoint doesn't support code/parallelism changes.

Gagan

On Wed, Oct 31, 2018 at 8:46 PM Yun Tang <[hidden email]> wrote:
|
Haha, actually externalized checkpoints also support parallelism changes; you could read my email posted on the dev mailing list.
Best
Yun Tang
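For readers who want to try this, externalized (retained) checkpoints are enabled on the `CheckpointConfig`. This is a minimal configuration sketch under the same assumptions as any Flink job setup (Flink streaming dependency on the classpath); the class name is hypothetical. Whether rescaling from a retained checkpoint works for a given state backend and Flink version rests on the statement above and should be verified on a test job first.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSketch { // hypothetical class name
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes, as described in the thread.
        env.enableCheckpointing(5 * 60 * 1000L);

        // Keep the latest checkpoint when the job is cancelled, so the job
        // can later be resumed from it.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... define the job graph here, then call env.execute(...).
    }
}
```

A retained checkpoint is then passed to `flink run` with the `-s` option, the same way a savepoint path is, and `-p` sets the new parallelism.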
|
Great, thanks for sharing that info.

Gagan

On Thu, Nov 1, 2018 at 1:50 PM Yun Tang <[hidden email]> wrote:
|
FYI, here is the JIRA to support a timeout in the savepoint REST API.

On Fri, Nov 2, 2018 at 6:37 PM Gagan Agrawal <[hidden email]> wrote:
|
Good to know, Steven. It will be a useful feature to have separate timeout configs for both.

Gagan

On Mon, Nov 5, 2018, 10:06 Steven Wu <[hidden email]> wrote:
|