Re: JVM Non Heap Memory

Posted by Chesnay Schepler on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/JVM-Non-Heap-Memory-tp10373p10458.html

We don't have to include it in 1.1.4 since Meters do not exist in 1.1;
my bad for tagging it in JIRA for 1.1.4.

On 05.12.2016 14:18, Ufuk Celebi wrote:

> Just to note that the bug mentioned by Chesnay does not invalidate Stefan's comments. ;-)
>
> Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261
>
> I added an issue to improve the documentation about cancellation (https://issues.apache.org/jira/browse/FLINK-5260).
>
> Which version of Flink are you using? Chesnay's fix will make it into the upcoming 1.1.4 release.
>
>
> On 5 December 2016 at 14:04:49, Chesnay Schepler ([hidden email]) wrote:
>> Hello Daniel,
>>  
>> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
>> cleaned up, causing the underlying dropwizard meter update threads not
>> to be shut down either.
>>  
>> I've opened a JIRA
>> and will open a PR soon.
>>  
>> Thank you for reporting this issue.
>>  
>> Regards,
>> Chesnay
>>  
>> On 05.12.2016 12:05, Stefan Richter wrote:
>>> Hi Daniel,
>>>
>>> the behaviour you observe looks like some threads are not canceled.
>>> Thread cancelation in Flink (and Java in general) is always
>>> cooperative, where cooperative means that the thread you want to
>>> cancel should somehow check cancelation and react to it. Sometimes
>>> this also requires some effort from the client that wants to cancel a
>>> thread. So if you implement e.g. custom operators or functions with
>>> aerospike, you must ensure that they a) react to cancelation and b)
>>> clean up their resources. If you do not consider this, your aerospike
>>> client might stay in a blocking call forever; blocking IO calls in
>>> particular are prone to this. What you need to ensure is that
>>> cancelation from the client side includes closing IO resources such as
>>> streams to unblock the thread and allow for termination. This means
>>> that your code must (to a certain degree) actively
>>> participate in Flink's task lifecycle. In Flink 1.2 we introduce a
>>> feature called CloseableRegistry, which makes participating in this
>>> lifecycle easier w.r.t. closing resources. For the time being, you
>>> should check that Flink’s task cancelation also causes your code to
>>> close the aerospike client and check cancelation flags.
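>>>
>>> As a tiny sketch of what this can look like for a custom source (the
>>> class name, host and port below are just placeholders; a socket stream
>>> stands in for any blocking IO resource):
>>>
>>> import java.io.BufferedReader;
>>> import java.io.InputStreamReader;
>>> import java.net.Socket;
>>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>>>
>>> public class BlockingLineSource extends RichSourceFunction<String> {
>>>
>>>     private volatile boolean running = true;
>>>     private transient Socket socket;
>>>
>>>     @Override
>>>     public void run(SourceContext<String> ctx) throws Exception {
>>>         socket = new Socket("some-host", 9999);
>>>         BufferedReader in = new BufferedReader(
>>>             new InputStreamReader(socket.getInputStream()));
>>>         String line;
>>>         // a) cooperatively check the cancelation flag on every iteration
>>>         while (running && (line = in.readLine()) != null) {
>>>             ctx.collect(line);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         // b) flip the flag AND close the blocking IO resource; otherwise
>>>         // the task thread may sit in readLine() forever
>>>         running = false;
>>>         try {
>>>             if (socket != null) {
>>>                 socket.close();
>>>             }
>>>         } catch (Exception ignored) {
>>>             // closing during cancelation, nothing useful to do here
>>>         }
>>>     }
>>> }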
>>>
>>> Best,
>>> Stefan
>>>
>>>> On 05.12.2016 at 11:42, Daniel Santos wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have done some thread checks and taken some dumps, and I have
>>>> disabled checkpointing.
>>>>
>>>> Here are my findings.
>>>>
>>>> I did a thread dump a few hours after I booted up the whole cluster.
>>>> ( @2/12/2016; 5 TMs; 3GB heap each; 7GB total limit each )
>>>>
>>>> The dump shows that most threads come from 3 sources.
>>>>
>>>> OutputFlusher --- 634 -- Sleeping State
>>>>
>>>> "OutputFlusher" - Thread t@4758
>>>> java.lang.Thread.State: TIMED_WAITING
>>>> at java.lang.Thread.sleep(Native Method)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
>>>>
>>>> Locked ownable synchronizers:
>>>> - None
>>>>
>>>> Metrics --- 376 ( the Flink metrics reporter is the only one being
>>>> used ) -- Parked State
>>>>
>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>> java.lang.Thread.State: TIMED_WAITING
>>>> at sun.misc.Unsafe.park(Native Method)
>>>> - parking to wait for (a
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> at
>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>> at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Locked ownable synchronizers:
>>>> - None
>>>>
>>>> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
>>>>
>>>> "tend" - Thread t@29011
>>>> java.lang.Thread.State: TIMED_WAITING
>>>> at java.lang.Thread.sleep(Native Method)
>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Locked ownable synchronizers:
>>>> - None
>>>>
>>>>
>>>> I have 2 streaming jobs and a batch Job that runs once in a while.
>>>>
>>>> Streaming job A runs with a parallelism of 2 and uses Aerospike only
>>>> in a RichSink.
>>>>
>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in a
>>>> RichFilterFunction / RichMapFunction with open and close methods, in
>>>> order to open and close the client.
>>>>
>>>> The batch job uses the Aerospike client in a RichFilterFunction /
>>>> RichMapFunction with open and close methods, in order to open and
>>>> close the client.
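>>>>
>>>> The open / close pattern in those functions is roughly the following
>>>> (a simplified sketch, not the actual job code; the class name and the
>>>> host are placeholders):
>>>>
>>>> import com.aerospike.client.AerospikeClient;
>>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>>> import org.apache.flink.configuration.Configuration;
>>>>
>>>> public class AerospikeEnricher extends RichMapFunction<String, String> {
>>>>
>>>>     private transient AerospikeClient client;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) {
>>>>         // one client per parallel subtask
>>>>         client = new AerospikeClient("aerospike-host", 3000);
>>>>     }
>>>>
>>>>     @Override
>>>>     public String map(String value) throws Exception {
>>>>         // lookups / writes against Aerospike happen here
>>>>         return value;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() {
>>>>         // should also shut down the client's "tend" thread
>>>>         if (client != null) {
>>>>             client.close();
>>>>         }
>>>>     }
>>>> }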
>>>>
>>>> Next, I cancelled all the streaming jobs (@5/12/2016) and checked
>>>> the threads and the JVM non-heap usage.
>>>>
>>>> JVM non-heap usage reaches 3GB; the thread count goes down, but some
>>>> threads still linger around, and they are the following.
>>>>
>>>> Metrics --- 790 ( the Flink metrics reporter is the only one being
>>>> used )
>>>>
>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>> java.lang.Thread.State: TIMED_WAITING
>>>> at sun.misc.Unsafe.park(Native Method)
>>>> - parking to wait for (a
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> at
>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>> at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Locked ownable synchronizers:
>>>> - None
>>>>
>>>>
>>>> tend -- 432 ( Aerospike Client Thread )
>>>>
>>>>
>>>> "tend" - Thread t@29011
>>>> java.lang.Thread.State: TIMED_WAITING
>>>> at java.lang.Thread.sleep(Native Method)
>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Locked ownable synchronizers:
>>>> - None
>>>>
>>>>
>>>> The total number of threads is 1289 ( total ) / 1220 ( tend + metrics ).
>>>> So I have 1220 threads that I believe should be dead and not
>>>> running, since I have no jobs running at all.
>>>>
>>>> And the JVM Non-HEAP usage doesn't decrease at all after removing
>>>> every job.
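>>>>
>>>> ( The per-name counts above are taken from the thread dump; the same
>>>> kind of count can also be produced from inside the task manager JVM,
>>>> e.g. from a small test job, with something like this: )
>>>>
>>>> import java.util.Map;
>>>> import java.util.TreeMap;
>>>>
>>>> public class ThreadCounter {
>>>>     // groups live threads by name, with any trailing "-<number>"
>>>>     // stripped, so "metrics-meter-tick-thread-1", "-2", ... all land
>>>>     // in one bucket
>>>>     public static Map<String, Integer> countByName() {
>>>>         Map<String, Integer> counts = new TreeMap<>();
>>>>         for (Thread t : Thread.getAllStackTraces().keySet()) {
>>>>             String name = t.getName().replaceAll("-?\\d+$", "");
>>>>             counts.merge(name, 1, Integer::sum);
>>>>         }
>>>>         return counts;
>>>>     }
>>>> }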
>>>>
>>>>
>>>> Why the hell do the metrics grow with no end ?
>>>>
>>>> I am using the following libs for metrics :
>>>>
>>>> - metrics-graphite-3.1.0.jar
>>>>
>>>> - metrics-core-3.1.0.jar
>>>>
>>>> - flink-metrics-dropwizard-1.1.3.jar
>>>>
>>>> - flink-metrics-graphite-1.1.3.jar
>>>>
>>>> And the config for reporter is :
>>>>
>>>> metrics.reporters: graphite
>>>> metrics.reporter.graphite.class:
>>>> org.apache.flink.metrics.graphite.GraphiteReporter
>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
>>>> metrics.reporter.graphite.port: 2003
>>>>
>>>>
>>>> Shouldn't the Aerospike client also be closed ? Or am I missing
>>>> something, or doing something wrong ?
>>>>
>>>>
>>>> Sorry for the long post.
>>>>
>>>> Best Regards,
>>>>
>>>> Daniel Santos
>>>>
>>>>
>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
>>>>> Hey Daniel!
>>>>>
>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is not expected.
>>>>> What kind of threads are you seeing being spawned / lingering around?
>>>>> As a first step, could you try to disable checkpointing and see how it behaves afterwards?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On 29 November 2016 at 17:32:32, Daniel Santos ([hidden email]) wrote:
>>>>>> Hello,
>>>>>>
>>>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as a source,
>>>>>> and an HttpClient as a sink, plus Kafka as a sink.
>>>>>> So is it possible that the state backend is the culprit?
>>>>>>
>>>>>> The curious thing is that even when no jobs are running, streaming or
>>>>>> otherwise, the JVM Non-HEAP stays the same.
>>>>>> Which I find odd.
>>>>>>
>>>>>> Another curious thing is that it is proportional to an increase in
>>>>>> the number of JVM threads.
>>>>>> Whenever there are more JVM threads running, there is also more JVM
>>>>>> Non-HEAP being used, which makes sense.
>>>>>> But the threads stick around and never decrease, and likewise the
>>>>>> JVM Non-HEAP memory.
>>>>>>
>>>>>> These observations are based on the Flink metrics that are sent to
>>>>>> and recorded in our Graphite system.
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Daniel Santos
>>>>>>
>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
>>>>>>> Are you using the RocksDB backend in native mode? If so then the
>>>>>>> off-heap memory may be there.
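>>>>>>> ( By the RocksDB backend I mean a setup along these lines; the
>>>>>>> checkpoint path is just an example. RocksDB keeps its state in
>>>>>>> native, off-heap memory, which is why I am asking: )
>>>>>>>
>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> public class RocksDbJob {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         StreamExecutionEnvironment env =
>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         // state lives in RocksDB's native memory and on local disk
>>>>>>>         env.setStateBackend(
>>>>>>>             new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>>>>>>>         // ... rest of the job ...
>>>>>>>         // env.execute("my job");
>>>>>>>     }
>>>>>>> }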
>>>>>>>
>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, [hidden email] wrote:
>>>>>>>
>>>>>>> I have the same problem, but I run the Flink job on YARN.
>>>>>>> I submit the job to YARN from computer 22, and the job runs
>>>>>>> successfully; the jobmanager is on computer 79 and the taskmanager
>>>>>>> on computer 69, three different computers.
>>>>>>> However, on computer 22 the process with pid=3463, which is the one
>>>>>>> that submitted the job to YARN, uses 2.3g of memory, 15% of the total.
>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
>>>>>>> -ytm 1024 ....
>>>>>>> Why does it occupy so much memory on computer 22, when the job is
>>>>>>> running on computer 79 and computer 69?
>>>>>>> What would be the possible causes of such behavior ?
>>>>>>> Best Regards,
>>>>>>> ----- Original Message -----
>>>>>>> From: Daniel Santos
>>>>>>> To: [hidden email]
>>>>>>> Subject: JVM Non Heap Memory
>>>>>>> Date: 29 November 2016, 22:26
>>>>>>>
>>>>>>>
>>>>>>> Hello,
>>>>>>> Is it common to have high usage of Non-Heap memory in the JVM ?
>>>>>>> I am running Flink in a stand-alone cluster and in docker, with each
>>>>>>> docker container being capped at 6G of memory.
>>>>>>> I have been struggling to keep memory usage in check.
>>>>>>> The non-heap usage increases with no end. It starts with just 100MB
>>>>>>> of usage and after a day it reaches 1.3GB.
>>>>>>> Then it eventually reaches 2GB, and eventually the docker container
>>>>>>> is killed because it has reached the memory limit.
>>>>>>> My configuration for each flink task manager is the following :
>>>>>>> ----------- flink-conf.yaml --------------
>>>>>>> taskmanager.heap.mb: 3072
>>>>>>> taskmanager.numberOfTaskSlots: 8
>>>>>>> taskmanager.memory.preallocate: false
>>>>>>> taskmanager.network.numberOfBuffers: 12500
>>>>>>> taskmanager.memory.off-heap: false
>>>>>>> ---------------------------------------------
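>>>>>>> ( For what it's worth, the non-heap figure itself can also be read
>>>>>>> from inside the JVM via the MemoryMXBean, with a small probe like
>>>>>>> this: )
>>>>>>>
>>>>>>> import java.lang.management.ManagementFactory;
>>>>>>> import java.lang.management.MemoryUsage;
>>>>>>>
>>>>>>> public class NonHeapProbe {
>>>>>>>     public static void main(String[] args) {
>>>>>>>         MemoryUsage nonHeap =
>>>>>>>             ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
>>>>>>>         System.out.println("non-heap used: " + (nonHeap.getUsed() >> 20)
>>>>>>>             + " MB, committed: " + (nonHeap.getCommitted() >> 20) + " MB");
>>>>>>>     }
>>>>>>> }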
>>>>>>> What would be the possible causes of such behavior ?
>>>>>>> Best Regards,
>>>>>>> Daniel Santos
>>>>>>>
>>>>>>>
>>>>>>
>>  
>>  
>