Serious stability issues when running on YARN (Flink 1.7.0)

Serious stability issues when running on YARN (Flink 1.7.0)

Gyula Fóra
Hi!

Since we moved to the new execution mode with Flink 1.7.0, we have observed some pretty bad stability issues with the YARN execution.

It's pretty hard to understand what's going on, so sorry for the vague description, but here is what seems to happen:

In some cases, when a bigger job fails (let's say 30 TaskManagers, 10 slots each) and tries to recover, we can observe TaskManagers starting to fail.

The errors usually look like this:
20181220T141057.132+0100  INFO The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out.  [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
20181220T141057.133+0100  INFO Closing TaskExecutor connection container_e15_1542798679751_0033_01_000021 because: The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021  timed out.  [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
20181220T141057.135+0100  INFO Execute processors -> (Filter config stream -> (Filter Failures, Flat Map), Filter BEA) (168/180) (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: The assigned slot container_e15_1542798679751_0033_01_000021_0 was removed.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
	at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]

The job then goes into a restart loop where TaskManagers come and go, and the UI sometimes displays more than 30 TaskManagers and some extra slots. In some instances I have seen "GC overhead limit exceeded" during the recovery, which is very strange.

I suspect something strange is happening, maybe some broken logic in the slot allocation or a memory leak.
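If it helps, one way I can think of to narrow this down is to log the live thread count on the TaskManagers at every (re)start and see whether it keeps climbing across restarts. This is just a rough sketch; the class name and where it gets called from are made up, not something from our actual job:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/** Illustrative helper for spotting threads that pile up across restarts. */
public final class ThreadLeakProbe {

    private ThreadLeakProbe() {}

    // Call e.g. from an operator's open() on every (re)start; if the numbers
    // keep growing from one restart to the next, something is not releasing
    // its threads.
    public static String snapshot() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return "live threads=" + threads.getThreadCount()
                + ", peak=" + threads.getPeakThreadCount();
    }
}

Logging that string from open() should make a leak fairly obvious if the count grows with every restart.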

Has anyone observed anything similar so far?
It seems to only affect some of our larger jobs. This hasn't been a problem in previous Flink releases, where we always used the "legacy" execution mode.

Thank you!
Gyula

Re: Serious stability issues when running on YARN (Flink 1.7.0)

qi luo
Hi Gyula,

Your issue is possibly related to [1], where slots are prematurely released. I've raised a PR, which is still pending review.

[1] https://issues.apache.org/jira/browse/FLINK-10941

Re: Serious stability issues when running on YARN (Flink 1.7.0)

Gyula Fóra
In my case the problem seems to happen when a streaming job is recovering with large state. I don't really understand how it could be caused by what you described, as that seems to mostly affect batch jobs.

But I could easily be wrong; maybe there are other implications of the above issue.

Gyula

Re: Serious stability issues when running on YARN (Flink 1.7.0)

Gyula Fóra
After some investigation, it seems there was a thread leak in the job: the operator implementation shut down its threads in the close() method, which was never called in the failure scenario. (I had to change it to dispose().)
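For anyone running into something similar, the pattern looks roughly like the sketch below. The operator here is made up purely for illustration, not our actual code: a background thread pool is created in open() and has to be released in dispose(), since close() only runs on a clean finish while dispose() also runs on failure and cancellation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Illustrative operator that owns a background thread pool. */
public class ThreadOwningOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private static final long serialVersionUID = 1L;

    private transient ExecutorService backgroundExecutor;

    @Override
    public void open() throws Exception {
        super.open();
        backgroundExecutor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        output.collect(element);
    }

    @Override
    public void dispose() throws Exception {
        // dispose() is invoked on both successful and failed shutdowns, so the
        // threads must be released here; doing it only in close() leaks them
        // whenever the job fails and restarts.
        if (backgroundExecutor != null) {
            backgroundExecutor.shutdownNow();
        }
        super.dispose();
    }
}

With the shutdown only in close(), every failed attempt left its threads behind, which would also explain the "GC overhead limit exceeded" messages we saw during recovery.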

Sorry for the false alarm.

Gyula
