Kubernetes Job Cluster - Checkpointing with Parallelism > 1

Thad Truman

Hi all,

We are trying to configure checkpointing (RocksDB) for Flink job clusters in k8s. As described here, we have a parallelism value that is used both as the -Dparallelism.default argument in the job manager template and as the replicas value in the task manager template. For jobs where the parallelism is set to 1, checkpointing works great. But when we set the parallelism to anything > 1, we get the error below:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 4, slots allocated: 1
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
    at akka.dispatch.OnComplete.internal(Future.scala:258)
    at akka.dispatch.OnComplete.internal(Future.scala:256)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)
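
How many TaskManager slots a setup like this provides and needs can be estimated with simple arithmetic. A minimal sketch, assuming the default slot sharing group (Flink then needs as many slots as the highest operator parallelism in the job) and the default taskmanager.numberOfTaskSlots of 1 per TaskManager; the helper names are hypothetical, not Flink API:

```python
import math


def required_slots(operator_parallelisms):
    """Slots needed when every operator is in the default slot sharing group:
    the highest operator parallelism (hypothetical helper, not Flink API)."""
    return max(operator_parallelisms)


def available_slots(taskmanager_replicas, slots_per_taskmanager=1):
    """Slots offered once every TaskManager pod has registered.
    slots_per_taskmanager mirrors taskmanager.numberOfTaskSlots (default 1)."""
    return taskmanager_replicas * slots_per_taskmanager


def replicas_needed(parallelism, slots_per_taskmanager=1):
    """TaskManager pods needed to cover a given parallelism."""
    return math.ceil(parallelism / slots_per_taskmanager)


if __name__ == "__main__":
    parallelism = 4
    # Every operator runs at parallelism.default, and replicas == parallelism.
    need = required_slots([parallelism] * 3)
    have = available_slots(taskmanager_replicas=parallelism)
    print(need, have, have >= need)  # 4 4 True
    # If only one TaskManager ever registers, supply falls short.
    print(available_slots(taskmanager_replicas=1) >= need)  # False
```

With replicas equal to the parallelism and one slot per TaskManager, supply and demand match, which suggests the sizing described above is not by itself the problem.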

 

 

Any ideas on how we can remediate this?

Thanks,

Thad Truman | Software Engineer | Neovest, Inc.

A: 1145 S 800 E, Ste 310 Orem, UT 84097
T: +1 801 900 2480
E: [hidden email]

Support Desk: [hidden email] / +1 800 433 4276

This email is confidential and subject to important disclaimers and conditions including on offers for purchase or sale of securities accuracy and completeness of information viruses confidentiality legal privilege and legal entity disclaimers available at www.neovest.com/disclosures.html

RE: Kubernetes Job Cluster - Checkpointing with Parallelism > 1

Thad Truman

Upgrading from Flink 1.6.0 to 1.6.2 appears to fix this, once we also made sure each job writes its checkpoints to a unique directory, since the job IDs all match.
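
Flink's filesystem checkpoint storage writes below state.checkpoints.dir in a subdirectory named after the job ID, so job clusters that share both a base directory and a job ID would write into the same place. A minimal sketch of giving each job its own base directory through the same -D dynamic properties the job manager template already uses; the helper names, the lowercase job-name rule and the example URIs are hypothetical:

```python
import re

# Hypothetical naming rule: lowercase letters, digits and hyphens,
# similar to a Kubernetes resource name.
_JOB_NAME = re.compile(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")


def checkpoint_dir(base_uri: str, job_name: str) -> str:
    """Per-job checkpoint base directory (hypothetical helper).

    Flink appends the job ID below this path, so distinct job names keep
    clusters that share a job ID from writing into the same directory.
    """
    if not _JOB_NAME.match(job_name):
        raise ValueError(f"job name must be a lowercase slug: {job_name!r}")
    return f"{base_uri.rstrip('/')}/{job_name}"


def jobmanager_dynamic_properties(job_name: str, parallelism: int, base_uri: str) -> list:
    """-D arguments for the job manager template (assumed template layout)."""
    return [
        f"-Dparallelism.default={parallelism}",
        "-Dstate.backend=rocksdb",
        f"-Dstate.checkpoints.dir={checkpoint_dir(base_uri, job_name)}",
    ]


if __name__ == "__main__":
    # "orders-enricher" and the bucket are made-up example values.
    for arg in jobmanager_dynamic_properties("orders-enricher", 4, "s3://flink-checkpoints/"):
        print(arg)
```

This only takes effect if the job itself does not construct its state backend with an explicit checkpoint path in code, since a backend set programmatically takes precedence over the cluster configuration.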

 

From: Thad Truman
Sent: Tuesday, November 6, 2018 9:38 AM
To: [hidden email]
Subject: Kubernetes Job Cluster - Checkpointing with Parallelism > 1