Yarn terminating TM for pmem limit cascades causing all jobs to fail

3 messages

Shannon Carey
I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM due to exceeding pmem limits and all jobs failing as a result. I thought I had successfully disabled that check, but apparently the property doesn't work as expected in EMR.

From what I can tell in the logs, it looks like after the first TM was killed by Yarn, the jobs failed and were retried. However, when they are retried they cause increased pmem load on yet another TM, which results in Yarn killing another TM. That caused the jobs to fail again. This happened 5 times until our job retry policy gave up and allowed the jobs to fail permanently. Obviously, this situation is very problematic because it results in the loss of all job state, plus it requires manual intervention to start the jobs again.

The job retries eventually fail with either "Could not restart the job ... The slot in which the task was executed has been released. Probably loss of TaskManager" or "Could not restart the job … Connection unexpectedly closed by remote task manager … This might indicate that the remote task manager was lost." Those are only the final failure causes; Flink does not appear to log the causes of the intermediate restart failures.

I assume that the JobManager messages "Association with remote system … has failed, address is now gated for [5000] ms. Reason is: [Disassociated]." are due to the TM failing, and are expected/harmless?

It seems like disabling the pmem check would fix this problem, but I am wondering whether the FAQ entry at https://flink.apache.org/faq.html#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do is related, although I don't see any log messages about quarantined TMs.

Do you think that increasing the number of job retries, so that the jobs don't fail until all TMs have been replaced with fresh ones, would fix this issue? The "memory.percent-free" metric from collectd did go down to 2-3% on the TMs before they failed, and shot back up to 30-40% when the TMs restarted (though I'm not sure how much of that was due to the loss of state). So memory usage may be a real problem, but we don't get an OOM exception, so I'm not sure we can control this from the JVM side. Are there other memory adjustments we should make that would allow our TMs to run for long periods without hitting this problem? Is there perhaps a memory leak in RocksDB?
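A toy model can make the retry question concrete. The Java sketch below is hypothetical (it is not Flink or YARN code), and every number in it (container sizes, the restart spike, the limit, the per-restart leak) is an illustrative assumption. On each run, the fullest container that goes over the limit is killed and replaced by a fresh one, and the jobs restart until no container exceeds the limit or the restart budget runs out.

```java
import java.util.Arrays;

/**
 * Hypothetical toy model (not Flink or YARN code) of the restart cascade.
 * All sizes are in MB and are illustrative assumptions.
 */
class RestartCascadeModel {

    /**
     * @param residentMb       resident memory of each TaskManager container when the first failure hits
     * @param freshMb          resident memory of a freshly started replacement container
     * @param spikeMb          extra memory every container needs while the jobs (re)start
     * @param limitMb          YARN's physical-memory limit per container
     * @param leakPerRestartMb memory that surviving containers never give back after a restart
     * @param maxRestarts      restart budget of a hypothetical fixed-attempts policy
     * @return restarts used before a run succeeded, or -1 if the budget ran out
     */
    static int simulate(long[] residentMb, long freshMb, long spikeMb, long limitMb,
                        long leakPerRestartMb, int maxRestarts) {
        if (maxRestarts < 0) {
            throw new IllegalArgumentException("maxRestarts must be >= 0");
        }
        long[] rss = Arrays.copyOf(residentMb, residentMb.length);
        for (int restarts = 0; ; restarts++) {
            // Pick the fullest container that exceeds the limit during this run's spike.
            int victim = -1;
            for (int i = 0; i < rss.length; i++) {
                if (rss[i] + spikeMb > limitMb && (victim < 0 || rss[i] > rss[victim])) {
                    victim = i;
                }
            }
            if (victim < 0) {
                return restarts; // nothing exceeded the limit: this run keeps going
            }
            if (restarts == maxRestarts) {
                return -1; // budget exhausted: the jobs fail permanently
            }
            // YARN kills the victim and a fresh container replaces it; survivors keep leaked memory.
            for (int i = 0; i < rss.length; i++) {
                rss[i] = (i == victim) ? freshMb : rss[i] + leakPerRestartMb;
            }
        }
    }

    public static void main(String[] args) {
        long[] degraded = {7000, 7000, 7000, 7000};
        System.out.println(simulate(degraded, 4000, 1500, 8000, 0, 5));      // 4
        System.out.println(simulate(degraded, 4000, 1500, 8000, 0, 3));      // -1
        System.out.println(simulate(degraded, 4000, 1500, 8000, 1000, 100)); // -1
    }
}
```

Under these assumptions, a larger restart budget only helps if memory does not leak across restarts: with no leak, the cascade ends once every degraded container has been replaced, but with a per-restart leak the surviving containers climb back over the limit and any finite budget is eventually exhausted.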

Thanks for any help you can provide,
Shannon

Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail

Stephan Ewen
Hi Shannon!

Increasing the number of retries is definitely a good idea.

You are seeing pmem use increase after failures/retries, so let's dig into that. There are various possible leaks, depending on what you use:

  (1) There may be a leak in class loading (or, more specifically, class unloading). 1.1.x dynamically loads code when tasks are (re)started. This requires that the code can be unloaded, which means that tasks (after being cancelled) must no longer hold any references to the classes. Class leaks typically happen when you spawn threads (or use libraries that spawn threads) but do not shut them down when tasks are cancelled.

    You can check this in the Flink UI by looking at the non-heap memory consumption of the TaskManagers. If you have that type of leak, that number should keep growing.

    1.2.x does not re-load code on each task restart in the Yarn per-job mode.


  (2) There may be a leak in the native memory allocation of some library you use, such as Netty.


  (3) As for a RocksDB leak - I am not directly aware of a known leak in 1.1.x, but the RocksDB code has been improved quite a bit from 1.1.x to 1.2.x. It may be worth checking out 1.2.x to see if that fixes the issue.
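To illustrate point (1): the usual remedy for a thread-induced class leak is to stop every thread the task started when the task shuts down. Below is a minimal plain-Java sketch, assuming a hypothetical `BackgroundFlushingFunction` with `open()`/`close()` lifecycle methods (not a Flink API). In Flink user code, the `close()` method of a rich function is the natural place for this cleanup; whether it runs on every cancellation path in 1.1.x is worth verifying.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical user function (not a Flink API) that runs a periodic background task.
 * A thread that outlives the cancelled task keeps a reference to this class, and through it
 * to the user-code class loader, so none of the job's classes can be unloaded.
 */
class BackgroundFlushingFunction implements AutoCloseable {
    private final AtomicLong flushes = new AtomicLong();
    private ScheduledExecutorService flusher;

    void open() {
        flusher = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "hypothetical-flusher");
            // Daemon only keeps a forgotten thread from blocking JVM exit; it does NOT prevent the leak.
            t.setDaemon(true);
            return t;
        });
        flusher.scheduleAtFixedRate(flushes::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
    }

    /** Must run on every shutdown path, including cancellation. */
    @Override
    public void close() {
        if (flusher == null) {
            return;
        }
        flusher.shutdownNow(); // cancel scheduled runs and interrupt the worker thread
        try {
            if (!flusher.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("flusher thread did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
        }
    }

    boolean isStopped() {
        return flusher != null && flusher.isTerminated();
    }

    long flushCount() {
        return flushes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundFlushingFunction fn = new BackgroundFlushingFunction();
        fn.open();
        Thread.sleep(50);
        fn.close();
        System.out.println("flushes=" + fn.flushCount() + ", stopped=" + fn.isStopped());
    }
}
```

`shutdownNow()` followed by a bounded `awaitTermination` is one common choice here; a third-party library that owns its own threads usually needs its own shutdown call instead.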


The "Association with remote system … has failed, address is now gated for [5000] ms. Reason is: [Disassociated]." message is what Akka logs when a remote system is lost, hence a normal artifact of TaskManager failures.

Greetings,
Stephan



On Wed, Apr 19, 2017 at 12:26 AM, Shannon Carey <[hidden email]> wrote:

Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail

Zhijiang(wangzhijiang999)
In reply to this post by Shannon Carey
Hi Shannon,

Have you tried increasing the total memory size of the TaskManager container? Your peak memory requirement may be beyond your current setting. You should also check that your UDFs do not keep consuming memory that is never reclaimed.
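A bigger container helps with native memory because of how the container is split. A minimal Java sketch, assuming the YARN heap-cutoff rule heap = container - max(cutoffMin, ratio * container). The option names `yarn.heap-cutoff-ratio` and `yarn.heap-cutoff-min` and the defaults used here (0.25 and 600 MB) reflect our understanding of Flink 1.1.x and should be checked against the documentation for your version; the class and method names are hypothetical.

```java
/**
 * Sketch of a YARN TaskManager container's memory split under an ASSUMED heap-cutoff rule:
 *   cutoff = max(cutoffMinMb, cutoffRatio * containerMb), heap = containerMb - cutoff.
 * The cutoff is what remains for everything outside -Xmx: metaspace, thread stacks,
 * direct buffers, and native allocations such as RocksDB's.
 */
class ContainerMemorySplit {

    static long cutoffMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        return Math.max(cutoffMinMb, (long) (containerMb * cutoffRatio));
    }

    static long heapMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        return containerMb - cutoffMb(containerMb, cutoffRatio, cutoffMinMb);
    }

    public static void main(String[] args) {
        double ratio = 0.25;  // assumed default cutoff ratio
        long minCutoff = 600; // assumed default minimum cutoff, in MB
        for (long container : new long[] {2048, 4096, 8192}) {
            System.out.printf("container=%d MB -> heap=%d MB, native headroom=%d MB%n",
                    container, heapMb(container, ratio, minCutoff),
                    cutoffMb(container, ratio, minCutoff));
        }
    }
}
```

Under these assumptions, doubling the container from 4096 MB to 8192 MB doubles the native headroom from 1024 MB to 2048 MB (the heap grows too). If native memory is the problem, raising the cutoff ratio would instead move memory from the heap to the headroom without a larger container.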

If your UDFs are not consuming much memory and the container still exceeds the pmem limit after you increase the memory size, that may indicate a memory leak. Since you did not get an OOM exception, it is not a heap memory issue; native memory may be causing this problem. RocksDB uses native memory, so you could try upgrading as Stephan suggested. Good luck!
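Memory allocated outside the Java heap is not limited by -Xmx, so it can grow without a heap OutOfMemoryError while YARN still sees the container's physical memory rise. A small Java probe using the standard `java.lang.management` MXBeans (the names `NativeMemoryProbe`, `directBytesUsed` and `printSnapshot` are hypothetical) prints heap, non-heap (which includes class metadata, relevant to Stephan's point (1)), the direct-buffer pool and the loaded-class count, before and after an off-heap allocation:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.nio.ByteBuffer;

class NativeMemoryProbe {

    /** Bytes currently used by the JVM's "direct" buffer pool (off-heap ByteBuffers), or -1. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1; // pool not exposed by this JVM
    }

    static void printSnapshot(String label) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        System.out.printf("%s: heap=%d MB, non-heap=%d MB, direct=%d MB, loaded classes=%d%n",
                label,
                memory.getHeapMemoryUsage().getUsed() >> 20,
                memory.getNonHeapMemoryUsage().getUsed() >> 20,
                directBytesUsed() >> 20,
                ManagementFactory.getClassLoadingMXBean().getLoadedClassCount());
    }

    public static void main(String[] args) {
        printSnapshot("before");
        // 64 MB outside the Java heap: counts toward the process's physical memory, not toward -Xmx.
        ByteBuffer offHeap = ByteBuffer.allocateDirect(64 << 20);
        printSnapshot("after ");
        System.out.println("still holding " + offHeap.capacity() + " bytes");
    }
}
```

Caveat: memory that native libraries allocate themselves (RocksDB's, for example) does not appear in any of these MXBeans, so a growing gap between the process's resident memory as reported by the OS and these figures is one hint of that kind of allocation.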

Cheers,
zhijiang
------------------------------------------------------------------
From: Stephan Ewen <[hidden email]>
Sent: Wednesday, April 19, 2017 21:25
To: Shannon Carey <[hidden email]>
Subject: Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail
