Hi,
One of our pipelines consists of many tiny Flink jobs (plus some larger ones) that request and release resources on demand, as per the FLIP-6 mode. Internally,
we track how much parallelism we have already used before submitting each new job, so that we stay bounded by an expected cap. What we found is that the pipeline intermittently holds onto 20-40x the expected resources, eating into our cluster's overall capacity.
It seems that Flink is not releasing the resources back to YARN quickly enough for these jobs.
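For context, our bookkeeping is roughly the following sketch (the class and method names here are illustrative, not our actual code): a shared counter that reserves parallelism before a job is submitted and releases it when the job finishes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Illustrative sketch of the parallelism-budget check described above.
 * A job is only submitted if reserving its parallelism keeps the
 * total under the configured cap.
 */
public class ParallelismBudget {
    private final int cap;
    private final AtomicInteger used = new AtomicInteger(0);

    public ParallelismBudget(int cap) {
        this.cap = cap;
    }

    /** Try to reserve slots before submitting a job; false if the cap would be exceeded. */
    public boolean tryAcquire(int parallelism) {
        while (true) {
            int current = used.get();
            if (current + parallelism > cap) {
                return false; // over budget: do not submit the job
            }
            if (used.compareAndSet(current, current + parallelism)) {
                return true; // reservation succeeded
            }
            // CAS lost a race with another submitter; retry
        }
    }

    /** Release the reservation once the job terminates. */
    public void release(int parallelism) {
        used.addAndGet(-parallelism);
    }

    public int used() {
        return used.get();
    }
}
```

The problem is that this accounting assumes YARN containers are returned promptly on job termination, which is not what we observe.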
As an immediate stopgap, I tried reverting to legacy mode, hoping that resource utilization would then at least be constant, determined by the configured number of task
managers, slots, and memory. However, we then ran into the issue below. Why would the client's pending container requests still be 60 when YARN shows they have already been allocated? What can we do here?
org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor failed with exception. Stopping it now.
java.lang.IllegalStateException: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container
requests 60 != Number RM's pending container requests 0.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217)
at org.apache.flink.yarn.YarnFlinkResourceManager.getPendingRequests(YarnFlinkResourceManager.java:520)
at org.apache.flink.yarn.YarnFlinkResourceManager.containersAllocated(YarnFlinkResourceManager.java:449)
at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:227)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
JobManager logs: (full logs also attached)
2019-10-22 11:36:52,733 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000002 - Remaining
pending container requests: 118
2019-10-22 11:36:52,734 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612734: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000002, NodeId: d49111-041.dc.gs.com:45454, NodeHttpAddress: d49111-041.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.83.235:45454
}, ] on host d49111-041.dc.gs.com
2019-10-22 11:36:52,736 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-041.dc.gs.com:45454
2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000003 - Remaining
pending container requests: 116
2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612784: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000003, NodeId: d49111-162.dc.gs.com:45454, NodeHttpAddress: d49111-162.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.72.254:45454
}, ] on host d49111-162.dc.gs.com
….
Received new container: container_e102_1569128826219_23941567_01_000066 - Remaining pending container requests: 2
2019-10-22 11:36:53,409 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613409: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000066, NodeId: d49111-275.dc.gs.com:45454, NodeHttpAddress: d49111-275.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.50.199.239:45454
}, ] on host d49111-275.dc.gs.com
2019-10-22 11:36:53,411 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-275.dc.gs.com:45454
2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager -
Received new container: container_e102_1569128826219_23941567_01_000067 - Remaining pending container requests: 0
2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613418: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000067, NodeId: d49111-409.dc.gs.com:45454, NodeHttpAddress: d49111-409.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.40.203:45454
}, ] on host d49111-409.dc.gs.com
2019-10-22 11:36:53,420 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-409.dc.gs.com:45454
2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager -
Received new container: container_e102_1569128826219_23941567_01_000070 - Remaining pending container requests: 0
2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613430: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000070, NodeId: d49111-167.dc.gs.com:45454, NodeHttpAddress: d49111-167.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.138.251:45454
}, ] on host d49111-167.dc.gs.com
2019-10-22 11:36:53,432 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-167.dc.gs.com:45454
2019-10-22 11:36:53,439 INFO org.apache.flink.yarn.YarnFlinkResourceManager -
Received new container: container_e102_1569128826219_23941567_01_000072 - Remaining pending container requests: 0
2019-10-22 11:36:53,440 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613439: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000072, NodeId: d49111-436.dc.gs.com:45454, NodeHttpAddress: d49111-436.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.235.176:45454
}, ] on host d49111-436.dc.gs.com
2019-10-22 11:36:53,441 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-436.dc.gs.com:45454
2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager -
Received new container: container_e102_1569128826219_23941567_01_000073 - Remaining pending container requests: 0
2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613449: Container:
[ContainerId: container_e102_1569128826219_23941567_01_000073, NodeId: d49111-387.dc.gs.com:45454, NodeHttpAddress: d49111-387.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.136.247:45454
}, ] on host d49111-387.dc.gs.com
…..
Thanks,
Regina