YarnResourceManager unresponsive under heavy containers allocations

Posted by qi luo on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/YarnResourceManager-unresponsive-under-heavy-containers-allocations-tp28640.html

Hi guys,

We’re using the latest version of Flink's YarnResourceManager, but our job startup occasionally hangs when allocating many YARN containers (e.g. >1000). I checked the related code in YarnResourceManager.


It seems that it handles all allocated containers and starts the TMs in the main thread. Thus, when container allocations are heavy, the RM thread becomes unresponsive (e.g. it does not respond to TM heartbeats; see the TM log below).

Any ideas on how to better handle this case (e.g. using multiple threads to handle allocated containers) would be much appreciated. Thanks!
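
To make the multi-threading idea concrete, here is a minimal sketch of what I have in mind. It is not Flink code: the class AsyncContainerLauncher, its methods, and the Thread.sleep that stands in for starting a TM are all hypothetical. A single-threaded executor plays the role of the RM main thread, and a separate pool does the slow per-container work, so heartbeats can still be answered while containers are being started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Flink's actual API.
final class AsyncContainerLauncher {
    // Stand-in for the single RM main thread; all state changes happen here.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Separate pool for the slow, blocking work of starting a TM in a container.
    private final ExecutorService ioPool;
    // Only read and written on mainThread, so no extra synchronization is used.
    private int launched = 0;

    AsyncContainerLauncher(int ioThreads) {
        this.ioPool = Executors.newFixedThreadPool(ioThreads);
    }

    // Simulates the expensive part of starting a TM (assumed cost: 5 ms).
    private static String startTaskManager(String containerId) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return containerId;
    }

    // Offloads each launch to ioPool, then records the result on mainThread.
    // The returned future completes with the number of launched containers.
    CompletableFuture<Integer> onContainersAllocated(List<String> containerIds) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String id : containerIds) {
            futures.add(
                CompletableFuture
                    .supplyAsync(() -> startTaskManager(id), ioPool)
                    .thenAcceptAsync(started -> launched++, mainThread));
        }
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApplyAsync(ignored -> launched, mainThread);
    }

    // A cheap request that must be served by mainThread, like a TM heartbeat.
    CompletableFuture<String> heartbeat(String tmId) {
        return CompletableFuture.supplyAsync(() -> "ack:" + tmId, mainThread);
    }

    void shutdown() {
        ioPool.shutdownNow();
        mainThread.shutdownNow();
    }
}
```

In this sketch the main thread only runs the short bookkeeping callbacks, so a heartbeat queued behind them is not blocked by the launches themselves. Whether the real container start calls can safely run off the main thread is something I have not verified.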

Regards,
Qi 


———————————————————————— 
TM log:

2019-07-09 13:56:59,110 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://flink@xxx/user/resourcemanager(00000000000000000000000000000000).
2019-07-09 14:00:01,138 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@xxx/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@xxx/), Path(/user/resourcemanager)]] after [182000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-07-09 14:01:59,137 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor            - Fatal error occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1023)
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1009)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)