I have the same problem, but I submit the Flink job to YARN.
I submit the job from computer 22 and it runs successfully. The JobManager is on computer 79 and the TaskManager is on computer 69, so these are three different computers.
However, on computer 22 the process with pid=3463, which is the one that submitted the job to YARN, is using 2.3 GB of memory, 15% of the total.
The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024 -ytm 1024 ....
Why does the process on computer 22 occupy so much memory, when the job is running on computers 79 and 69?
What would be the possible causes of such behavior?
Best Regards,

----- Original Message -----
From: Daniel Santos <[hidden email]>
To: [hidden email]
Subject: JVM Non Heap Memory
Date: 29 November 2016, 22:26

Hello,
Is it common to have high Non-Heap usage in the JVM?
I am running Flink in a stand-alone cluster and in Docker, with each Docker container capped at 6 GB of memory.
I have been struggling to keep memory usage in check.
The non-heap usage grows without end. It starts at just 100 MB and after a day it reaches 1.3 GB.
It eventually reaches 2 GB, and then the container is killed because it has hit the memory limit.
My configuration for each Flink task manager is the following:
----------- flink-conf.yaml --------------
taskmanager.heap.mb: 3072
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
taskmanager.network.numberOfBuffers: 12500
taskmanager.memory.off-heap: false
---------------------------------------------
What would be the possible causes of such behavior?
Best Regards,
Daniel Santos
|
Are you using the RocksDB backend in native mode? If so then the off-heap memory may be there.
On Tue, Nov 29, 2016 at 9:54 AM, <[hidden email]> wrote:
|
Hello,
Nope, I am using Hadoop HDFS as the state backend, Kafka as the source, and an HttpClient as a sink, with Kafka as a sink as well.
So is it possible that the state backend is the culprit?

The curious thing is that even when no jobs are running, streaming or otherwise, the JVM Non-Heap usage stays the same, which I find odd.

Another curious thing is that it grows in proportion to the number of JVM threads.
Whenever there are more JVM threads running, more JVM Non-Heap is also being used, which makes sense.
But the threads stick around and never decrease, and neither does the JVM Non-Heap memory.

These observations are based on the Flink metrics that are sent to and recorded in our Graphite system.

Best Regards,

Daniel Santos

On 11/29/2016 04:04 PM, Cliff Resnick wrote:
|
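The two figures compared in the message above, non-heap usage and thread count, can also be read from inside a JVM. The following is a minimal sketch using the standard java.lang.management beans; the class name NonHeapProbe is hypothetical, and the thread does not state which exact figures Flink's reporter sends to Graphite, so treat this only as a way to cross-check the trend locally.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper, not part of Flink: reads the JVM's own view of
// non-heap usage and the number of live threads.
class NonHeapProbe {

    /** Bytes of non-heap memory currently used, as reported by the MemoryMXBean. */
    static long nonHeapUsedBytes() {
        MemoryUsage usage = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        return usage.getUsed();
    }

    /** Number of live threads (daemon and non-daemon) in this JVM. */
    static int liveThreadCount() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    public static void main(String[] args) {
        System.out.printf("non-heap used: %d MB, live threads: %d%n",
                nonHeapUsedBytes() / (1024 * 1024), liveThreadCount());
    }
}
```

Logging both values at intervals would show whether they rise together and whether either falls after jobs are cancelled.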
Hey Daniel!
Thanks for reporting this. Unbounded growth of non-heap memory is not expected. What kind of Threads are you seeing being spawned/lingering around?
As a first step, could you try to disable checkpointing and see how it behaves afterwards?
– Ufuk
On 29 November 2016 at 17:32:32, Daniel Santos ([hidden email]) wrote:
|
Hello,

I have done some thread checking and dumps, and I have disabled checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster (@2/12/2016; 5 TM; 3GB heap each; 7GB total each as limit).

The dump shows that most threads come from 3 sources.

OutputFlusher --- 634 -- Sleeping State

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
    - None

Metrics --- 376 (the Flink metrics reporter is the only metrics system being used) -- Parked State

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <bcfb9f9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

tend -- 220 (Aerospike Client Thread) -- Sleeping State

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at com.aerospike.client.util.Util.sleep(Util.java:38)
    at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

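I have 2 streaming jobs and a batch job that runs once in a while.

Streaming job A runs with a parallelism of 2 and uses Aerospike only in a RichSink.

Streaming job B runs with a parallelism of 24 and uses Aerospike in a RichFilterFunction / RichMapFunction, with open and close methods that open and close the client.

The batch job uses the Aerospike client in a RichFilterFunction / RichMapFunction, with open and close methods that open and close the client.

Next, I cancelled all the streaming jobs @5/12/2016 and checked the threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB. The thread count goes down, but some threads still linger around, and they are the following.

Metrics --- 790 (the Flink metrics reporter is the only metrics system being used)

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

tend -- 432 (Aerospike Client Thread)

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at com.aerospike.client.util.Util.sleep(Util.java:38)
    at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

The total number of threads is 1289 (total) / 1220 (tend + metrics).
So I have 1220 threads that I believe should be dead and not running, since I have no jobs running at all.

And the JVM Non-Heap usage doesn't decrease at all after removing every job.
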
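Why the hell do the metrics threads grow to no end?

I am using the following libs for metrics:

- metrics-graphite-3.1.0.jar
- metrics-core-3.1.0.jar
- flink-metrics-dropwizard-1.1.3.jar
- flink-metrics-graphite-1.1.3.jar

And the config for the reporter is:

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: CARBONRELAYHOST
metrics.reporter.graphite.port: 2003
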
Shouldn't the Aerospike client also be closed? Or am I missing something, or doing something wrong?

Sorry for the long post.

Best Regards,

Daniel Santos

On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
|
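A per-source tally like the one in the findings above can be produced from a running JVM without reading a full dump by hand. This is a minimal sketch using only the JDK; the class name ThreadTally and the grouping rule (stripping a trailing "-<number>" so that names such as metrics-meter-tick-thread-1 and metrics-meter-tick-thread-2 fall into one group) are assumptions for illustration, not how the counts in the message were obtained.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups the live threads of the current JVM by name.
class ThreadTally {

    /** Counts live threads per name, with a trailing "-<digits>" suffix stripped. */
    static Map<String, Integer> tally() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            String group = t.getName().replaceAll("-\\d+$", "");
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints one line per group: count, then group name.
        tally().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Running the same tally before and after cancelling a job makes it easy to see which groups fail to shrink.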
Hi Daniel,
the behaviour you observe suggests that some threads are not being canceled. Thread cancelation in Flink (and in Java in general) is always cooperative, meaning that the thread you want to cancel must itself check for cancelation and react to it. Sometimes this also requires some effort from the client that wants to cancel a thread. So if you implement, e.g., custom operators or functions with Aerospike, you must ensure that they a) react to cancelation and b) clean up their resources. If you do not, your Aerospike client might stay in a blocking call forever; blocking IO calls in particular are prone to this. What you need to ensure is that cancelation from the clients includes closing IO resources such as streams, to unblock the thread and allow it to terminate. This means that your code must (to a certain degree) actively participate in Flink's task lifecycle. In Flink 1.2 we introduce a feature called CloseableRegistry, which makes participating in this lifecycle easier w.r.t. closing resources. For the time being, you should check that Flink’s task cancelation also causes your code to close the Aerospike client and check cancelation flags.

Best,
Stefan
|
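The cooperative pattern described above can be sketched with plain JDK threads. This is a simplified illustration and does not use the Flink or Aerospike APIs: TendingClient is a hypothetical stand-in for any client that owns a background thread, and the try/finally in main stands in for a function's open/close hooks. The background loop checks a flag and treats interruption as a stop request, and close() is what actually ends the thread.

```java
import java.util.concurrent.TimeUnit;

class CancellationSketch {

    /** Hypothetical stand-in for a client that owns a background maintenance thread. */
    static final class TendingClient implements AutoCloseable {
        private volatile boolean running = true;
        private final Thread tender = new Thread(this::tendLoop, "demo-tend");

        static TendingClient start() {
            TendingClient client = new TendingClient();
            client.tender.start();
            return client;
        }

        // Cooperative loop: re-checks the flag on every pass.
        private void tendLoop() {
            while (running) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    return; // interruption is treated as a request to stop
                }
            }
        }

        boolean isTending() {
            return tender.isAlive();
        }

        @Override
        public void close() {
            running = false;     // ask the loop to stop
            tender.interrupt();  // wake it up if it is sleeping
            try {
                tender.join(5000); // bounded wait for the thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        TendingClient client = TendingClient.start();
        try {
            System.out.println("tending while open: " + client.isTending());
            // per-record work would go here
        } finally {
            client.close(); // runs on normal completion and on failure alike
        }
        System.out.println("tending after close: " + client.isTending());
    }
}
```

If close() were never called, the demo-tend thread would keep sleeping in its loop, much like the lingering "tend" threads in the dump.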
Hello Daniel,
I'm afraid you stumbled upon a bug in Flink. Meters were not properly cleaned up, causing the underlying Dropwizard meter update threads to not be shut down either.

I've opened a JIRA and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:
|
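The failure mode described above, an update thread that outlives the object it serves, can be reproduced in a few lines. This is a generic sketch built on the JDK's ScheduledExecutorService and makes no claim about how Dropwizard or Flink implement meters; TickingMeter and its 10 ms tick interval are hypothetical. The point is only that the executor's thread keeps running until something explicitly shuts it down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TickingMeterSketch {

    /** Hypothetical meter that is updated by its own scheduled thread. */
    static final class TickingMeter implements AutoCloseable {
        private final AtomicLong ticks = new AtomicLong();
        private final ScheduledExecutorService ticker =
                Executors.newSingleThreadScheduledExecutor();

        TickingMeter() {
            // The periodic task keeps the executor thread alive indefinitely.
            ticker.scheduleAtFixedRate(ticks::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        }

        long ticks() {
            return ticks.get();
        }

        boolean isStopped() {
            return ticker.isTerminated();
        }

        @Override
        public void close() {
            ticker.shutdownNow(); // cancels the periodic task and stops the thread
            try {
                ticker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        TickingMeter meter = new TickingMeter();
        System.out.println("stopped before close: " + meter.isStopped());
        meter.close();
        System.out.println("stopped after close: " + meter.isStopped());
    }
}
```

Dropping the reference to a TickingMeter without calling close() leaves its thread running, so one such object created per task and never closed adds one lingering thread each time.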
Just to note that the bug mentioned by Chesnay does not invalidate Stefan's comments. ;-)
Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261
I added an issue to improve the documentation about cancellation (https://issues.apache.org/jira/browse/FLINK-5260).
Which version of Flink are you using? Chesnay's fix will make it into the upcoming 1.1.4 release.
On 5 December 2016 at 14:04:49, Chesnay Schepler ([hidden email]) wrote:
|
We don't have to include it in 1.1.4 since Meters do not exist in 1.1;
my bad for tagging it in JIRA for 1.1.4.
On 05.12.2016 14:18, Ufuk Celebi wrote:
|
Hello,
Thank you all for the kind replies. I've got the general idea. I am using Flink version 1.1.3. So it seems the fix for Meters won't make it into 1.1.4? Best Regards, Daniel Santos |
Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink metrics are you using?
– Ufuk |
Hey Daniel,
the fix won't make it into 1.1.4 since it is only relevant if you're using Flink Meters together with either the Graphite or Ganglia Reporter. The Meter metric is however not available in 1.1 at all, so it can't be the underlying cause. My fix is only for 1.2; the fixed issue could have caused the behavior. Now, for clarification, the "metrics-meter-tick-thread-X" threads are not created by Flink. With Meter's being out of the picture i thus think this is not an issue of Flink's metric system. Instead I believe kafka may be the culprit, I found a similar description here: https://issues.apache.org/jira/browse/KAFKA-1521 Which kafka version are you using? Kafka internally also uses the DropWizard library, and a particular version (2.2.0) of that is apparently known to be leaking threads. Regards, Chesnay On 05.12.2016 17:30, Ufuk Celebi wrote: > Quick question since the Meter issue does _not_ apply to 1.1.3, which Flink metrics are you using? > > – Ufuk > > On 5 December 2016 at 16:44:47, Daniel Santos ([hidden email]) wrote: >> Hello, >> >> Thank you all for the kindly reply. >> >> I've got the general idea. I am using version flink's 1.1.3. >> >> So it seems the fix of Meter's won't make it to 1.1.4 ? >> >> Best Regards, >> >> Daniel Santos >> >> >> On 12/05/2016 01:28 PM, Chesnay Schepler wrote: >>> We don't have to include it in 1.1.4 since Meter's do not exist in >>> 1.1; my bad for tagging it in JIRA for 1.1.4. >>> >>> On 05.12.2016 14:18, Ufuk Celebi wrote: >>>> Just to note that the bug mentioned by Chesnay does not invalidate >>>> Stefan's comments. ;-) >>>> >>>> Chesnay's issue is here: >>>> https://issues.apache.org/jira/browse/FLINK-5261 >>>> >>>> I added an issue to improve the documentation about cancellation >>>> (https://issues.apache.org/jira/browse/FLINK-5260). >>>> >>>> Which version of Flink are you using? Chesnay's fix will make it into >>>> the upcoming 1.1.4 release. 
>>>> >>>> >>>> On 5 December 2016 at 14:04:49, Chesnay Schepler ([hidden email]) >>>> wrote: >>>>> Hello Daniel, >>>>> I'm afraid you stumbled upon a bug in Flink. Meters were not properly >>>>> cleaned up, causing the underlying dropwizard meter update threads to >>>>> not be shutdown either. >>>>> I've opened a JIRA >>>>> and will open a PR soon. >>>>> Thank your for reporting this issue. >>>>> Regards, >>>>> Chesnay >>>>> On 05.12.2016 12:05, Stefan Richter wrote: >>>>>> Hi Daniel, >>>>>> >>>>>> the behaviour you observe looks like some threads are not canceled. >>>>>> Thread cancelation in Flink (and Java in general) is always >>>>>> cooperative, where cooperative means that the thread you want to >>>>>> cancel should somehow check cancelation and react to it. Sometimes >>>>>> this also requires some effort from the client that wants to cancel a >>>>>> thread. So if you implement e.g. custom operators or functions with >>>>>> aerospike, you must ensure that they a) react on cancelation and b) >>>>>> cleanup their resources. If you do not consider this, your aerospike >>>>>> client might stay in a blocking call forever, in particular blocking >>>>>> IO calls are prone to this. What you need to ensure is that >>>>>> cancelation from the clients includes closing IO resources such as >>>>>> streams to unblock the thread and allow for termination. This means >>>>>> that you need your code must (to a certain degree) actively >>>>>> participate in Flink's task lifecycle. In Flink 1.2 we introduce a >>>>>> feature called CloseableRegistry, which makes participating in this >>>>>> lifecycle easier w.r.t. closing resources. For the time being, you >>>>>> should check that Flink’s task cancelation also causes your code to >>>>>> close the aerospike client and check cancelation flags. >>>>>> >>>>>> Best, >>>>>> Stefan >>>>>> >>>>>>> Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >: >>>>>>> >>>>>>> Hello, >>>>>>> >>>>>>> I have done some threads checking and dumps. 
And I have disabled the >>>>>>> checkpointing. >>>>>>> >>>>>>> Here are my findings. >>>>>>> >>>>>>> I did a thread dump a few hours after I booted up the whole cluster. >>>>>>> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit ) >>>>>>> >>>>>>> The dump shows that most threads are of 3 sources. >>>>>>> * >>>>>>> **OutputFlusher --- 634 -- Sleeping State* >>>>>>> >>>>>>> "OutputFlusher" - Thread t@4758 >>>>>>> java.lang.Thread.State: TIMED_WAITING >>>>>>> at java.lang.Thread.sleep(Native Method) >>>>>>> at >>>>>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164) >>>>>>> >>>>>>> >>>>>>> Locked ownable synchronizers: >>>>>>> - None >>>>>>> * >>>>>>> **Metrics --- 376 ( Flink Metrics Reporter it's the only metrics >>>>>>> being used ) -- Parked State* >>>>>>> >>>>>>> "metrics-meter-tick-thread-1" - Thread t@29024 >>>>>>> java.lang.Thread.State: TIMED_WAITING >>>>>>> at sun.misc.Unsafe.park(Native Method) >>>>>>> - parking to wait for (a >>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>> >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> 
>>>>>>>    Locked ownable synchronizers:
>>>>>>>         - None
>>>>>>>
>>>>>>> *tend -- 220 (Aerospike client thread) -- Sleeping State*
>>>>>>>
>>>>>>> "tend" - Thread t@29011
>>>>>>>    java.lang.Thread.State: TIMED_WAITING
>>>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>>>         at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>>>>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>    Locked ownable synchronizers:
>>>>>>>         - None
>>>>>>>
>>>>>>> I have 2 streaming jobs and a batch job that runs once in a while.
>>>>>>>
>>>>>>> Streaming job A runs with a parallelism of 2 and uses Aerospike only
>>>>>>> in a RichSink.
>>>>>>>
>>>>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in a
>>>>>>> RichFilterFunction / RichMapFunction with open and close methods, in
>>>>>>> order to open and close the client.
>>>>>>>
>>>>>>> The batch job uses the Aerospike client in a RichFilterFunction /
>>>>>>> RichMapFunction with open and close methods, in order to open and
>>>>>>> close the client.
>>>>>>>
>>>>>>> Next, I cancelled all the streaming jobs @5/12/2016 and checked
>>>>>>> the threads and the JVM non-heap usage.
>>>>>>>
>>>>>>> JVM non-heap usage reaches 3GB; threads go down, but some still
>>>>>>> linger around, and they are the following.
>>>>>>>
>>>>>>> *Metrics --- 790 (the Flink metrics reporter is the only metrics
>>>>>>> being used)*
>>>>>>>
>>>>>>> "metrics-meter-tick-thread-1" - Thread t@29024
>>>>>>>    java.lang.Thread.State: TIMED_WAITING
>>>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>>>         - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>    Locked ownable synchronizers:
>>>>>>>         - None
>>>>>>>
>>>>>>> *tend -- 432 (Aerospike client thread)*
>>>>>>>
>>>>>>> "tend" - Thread t@29011
>>>>>>>    java.lang.Thread.State: TIMED_WAITING
>>>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>>>         at com.aerospike.client.util.Util.sleep(Util.java:38)
>>>>>>>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>    Locked ownable synchronizers:
>>>>>>>         - None
>>>>>>>
>>>>>>> Total number of threads: 1289 (total) / 1220 (tend + metrics).
>>>>>>> So I have 1220 threads that I believe should be dead and not
>>>>>>> running, since I have no jobs running at all.
>>>>>>>
>>>>>>> And the JVM non-heap usage doesn't decrease at all after removing
>>>>>>> every job.
>>>>>>>
>>>>>>> Why the hell do metrics grow to no end?
>>>>>>>
>>>>>>> I am using the following libs for metrics:
>>>>>>>
>>>>>>> - metrics-graphite-3.1.0.jar
>>>>>>> - metrics-core-3.1.0.jar
>>>>>>> - flink-metrics-dropwizard-1.1.3.jar
>>>>>>> - flink-metrics-graphite-1.1.3.jar
>>>>>>>
>>>>>>> And the config for the reporter is:
>>>>>>>
>>>>>>> metrics.reporters: graphite
>>>>>>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>>>>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
>>>>>>> metrics.reporter.graphite.port: 2003
>>>>>>>
>>>>>>> Shouldn't the Aerospike client also be closed? Or am I missing
>>>>>>> something, or doing something wrong?
>>>>>>>
>>>>>>> Sorry for the long post.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Daniel Santos
>>>>>>>
>>>>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
>>>>>>>> Hey Daniel!
>>>>>>>>
>>>>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is
>>>>>>>> not expected.
>>>>>>>> What kind of threads are you seeing being spawned/lingering around?
>>>>>>>> As a first step, could you try to disable checkpointing and see
>>>>>>>> how it behaves afterwards?
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On 29 November 2016 at 17:32:32, Daniel Santos
>>>>>>>> ([hidden email]) wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Nope, I am using Hadoop HDFS as state backend, Kafka as source,
>>>>>>>>> and an HttpClient as a sink, also Kafka as a sink.
>>>>>>>>> So it's possible that the state backend is the culprit?
>>>>>>>>>
>>>>>>>>> The curious thing is that even when no jobs are running, streaming
>>>>>>>>> or otherwise, the JVM non-heap stays the same.
>>>>>>>>> Which I find odd.
>>>>>>>>>
>>>>>>>>> Another curious thing is that it's proportional to the increase
>>>>>>>>> in the number of JVM threads.
>>>>>>>>> Whenever there are more JVM threads running, there is also more JVM
>>>>>>>>> non-heap being used, which makes sense.
>>>>>>>>> But the threads stick around, never decreasing, and likewise the
>>>>>>>>> JVM non-heap memory.
>>>>>>>>>
>>>>>>>>> These observations are based on the Flink metrics that are being
>>>>>>>>> sent to and recorded in our graphite system.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>>
>>>>>>>>> Daniel Santos
>>>>>>>>>
>>>>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
>>>>>>>>>> Are you using the RocksDB backend in native mode? If so then the
>>>>>>>>>> off-heap memory may be there.
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, [hidden email] wrote:
>>>>>>>>>>
>>>>>>>>>> I have the same problem, but I submit the Flink job to YARN.
>>>>>>>>>> I submitted the job to YARN from computer 22, and the job runs
>>>>>>>>>> successfully; the JobManager is on 79 and the TaskManager is on
>>>>>>>>>> 69, so they are three different computers.
>>>>>>>>>> However, on computer 22, the process with pid=3463, which is the
>>>>>>>>>> job submitted to YARN, has 2.3g of memory, 15% of the total.
>>>>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
>>>>>>>>>> -ytm 1024 ....
>>>>>>>>>> Why does it occupy so much memory on computer 22? The job is
>>>>>>>>>> running on computer 79 and computer 69.
>>>>>>>>>> What would be the possible causes of such behavior?
>>>>>>>>>> Best Regards,
>>>>>>>>>> ----- Original Message -----
>>>>>>>>>> From: Daniel Santos ([hidden email])
>>>>>>>>>> To: [hidden email]
>>>>>>>>>> Subject: JVM Non Heap Memory
>>>>>>>>>> Date: 29 November 2016, 22:26
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> Is it common to have high usage of non-heap in the JVM?
>>>>>>>>>> I am running Flink in a stand-alone cluster and in docker, with
>>>>>>>>>> each docker being capped at 6G of memory.
>>>>>>>>>> I have been struggling to keep memory usage in check.
>>>>>>>>>> The non-heap increases to no end. It starts with just 100MB of
>>>>>>>>>> usage and after a day it reaches 1.3GB.
>>>>>>>>>> Then it eventually reaches 2GB, and eventually the docker is
>>>>>>>>>> killed because it has reached the memory limit.
>>>>>>>>>> My configuration for each Flink task manager is the following:
>>>>>>>>>> ----------- flink-conf.yaml --------------
>>>>>>>>>> taskmanager.heap.mb: 3072
>>>>>>>>>> taskmanager.numberOfTaskSlots: 8
>>>>>>>>>> taskmanager.memory.preallocate: false
>>>>>>>>>> taskmanager.network.numberOfBuffers: 12500
>>>>>>>>>> taskmanager.memory.off-heap: false
>>>>>>>>>> ---------------------------------------------
>>>>>>>>>> What would be the possible causes of such behavior?
>>>>>>>>>> Best Regards,
>>>>>>>>>> Daniel Santos
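
For reference, the cooperative cancelation that Stefan describes in the quoted thread can be sketched in plain Java. This is only a minimal illustration under stated assumptions, not Flink or Aerospike code: PollingClient, its "tend-sketch" thread name and countThreadsNamed are hypothetical names made up for this sketch. The worker thread sleeps in a loop (much like the "tend" threads in the dumps), and close() is what asks it to stop; in a rich function the equivalent call would presumably sit in the close method mentioned in the thread.

```java
import java.util.concurrent.TimeUnit;

public class CooperativeCancellation {

    /** Hypothetical stand-in for a client that owns a background maintenance thread. */
    static final class PollingClient implements AutoCloseable {
        // Cancelation flag that the worker checks on every iteration.
        private volatile boolean running = true;
        private final Thread worker;

        PollingClient(String threadName) {
            worker = new Thread(() -> {
                while (running) {
                    try {
                        Thread.sleep(1000); // periodic work would go here
                    } catch (InterruptedException e) {
                        // close() interrupted the sleep: restore the flag and exit.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, threadName);
            worker.setDaemon(true);
            worker.start();
        }

        /** Asks the worker to stop, wakes it up, and waits for it to terminate. */
        @Override
        public void close() throws InterruptedException {
            running = false;
            worker.interrupt();
            worker.join(TimeUnit.SECONDS.toMillis(5));
        }

        boolean isWorkerAlive() {
            return worker.isAlive();
        }
    }

    /** Counts live threads with the given name, similar to grouping a thread dump by name. */
    static long countThreadsNamed(String name) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().equals(name))
                .count();
    }

    public static void main(String[] args) throws Exception {
        PollingClient client = new PollingClient("tend-sketch");
        System.out.println("before close: " + countThreadsNamed("tend-sketch"));
        client.close();
        System.out.println("after close: " + countThreadsNamed("tend-sketch"));
    }
}
```

If close() is never called, the worker keeps sleeping and the thread count for that name only grows with every new client, which matches the lingering threads reported in the dumps.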