Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit


Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

马阳阳
Hi, community,
When running a Flink streaming job with a large state size, one TaskManager process was killed by the YARN NodeManager. The following log is from the YARN NodeManager:

2021-04-16 11:51:23,013 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=521232,containerID=container_e157_1618223445363_16943_01_000010] is running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing container.
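A quick look at the numbers: 19562496 B ≈ 18.7 MiB, so the container is only about 19 MiB over its budget, and the 12 GB container limit matches the configured taskmanager.memory.process.size of 12288m (12288 MiB = 12 GiB). Such a marginal overshoot typically comes from memory that Flink does not account for, e.g. native allocations.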

While searching for a solution to this problem, I found an option that addresses it for bounded (blocking) shuffle. Is there a way to get rid of this in streaming mode?

PS: memory-related options:
taskmanager.memory.process.size: 12288m
taskmanager.memory.managed.fraction: 0.7
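For context, a rough sketch of how Flink 1.12 would split that process size, assuming defaults for all other memory options (the exact numbers depend on the configuration):

taskmanager.memory.process.size: 12288m   # the budget YARN enforces on the container
# jvm-overhead    = min(max(0.1 * 12288m, 192m), 1g)  = 1024m
# jvm-metaspace   = 256m (default)
# total Flink mem = 12288m - 1024m - 256m             = 11008m
# managed memory  = 0.7 * 11008m                      ≈ 7706m  (used by RocksDB)
# network         = min(max(0.1 * 11008m, 64m), 1g)   = 1024m
# framework heap + framework off-heap = 128m + 128m, task off-heap = 0m
# task heap       = remainder                         ≈ 2022m

Any native memory beyond the managed budget (RocksDB overshoot, glibc, thread stacks) has to fit into the roughly 1 GiB of JVM overhead, otherwise the container exceeds the YARN limit.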


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

马阳阳
The Flink version we used is 1.12.0.




Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

Matthias
Hi,
I have a few questions about your case:
* What is the option you're referring to for the bounded shuffle? That might help to understand what streaming mode solution you're looking for.
* What does the job graph look like? Are you assuming that it's due to a shuffling operation? Could you provide the logs to get a better understanding of your case?
* Do you observe the same memory increase for other TaskManager nodes?
* Are you expecting to reach the memory limits considering that you mentioned a "big state size"? Would increasing the memory limit be an option or do you fear that it's caused by some memory leak?

Best,
Matthias



Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

dhanesh arole
Hi, 

The questions that [hidden email] asked are very valid and might provide more leads. But if you haven't already, it's worth trying jemalloc / tcmalloc. We had similar problems with slow growth in TM memory resulting in pods getting OOMed by k8s. After switching to jemalloc, the memory footprint improved dramatically.
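On YARN, one way to switch the allocator is to preload jemalloc into the TaskManager containers via the containerized environment options, e.g. (a sketch; the library path is only an example and depends on where jemalloc is installed on the NodeManager hosts):

containerized.taskmanager.env.LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libjemalloc.so.2   # example path, adjust per host
containerized.master.env.LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libjemalloc.so.2        # optionally for the JobManager as well

jemalloc has to be installed on every YARN host for this to take effect.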


Dhanesh Arole ( Sent from mobile device. Pardon me for typos )





Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

马阳阳
Hi Matthias,
We have “solved” the problem by tuning the join. But I will still try to answer the questions, hoping this helps.

* What is the option you're referring to for the bounded shuffle? That might help to understand what streaming mode solution you're looking for.
    
taskmanager.network.blocking-shuffle.type (default: "file", type: String): The blocking shuffle type, either "mmap" or "file". "auto" means selecting the proper type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted for by configured memory limits, but some resource frameworks like YARN would track this memory usage and kill the container once memory exceeds some threshold. Also note that this option is experimental and might be changed in the future.
* What does the job graph look like? Are you assuming that it's due to a shuffling operation? Could you provide the logs to get a better understanding of your case?
   The job graph is a join of three streams, and we use RocksDB as the state backend. I think the crash is due to RocksDB. I could not get the logs (because of a misconfiguration, the logs are empty).
* Do you observe the same memory increase for other TaskManager nodes?
   After one TM was killed, the job failed, so I didn't see exactly the same memory increase on the other TMs. But I think the other TMs would show similar behavior because the data sizes they process are almost the same.
* Are you expecting to reach the memory limits considering that you mentioned a "big state size"? Would increasing the memory limit be an option or do you fear that it's caused by some memory leak?
  Changing the TM process memory to 18 GB instead of 12 GB did not help.

Based on the answers above, I think we should figure out why RocksDB overused memory and caused YARN to kill the container.
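If RocksDB's native memory is indeed the culprit, a few options worth checking (a sketch based on Flink 1.12's options, not a guaranteed fix):

state.backend.rocksdb.memory.managed: true     # default; caps block cache + memtables at the managed memory budget
taskmanager.memory.managed.fraction: 0.6       # give RocksDB a smaller share so more headroom remains
taskmanager.memory.jvm-overhead.max: 2g        # enlarge the headroom not assigned to any Flink component
# taskmanager.memory.task.off-heap.size: 1g    # alternatively, reserve explicit room for native allocations

Even with the managed limit in place, RocksDB can overshoot somewhat (e.g. memory pinned by iterators or index/filter blocks), which is why extra overhead headroom often helps.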



Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

Matthias
Thanks for sharing these details. Looking into FLINK-14952 [1] (which introduced this option) and the related mailing list thread [2], it feels like your issue is quite similar to what is described there, even though that issue is mostly tied to bounded jobs. But I'm not sure what is happening under the hood. I guess you already tried the option? Have you had the chance to profile the memory? I'm pulling in Piotr and Zhijiang; maybe they have more insights on the matter.




Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

Matthias
A few more questions: Have you had the chance to monitor/profile the memory usage? Which part of the memory was used excessively? Additionally, could [hidden email]'s proposal solve your issue?
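In case it helps with the profiling: Flink's RocksDB native metrics can be switched on to see where the memory goes (a sketch; option names as of Flink 1.12, and each enabled metric adds some overhead):

state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true

Together with the TaskManager memory metrics in the web UI (heap, metaspace, managed memory), this usually narrows down which part of the process is growing.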

Matthias
