Hi!
Since we moved to the new execution mode in Flink 1.7.0, we have observed some pretty bad stability issues with YARN execution. It's hard to understand what's going on, so sorry for the vague description, but here is what seems to happen: in some cases, when a bigger job fails (let's say 30 TaskManagers with 10 slots each) and the job tries to recover, TaskManagers start to fail. The errors usually look like this:

    20181220T141057.132+0100 INFO The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
    20181220T141057.133+0100 INFO Closing TaskExecutor connection container_e15_1542798679751_0033_01_000021 because: The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
    20181220T141057.135+0100 INFO Execute processors -> (Filter config stream -> (Filter Failures, Flat Map), Filter BEA) (168/180) (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
    org.apache.flink.util.FlinkException: The assigned slot container_e15_1542798679751_0033_01_000021_0 was removed.
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
        at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
        at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]

The job then goes into a restart loop where TaskManagers come and go; the UI sometimes displays more than 30 TaskManagers and some extra slots. In some instances I have also seen "GC overhead limit exceeded" during recovery, which is very strange. I suspect something is going wrong here, maybe broken logic in the slot allocation or a memory leak. Has anyone observed anything similar so far? It seems to affect only some of our larger jobs. This hasn't been a problem in previous Flink releases, where we always used the "legacy" execution mode.

Thank you!
Gyula
Hi Gyula,
Your issue may be related to [1], where slots are released prematurely. I've raised a PR for it, which is still pending review.

[1] https://issues.apache.org/jira/browse/FLINK-10941
In my case the problem seems to happen when a streaming job is recovering with large state. I don't really understand how it could be caused by what you described, as that seems to mostly affect batch jobs. But I could easily be wrong; maybe there are other implications of the above issue.
Gyula

qi luo <[hidden email]> wrote on Fri, Dec 21, 2018 at 3:35:
After some investigation, it seems there was a thread leak in the job: the operator implementation shut down its threads in the close() method, which is not called in the failure scenario, so I had to move the shutdown logic to dispose(). Sorry for the false alarm.
Gyula

On Fri, Dec 21, 2018 at 8:58 AM Gyula Fóra <[hidden email]> wrote:
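The fix comes down to which lifecycle hooks run when a task fails. As we understand the Flink 1.7-era operator contract, close() is only invoked when the operator finishes normally (it is meant to flush buffered data), while dispose() runs at the end of the operator's life on success, failure and cancellation alike. The sketch below is plain Java with no Flink dependency: PoolOwningOperator, leakedPoolsAfterFailures and the thread-pool setup are hypothetical stand-ins that only mimic that call order, to show why threads released in close() pile up across restarts while threads released in dispose() do not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Plain-Java sketch (hypothetical, not Flink code) of the lifecycle described above:
 * open() always runs, close() only runs on normal completion, dispose() always runs.
 */
public class LifecycleSketch {

    /** Hypothetical operator that owns a background thread pool. */
    static class PoolOwningOperator {
        private final boolean cleanupInDispose;
        ExecutorService pool;

        PoolOwningOperator(boolean cleanupInDispose) {
            this.cleanupInDispose = cleanupInDispose;
        }

        void open() {
            pool = Executors.newFixedThreadPool(2);
            // Keep both worker threads busy so they stay alive until the pool is shut down.
            for (int i = 0; i < 2; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }

        /** Stands in for close(): only reached when the task finishes normally. */
        void close() {
            if (!cleanupInDispose) {
                shutdown();
            }
        }

        /** Stands in for dispose(): reached on success, failure and cancellation. */
        void dispose() {
            if (cleanupInDispose) {
                shutdown();
            }
        }

        private void shutdown() {
            pool.shutdownNow(); // interrupts the sleeping workers
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Simulates `attempts` failed task attempts. On each failure close() is skipped and
     * only dispose() runs. Returns how many thread pools were still alive afterwards.
     */
    static int leakedPoolsAfterFailures(boolean cleanupInDispose, int attempts) {
        List<ExecutorService> pools = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            PoolOwningOperator op = new PoolOwningOperator(cleanupInDispose);
            op.open();
            pools.add(op.pool);
            try {
                throw new IllegalStateException("simulated task failure");
            } catch (IllegalStateException e) {
                op.dispose(); // failure path: close() is never called
            }
        }
        int leaked = 0;
        for (ExecutorService pool : pools) {
            if (!pool.isTerminated()) {
                leaked++;
                pool.shutdownNow(); // tidy up so the demo JVM can exit
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("cleanup in close():   leaked pools = " + leakedPoolsAfterFailures(false, 5));
        System.out.println("cleanup in dispose(): leaked pools = " + leakedPoolsAfterFailures(true, 5));
    }
}
```

Running main should report 5 leaked pools for the close() variant and 0 for the dispose() variant, which mirrors how each restart of the real job would have left another set of threads behind. The exact hooks differ between Flink versions, so check the operator lifecycle documentation for the version you run.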