Hi,
My use case is:
- I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager + 1 TaskManager)
- I run N jobs per day. N may vary (one day N=20, another day N=50, ...). All jobs are the same: they connect to Kafka topics and have two DB2 connectors.
- Depending on a special event, a job can restart itself via the command: bin/flink cancel <JobID>
- At the end of the day, I cancel all jobs
- Each VM is configured with 16 GB RAM
- The memory allocated to one TaskManager is 10 GB

After several days, the memory saturates (we exceed 14 GB of used memory).

I read the following posts but did not manage to understand my problem:
- https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
- http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser

I did some tests on a machine (outside the cluster) with the top command and this is what I concluded (please see the attached file Flink_memory.PNG):
- When a job is started and running, it consumes memory
- When a job is cancelled, a large part of the memory is still used
- When another job is started and running (after cancelling the previous job), even more memory is consumed
- When I restart the JobManager and TaskManager, memory returns to normal

Why is the memory not released when a job is cancelled?

I added another attachment, Graph.PNG, that represents the graph of a job.
In case it is useful: we use MapFunction, FlatMapFunction, FilterFunction, triggers and windows, ...

Thanks in advance,
Julien
Hi Julien,

AFAIK, streaming jobs put data objects on the heap, so it depends on the JVM GC to release the memory.

Best,
Paul Lam
Hi Julien,

By default, Flink manages a 70% fraction of the free memory in the TaskManager for caching data efficiently, just as described in the article you mentioned (https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html). Once allocated, these managed memory segments stay referenced by the MemoryManager, so they end up in the old region of the JVM heap and are never recycled by the GC. This way we avoid the cost of repeatedly creating and recycling these objects.

The parameter "taskmanager.memory.preallocate" is false by default, which means the managed memory is not allocated when the TaskManager starts. When a job is running, the related tasks request this managed memory and you see the memory consumption go up. When the job is cancelled, the managed memory is released back to the MemoryManager but not recycled by the GC, so you see no change in memory consumption. After you restart the TaskManager, the initial memory consumption is low again because of the lazy allocation with taskmanager.memory.preallocate=false.

Best,
Zhijiang
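For reference, these are the relevant entries in flink-conf.yaml (only a sketch; the values shown here are the defaults discussed above, not a recommendation):

    # fraction of free memory reserved for Flink's managed memory (used by batch jobs)
    taskmanager.memory.fraction: 0.7
    # if true, managed memory is allocated eagerly when the TaskManager starts
    taskmanager.memory.preallocate: false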
Hi Zhijiang,
Does the memory management apply to streaming jobs as well? A previous post [1] said that it can only be used in the batch API, but I might have missed some updates on that. Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam
As you said, the operators of streaming jobs do not use the memory management, which is only for batch jobs. From the description I had guessed that the initial question was about batch jobs.
Hi,

As was said before, managed memory (as described in the blog post [1]) is only used for batch jobs. By default, managed memory is only lazily allocated, i.e., when a batch job is executed.

Streaming jobs maintain state in state backends. Flink provides state backends that store the state on the JVM heap or in an embedded RocksDB instance on disk. The state backend can be chosen per job (the default backend stores state on the JVM heap).

Best,
Fabian

[1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
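As a minimal sketch of choosing the RocksDB state backend per job (the checkpoint path and job name are placeholders, and the flink-statebackend-rocksdb dependency is assumed to be on the classpath):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StateBackendExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Keep operator/keyed state in an embedded RocksDB instance on disk
            // instead of the JVM heap; checkpoints go to the placeholder path below.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

            // ... build the actual job here (Kafka sources, maps, windows, DB2 sinks) ...

            env.execute("job-with-rocksdb-backend");
        }
    }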
Hi all,
Thanks for the answers. I confirm I have streaming jobs.

To summarize:
- "When the job is cancelled, these managed memories will be released to the MemoryManager but not recycled by gc, so you will see no changes in memory consumption" does not apply to my case, because the MemoryManager functionality is only used for batch jobs
- My issue could be resolved by storing state in an embedded RocksDB instance on disk

Is that correct? If yes, does that mean that I have to purge old state from RocksDB myself?

Thanks a lot!

Regards,
Julien.
Hi Julien,

First of all, if you only run streaming jobs you do not need to worry about "managed" memory.

Regardless of the state backend that you use, you should remove state that you don't need anymore. Otherwise, Flink will keep (and checkpoint) that state forever. There is no automatic garbage collection for state.

With Flink 1.6, Flink added TTL for state, which has to be enabled manually and which removes state that has not been used for a configurable time. Note: in the current version, state is only removed under certain circumstances. The feature is still being completed and improved.

Best,
Fabian
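A minimal sketch of enabling state TTL with the Flink 1.6 API (the descriptor name, state type and TTL duration are only placeholders):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlExample {
        // Builds a state descriptor whose entries expire 24 hours after the last write.
        public static ValueStateDescriptor<String> ttlDescriptor() {
            StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

            // Placeholder descriptor; attach the TTL config before acquiring the state,
            // e.g. in the open() method of a rich function.
            ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("last-event", String.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }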