with either the Graphite or Ganglia Reporter. The Meter metric is
> Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink metrics are you using?
>
> – Ufuk
>
> On 5 December 2016 at 16:44:47, Daniel Santos ([hidden email]) wrote:
>> Hello,
>>
>> Thank you all for the kindly reply.
>>
>> I've got the general idea. I am using Flink version 1.1.3.
>>
>> So it seems the Meter fix won't make it into 1.1.4?
>>
>> Best Regards,
>>
>> Daniel Santos
>>
>>
>> On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
>>> We don't have to include it in 1.1.4 since Meters do not exist in
>>> 1.1; my bad for tagging it in JIRA for 1.1.4.
>>>
>>> On 05.12.2016 14:18, Ufuk Celebi wrote:
>>>> Just to note that the bug mentioned by Chesnay does not invalidate
>>>> Stefan's comments. ;-)
>>>>
>>>> Chesnay's issue is here:
>>>> https://issues.apache.org/jira/browse/FLINK-5261
>>>>
>>>> I added an issue to improve the documentation about cancellation
>>>> (https://issues.apache.org/jira/browse/FLINK-5260).
>>>>
>>>> Which version of Flink are you using? Chesnay's fix will make it into
>>>> the upcoming 1.1.4 release.
>>>>
>>>>
>>>> On 5 December 2016 at 14:04:49, Chesnay Schepler ([hidden email]) wrote:
>>>>> Hello Daniel,
>>>>> I'm afraid you have stumbled upon a bug in Flink. Meters were not
>>>>> properly cleaned up, causing the underlying dropwizard meter update
>>>>> threads not to be shut down either.
>>>>> I've opened a JIRA issue and will open a PR soon.
>>>>> Thank you for reporting this issue.
>>>>> Regards,
>>>>> Chesnay
>>>>> On 05.12.2016 12:05, Stefan Richter wrote:
>>>>>> Hi Daniel,
>>>>>>
>>>>>> the behaviour you observe looks like some threads are not cancelled.
>>>>>> Thread cancellation in Flink (and Java in general) is always
>>>>>> cooperative, meaning that the thread you want to cancel must somehow
>>>>>> check for cancellation and react to it. Sometimes this also requires
>>>>>> some effort from the client that wants to cancel a thread. So if you
>>>>>> implement e.g. custom operators or functions that use Aerospike, you
>>>>>> must ensure that they a) react to cancellation and b) clean up their
>>>>>> resources. If you do not take care of this, your Aerospike client
>>>>>> might stay in a blocking call forever; blocking IO calls in particular
>>>>>> are prone to this. You need to ensure that cancellation on your side
>>>>>> includes closing IO resources such as streams, to unblock the thread
>>>>>> and allow it to terminate. This means that your code must (to a
>>>>>> certain degree) actively participate in Flink's task lifecycle. In
>>>>>> Flink 1.2 we introduce a feature called CloseableRegistry, which makes
>>>>>> participating in this lifecycle easier with respect to closing
>>>>>> resources. For the time being, you should check that Flink's task
>>>>>> cancellation also causes your code to close the Aerospike client, and
>>>>>> check cancellation flags.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
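A minimal sketch of the pattern Stefan describes -- close the client in the rich function's close(), which Flink also invokes when a task is cancelled. It assumes the standard Aerospike Java client (com.aerospike.client.AerospikeClient); host, port and the map logic are placeholders, not taken from this thread:

----------------------------------------
import com.aerospike.client.AerospikeClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch only: the function owns the Aerospike client, and close() releases
// it so that no "tend" thread outlives the task.
public class AerospikeEnrichMap extends RichMapFunction<String, String> {

    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) {
        // One client per parallel subtask; creating it here (not in the
        // constructor) keeps it out of the serialized function object.
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public String map(String value) {
        // ... look something up via 'client' and enrich 'value' ...
        return value;
    }

    @Override
    public void close() {
        // Runs on normal completion AND on cancellation. Closing the client
        // stops its background tend thread and unblocks pending IO calls.
        if (client != null) {
            client.close();
        }
    }
}
----------------------------------------

For a custom SourceFunction the same idea applies in cancel(): set a volatile flag and close the client there, so a blocking read cannot keep the source thread alive.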
>>>>>>
>>>>>>> On 05.12.2016 at 11:42, Daniel Santos wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have done some thread checking and thread dumps, and I have
>>>>>>> disabled checkpointing.
>>>>>>>
>>>>>>> Here are my findings.
>>>>>>>
>>>>>>> I did a thread dump a few hours after I booted up the whole cluster.
>>>>>>> (@2/12/2016; 5 TMs; 3 GB heap each; 7 GB total each as the limit)
>>>>>>>
>>>>>>> The dump shows that most threads come from 3 sources.
>>>>>>>
>>>>>>> OutputFlusher --- 634 -- Sleeping State
>>>>>>>
>>>>>>> "OutputFlusher" - Thread t@4758
>>>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>>>> at java.lang.Thread.sleep(Native Method)
>>>>>>> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
>>>>>>>
>>>>>>> Locked ownable synchronizers:
>>>>>>> - None
>>>>>>>
>>>>>>> Metrics --- 376 ( the Flink metrics reporter is the only one being used ) -- Parked State
>>>>>>>
>>>>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>>>> at sun.misc.Unsafe.park(Native Method)
>>>>>>> - parking to wait for (a
>>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Locked ownable synchronizers:
>>>>>>> - None
>>>>>>>
>>>>>>> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
>>>>>>>
>>>>>>> "tend" - Thread t@29011
>>>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>>>> at java.lang.Thread.sleep(Native Method)
>>>>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Locked ownable synchronizers:
>>>>>>> - None
>>>>>>>
>>>>>>>
>>>>>>> I have 2 streaming jobs and a batch job that runs once in a while.
>>>>>>>
>>>>>>> Streaming job A runs with a parallelism of 2 and uses the Aerospike
>>>>>>> client only in a RichSink.
>>>>>>>
>>>>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in a
>>>>>>> RichFilterFunction / RichMapFunction with open and close methods, in
>>>>>>> order to open and close the client.
>>>>>>>
>>>>>>> The batch job uses the Aerospike client in a RichFilterFunction /
>>>>>>> RichMapFunction with open and close methods, in order to open and
>>>>>>> close the client.
>>>>>>>
>>>>>>> Next, I cancelled all the streaming jobs (@5/12/2016) and checked
>>>>>>> the threads and the JVM non-heap usage.
>>>>>>>
>>>>>>> JVM non-heap usage reaches 3 GB and the thread count goes down, but
>>>>>>> some threads still linger around; they are the following.
>>>>>>>
>>>>>>> Metrics --- 790 ( the Flink metrics reporter is the only one being used )
>>>>>>>
>>>>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>>>> at sun.misc.Unsafe.park(Native Method)
>>>>>>> - parking to wait for (a
>>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Locked ownable synchronizers:
>>>>>>> - None
>>>>>>>
>>>>>>>
>>>>>>> tend -- 432 ( Aerospike Client Thread )
>>>>>>>
>>>>>>>
>>>>>>> "tend" - Thread t@29011
>>>>>>> java.lang.Thread.State: TIMED_WAITING
>>>>>>> at java.lang.Thread.sleep(Native Method)
>>>>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Locked ownable synchronizers:
>>>>>>> - None
>>>>>>>
>>>>>>>
>>>>>>> The total number of threads is 1289, of which 1220 are tend + metrics
>>>>>>> threads. So I have 1220 threads that I believe should be dead and not
>>>>>>> running, since I have no jobs running at all.
>>>>>>>
>>>>>>> And the JVM non-heap usage doesn't decrease at all after removing
>>>>>>> every job.
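A small JDK-only sketch (an addition, not from the thread) of how to group live threads by name prefix to reproduce counts like the ones above. It is shown as a standalone main() for brevity; in practice the same loop would have to run inside the TaskManager JVM (or the counting can be done on a jstack dump) to see the tend/metrics threads:

----------------------------------------
import java.util.Map;
import java.util.TreeMap;

public class ThreadCounts {
    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            // Strip trailing digits/dashes so "metrics-meter-tick-thread-1",
            // "metrics-meter-tick-thread-2", ... collapse into one bucket.
            String prefix = t.getName().replaceAll("[-0-9]+$", "");
            counts.merge(prefix, 1, Integer::sum);
        }
        // Prints e.g. "metrics-meter-tick-thread : <n>" and "tend : <n>".
        counts.forEach((name, n) -> System.out.println(name + " : " + n));
    }
}
----------------------------------------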
>>>>>>>
>>>>>>>
>>>>>>> Why the hell do the metrics threads grow without end?
>>>>>>>
>>>>>>> I am using the following libs for metrics :
>>>>>>>
>>>>>>> - metrics-graphite-3.1.0.jar
>>>>>>>
>>>>>>> - metrics-core-3.1.0.jar
>>>>>>>
>>>>>>> - flink-metrics-dropwizard-1.1.3.jar
>>>>>>>
>>>>>>> - flink-metrics-graphite-1.1.3.jar
>>>>>>>
>>>>>>> And the config for reporter is :
>>>>>>>
>>>>>>> metrics.reporters: graphite
>>>>>>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>>>>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
>>>>>>> metrics.reporter.graphite.port: 2003
>>>>>>>
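For context on which Flink metrics can be registered from user code (the question Ufuk asks at the top of the thread): on the 1.1.x line the user-facing metric types are Counters, Gauges and Histograms, and a dropwizard histogram is typically hooked in through flink-metrics-dropwizard roughly as sketched below. The metric name, reservoir size and surrounding function are placeholders, not taken from this thread:

----------------------------------------
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

// Sketch: register a dropwizard histogram so it is picked up by the
// configured Graphite reporter. Names and sizes are illustrative only.
public class LatencyTrackingMap extends RichMapFunction<String, String> {

    private transient Histogram latency;

    @Override
    public void open(Configuration parameters) {
        latency = getRuntimeContext()
                .getMetricGroup()
                .histogram("latencyMs",
                        new DropwizardHistogramWrapper(
                                new com.codahale.metrics.Histogram(
                                        new SlidingWindowReservoir(500))));
    }

    @Override
    public String map(String value) {
        long start = System.currentTimeMillis();
        // ... actual work ...
        latency.update(System.currentTimeMillis() - start);
        return value;
    }
}
----------------------------------------

The Meter type that Chesnay's fix concerns does not exist in 1.1, which is why Ufuk asks which metrics are actually registered on 1.1.3.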
>>>>>>>
>>>>>>> Shouldn't the Aerospike client also be closed? Or am I missing
>>>>>>> something, or doing something wrong?
>>>>>>>
>>>>>>>
>>>>>>> Sorry for the long post.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Daniel Santos
>>>>>>>
>>>>>>>
>>>>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
>>>>>>>> Hey Daniel!
>>>>>>>>
>>>>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is
>>>>>>>> not expected.
>>>>>>>> What kind of threads are you seeing being spawned / lingering around?
>>>>>>>> As a first step, could you try to disable checkpointing and see
>>>>>>>> how it behaves afterwards?
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On 29 November 2016 at 17:32:32, Daniel Santos ([hidden email]) wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as the
>>>>>>>>> source, and an HttpClient as a sink, plus Kafka as a sink as well.
>>>>>>>>> So is it possible that the state backend is the culprit?
>>>>>>>>>
>>>>>>>>> The curious thing is that even when no jobs are running, streaming
>>>>>>>>> or otherwise, the JVM non-heap usage stays the same, which I find
>>>>>>>>> odd.
>>>>>>>>>
>>>>>>>>> Another curious thing is that it is proportional to an increase in
>>>>>>>>> the number of JVM threads. Whenever there are more JVM threads
>>>>>>>>> running, there is also more JVM non-heap memory being used, which
>>>>>>>>> makes sense. But the threads stick around and never decrease, and
>>>>>>>>> likewise the JVM non-heap memory.
>>>>>>>>>
>>>>>>>>> These observations are based on the Flink metrics that are being
>>>>>>>>> sent to and recorded in our Graphite system.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>>
>>>>>>>>> Daniel Santos
>>>>>>>>>
>>>>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
>>>>>>>>>> Are you using the RocksDB backend in native mode? If so, the
>>>>>>>>>> off-heap memory may come from there.
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, [hidden email] wrote:
>>>>>>>>>>
>>>>>>>>>> I have the same problem, but I run the Flink job on YARN.
>>>>>>>>>> I submit the job to YARN from computer 22, and the job runs
>>>>>>>>>> successfully; the JobManager is on computer 79 and the TaskManager
>>>>>>>>>> is on computer 69 -- three different computers.
>>>>>>>>>> However, on computer 22 the process with pid=3463, which is the job
>>>>>>>>>> submission to YARN, is using 2.3 GB of memory, 15% of the total.
>>>>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
>>>>>>>>>> -ytm 1024 ....
>>>>>>>>>> Why does it occupy so much memory on computer 22? The job is running
>>>>>>>>>> on computer 79 and computer 69.
>>>>>>>>>> What would be the possible causes of such behavior?
>>>>>>>>>> Best Regards,
>>>>>>>>>> ----- Original Message -----
>>>>>>>>>> From: Daniel Santos
>>>>>>>>>> To: [hidden email]
>>>>>>>>>> Subject: JVM Non Heap Memory
>>>>>>>>>> Date: 29 November 2016, 22:26
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> Is it common to have high JVM non-heap usage?
>>>>>>>>>> I am running Flink as a standalone cluster in Docker, with each
>>>>>>>>>> container capped at 6 GB of memory.
>>>>>>>>>> I have been struggling to keep memory usage in check.
>>>>>>>>>> The non-heap usage grows without end. It starts at just 100 MB and
>>>>>>>>>> after a day it reaches 1.3 GB.
>>>>>>>>>> Eventually it reaches 2 GB, and then the container is killed because
>>>>>>>>>> it has hit the memory limit.
>>>>>>>>>> My configuration for each Flink TaskManager is the following:
>>>>>>>>>> ----------- flink-conf.yaml --------------
>>>>>>>>>> taskmanager.heap.mb: 3072
>>>>>>>>>> taskmanager.numberOfTaskSlots: 8
>>>>>>>>>> taskmanager.memory.preallocate: false
>>>>>>>>>> taskmanager.network.numberOfBuffers: 12500
>>>>>>>>>> taskmanager.memory.off-heap: false
>>>>>>>>>> ---------------------------------------------
>>>>>>>>>> What would be the possible causes of such behavior ?
>>>>>>>>>> Best Regards,
>>>>>>>>>> Daniel Santos
>>>>>>>>>>
>>>>>>>>>>