Hi, I am running flink job on yarn where it ran fine so far (4-5 days) and have now started failing with following errors. 2018-11-24 03:46:21,029 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1542008917197_0038_01_000006 because: Container [pid=18380,containerID=container_1542008917197_0038_01_000006] is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 5.0 GB of 15 GB virtual memory used. Killing container. This is simple job where we are reading 2 Avro streams from Kafka and applying some custom UDF after creating keyed stream from union on those 2 streams and writing back output to Kafka. Udf internally uses Map State with RocksDB backend. Currently size of checkpoint is around 300 GB and we are running this with 10 task manager with 3 GB memory each. I have also set "containerized.heap-cutoff-ratio: 0.5" but still facing same issue. Flink version is 1.6.2 Here is the flink command ./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar I want to understand what are typical reasons for this issue? Also why would flink consume more memory than allocated as JVM memory is fixed and will not grow beyond max heap. Can this be something related to RocksDB where it may be consuming memory outside heap and hence over using defined limits? I didn't find this issue when checkpoint size was small (<50 GB). But ever since we are now at 300GB size, this issue is coming frequently. I can try increasing memory, but I am still interested in knowing what are typical reasons for this error if Jvm heap memory can not grow beyond defined limit. Gagan |
I think it is probably related with rockdb memory usage if you have not found OutOfMemory issue before. There already existed a jira ticket [1] for fixing this issue, and you can watch it for updates. :) Best, Zhijiang
|
I am also experiencing this error message "Container is running beyond physical memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 40 slots for each task manager. The memory assigned during flink cluster creation is 1024MB per task manager. The checkpoint is using RocksDb and the checkpoint size is very small (10MB). Is the simply solution to increase the Task Manager memory size? I will try from 1024MB to 4096MB per task manager. ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐ On Sunday, November 25, 2018 7:58 PM, zhijiang <[hidden email]> wrote:
|
Also, after the Flink job has failed from the above error, the Flink job is unable to recover from previous checkpoint. Is this the expected behavior? How can the job be recovered successfully from this? ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐ On Monday, November 26, 2018 12:35 AM, Flink Developer <[hidden email]> wrote:
|
It may work aournd by increasing the task manager memory size. The recover failure is up to serveral issues, whether it had successful checkpoint before, the states are available and what is the failover strategy? Best, Zhijiang
|
Free forum by Nabble | Edit this page |