Hi,
With Flink 1.9 running in Docker, I have a batch job that fails with the error message below. The same code works fine on EMR. Comparing the logs, the only difference I found is managedMemoryInMB=138 (the working runs show 0). Has anybody seen this before?

Thanks,
Fanbin

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
	at org.apache.flink.runtime.concurrent.FutureUtils.whenCompleteAsyncIfNotDone(FutureUtils.java:940)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:339)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.requestNewAllocatedSlot(SchedulerImpl.java:262)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:542)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:341)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:168)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:149)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateBatchSlot(SchedulerImpl.java:129)
	at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:109)
	at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
	at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:554)
	at org.apache.flink.runtime.executiongraph.Execution.allocateResourcesForExecution(Execution.java:496)
	at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:439)
	at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:674)
	at org.apache.flink.runtime.executiongraph.Execution.scheduleConsumer(Execution.java:850)
	at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:887)
	at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1064)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1548)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1521)
	at org.apache.flink.runtime.scheduler.LegacyScheduler.updateTaskExecutionState(LegacyScheduler.java:289)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:377)
	at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:214)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
	at com.sun.proxy.$Proxy8.requestSlot(Unknown Source)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:334)
	... 48 more
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:315)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:443)
	at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
	... 24 more
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request c7de65260c8d428b2e295e5afb205242. Requested resource profile (ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=138}) is unfulfillable.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:768)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:310)
	... 26 more
|
Hi Fanbin,

The Blink planner's batch SQL operators require managed memory, and the amount needed depends on your job. The failure happens because, according to your cluster configuration, the slot does not have enough managed memory to fulfill the request.

To fix the problem, you need to configure more managed memory for your task executors. You can set the config option "taskmanager.memory.size" to 'managedMemoryPerSlot (138m in your case) * numberOfSlots'.

It's not clear to me why the exact same code works on EMR. Were you running the same version of Flink?

Thank you~
Xintong Song

On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu <[hidden email]> wrote:
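The slot check and the sizing rule described above can be sketched in plain Java. This is a hypothetical, simplified model and not Flink's actual SlotManager code. The class and method names are made up, only the managed-memory part of the resource profile is compared, and the 4 slots used in main() are an assumed value.

```java
// Hypothetical, simplified model of why a slot request can be "unfulfillable".
// These are NOT Flink classes; only managed memory is compared here.
final class SlotMemoryCheck {

    // A request can only be served by a slot that offers at least as much
    // managed memory as the request asks for.
    static boolean fits(long requestedManagedMb, long slotManagedMb) {
        return slotManagedMb >= requestedManagedMb;
    }

    // Value for "taskmanager.memory.size" following the rule
    // managedMemoryPerSlot * numberOfSlots, written with an "m" suffix.
    static String suggestedTaskManagerMemorySize(long managedMbPerSlot, int numberOfSlots) {
        if (managedMbPerSlot <= 0 || numberOfSlots <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        return (managedMbPerSlot * numberOfSlots) + "m";
    }

    public static void main(String[] args) {
        long requestedMb = 138; // managedMemoryInMB from the error message
        int slots = 4;          // hypothetical number of slots per TaskManager
        System.out.println("fits in a 100 MB slot? " + fits(requestedMb, 100));
        System.out.println("taskmanager.memory.size: "
                + suggestedTaskManagerMemorySize(requestedMb, slots));
    }
}
```

With a hypothetical 6 slots per TaskManager, the same rule gives 828m.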
|
Xintong,

Thanks for looking into this. I changed the Docker setting for the number of CPUs to a lower value, and it works now.

I was using the same code and the same Flink version. The reason it works on EMR is that I'm using a machine with a large amount of memory there. According to the docs:

> JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.

The default JVM heap size is 1024m. I had configured Docker with 6 CPUs, and with that setting the Blink batch jobs failed.

Thanks for your help!
Fanbin

On Tue, Jan 7, 2020 at 7:51 PM Xintong Song <[hidden email]> wrote:
|
Hi Fanbin,

> On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.

If I understand correctly, you are running a Flink standalone cluster both in Docker and on EMR? If so, this sentence has nothing to do with your case, because it describes YARN deployments. It should also be irrelevant that you are using a machine with more memory on EMR, as long as "taskmanager.heap.size" is the same. I assume the default 1024m is used in both scenarios?

If "taskmanager.memory.size" is not explicitly specified, Flink decides the managed memory size automatically. The derived size depends on the JVM free heap memory after the TM is launched: Flink triggers a "System.gc()" after the TM starts and then reads the JVM free heap size.

My guess is that decreasing the number of Docker CPU cores helps because fewer cores somehow result in less heap memory consumption. That leaves more free heap memory, and thus more managed memory. AFAIK, there are several places in the TM where Flink reads the number of system CPU cores and sizes thread pools accordingly. But this is just a guess, and I cannot confirm it.

I would suggest configuring "taskmanager.memory.size" explicitly anyway, to avoid potential problems caused by the uncertainty of the JVM free heap memory size. BTW, this randomness is eliminated in Flink 1.10.

Thank you~
Xintong Song

On Thu, Jan 9, 2020 at 3:04 AM Fanbin Bu <[hidden email]> wrote:
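The derivation described above can be sketched in Java. This is a rough model, not Flink's code, and it rests on several assumptions:

- managed memory lives on the heap;
- 0.7 is the default of "taskmanager.memory.fraction" in 1.9;
- the result is split evenly across a hypothetical number of slots.

The sketch shows where the uncertainty comes from. "System.gc()" is only a hint, and the free heap depends on what the TaskManager has already allocated at that moment.

```java
// Rough sketch of deriving managed memory from free heap when
// "taskmanager.memory.size" is not set. NOT Flink's code; heap-based
// managed memory and the 0.7 default fraction are assumptions.
final class DerivedManagedMemory {

    // Assumed default of "taskmanager.memory.fraction" in Flink 1.9.
    static final double ASSUMED_MEMORY_FRACTION = 0.7;

    // Requests a GC, then returns the free heap as the JVM reports it:
    // heap not yet committed (max - total) plus unused committed heap (free).
    static long freeHeapBytesAfterGc() {
        System.gc(); // only a hint; the JVM decides what is actually collected
        Runtime rt = Runtime.getRuntime();
        return rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
    }

    // Deterministic part: a fraction of the free heap becomes managed memory,
    // split evenly across the slots (whole MB, rounded down).
    static long managedMbPerSlot(long freeHeapMb, double fraction, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        return (long) (freeHeapMb * fraction) / numberOfSlots;
    }

    public static void main(String[] args) {
        long freeMb = freeHeapBytesAfterGc() >> 20; // bytes to MB
        int slots = 4; // hypothetical number of slots
        System.out.println("free heap after GC: " + freeMb + " MB");
        System.out.println("managed memory per slot: "
                + managedMbPerSlot(freeMb, ASSUMED_MEMORY_FRACTION, slots) + " MB");
    }
}
```

Running main() several times on the same machine may print different values. Setting "taskmanager.memory.size" to a fixed value avoids relying on this measurement.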
|