Hi, Could you attach full logs from those task managers? At first glance I don’t see a connection between those exceptions and any memory issue that you might had. It looks like a dependency issue in one (some? All?) of your jobs. Did you build your jars with -Pbuild-jar profile as described here: ? If that doesn’t help. Can you binary search which job is causing the problem? There might be some Flink incompatibility between different versions and rebuilding a job’s jar with a version matching to the cluster version might help. Piotrek
|
On 2017-11-09 20:08, Piotr Nowojski wrote:
> Hi, > > Could you attach full logs from those task managers? At first glance I > don’t see a connection between those exceptions and any memory issue > that you might had. It looks like a dependency issue in one (some? > All?) of your jobs. > > Did you build your jars with -Pbuild-jar profile as described here: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project > ? > > If that doesn’t help. Can you binary search which job is causing the > problem? There might be some Flink incompatibility between different > versions and rebuilding a job’s jar with a version matching to the > cluster version might help. > > Piotrek > >> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >> >> On 2017-11-08 18:30, Piotr Nowojski wrote: >> Btw, Ebru: >> I don’t agree that the main suspect is NetworkBufferPool. On your >> screenshots it’s memory consumption was reasonable and stable: >> 596MB >> -> 602MB -> 597MB. >> PoolThreadCache memory usage ~120MB is also reasonable. >> Do you experience any problems, like Out Of Memory >> errors/crashes/long >> GC pauses? Or just JVM process is using more memory over time? You >> are >> aware that JVM doesn’t like to release memory back to OS once it >> was >> used? So increasing memory usage until hitting some limit (for >> example >> JVM max heap size) is expected behaviour. >> Piotrek >> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >> wrote: >> I don’t know if this is relevant to this issue, but I was >> constantly getting failures trying to reproduce this leak using your >> Job, because you were using non deterministic getKey function: >> @Override >> public Integer getKey(Integer event) { >> Random randomGen = new Random((new Date()).getTime()); >> return randomGen.nextInt() % 8; >> } >> And quoting Java doc of KeySelector: >> "If invoked multiple times on the same object, the returned key must >> be the same.” >> I’m trying to reproduce this issue with following job: >> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >> Where IntegerSource is just an infinite source, DisardingSink is >> well just discarding incoming data. I’m cancelling the job every 5 >> seconds and so far (after ~15 minutes) my memory consumption is >> stable, well below maximum java heap size. >> Piotrek >> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >> wrote: >> Yes, I tested with just printing the stream. But it could take a >> lot of time to fail. >> On Wednesday, 8 November 2017, Piotr Nowojski >> <[hidden email]> wrote: >> Thanks for quick answer. >> So it will also fail after some time with `fromElements` source >> instead of Kafka, right? >> Did you try it also without a Kafka producer? >> Piotrek >> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >> wrote: >> Hi, >> You don't need data. With data it will die faster. I tested as >> well with a small data set, using the fromElements source, but it >> will take some time to die. It's better with some data. >> On 8 November 2017 at 14:54, Piotr Nowojski >> <[hidden email]> wrote: >> Hi, >> Thanks for sharing this job. >> Do I need to feed some data to the Kafka to reproduce this > issue with your script? > >>> Does this OOM issue also happen when you are not using the > Kafka source/sink? > >>> Piotrek >>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> > wrote: > >>> Hi, >>> This is the test flink job we created to trigger this leak > https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 > >>> And this is the python script we are using to execute the job > thousands of times to get the OOM problem > https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 > >>> The cluster we used for this has this configuration: >>> Instance type: t2.large >>> Number of workers: 2 >>> HeapMemory: 5500 >>> Number of task slots per node: 4 >>> TaskMangMemFraction: 0.5 >>> NumberOfNetworkBuffers: 2000 >>> We have tried several things, increasing the heap, reducing the > heap, more memory fraction, changes this value in the > taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to > work. > >>> Thanks for your help. >>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU > <[hidden email]> wrote: > >> On 2017-11-08 15:20, Piotr Nowojski wrote: >> Hi Ebru and Javier, >> Yes, if you could share this example job it would be helpful. >> Ebru: could you explain in a little more details how does > your Job(s) > >> look like? Could you post some code? If you are just using > maps and > >> filters there shouldn’t be any network transfers involved, > aside > >> from Source and Sink functions. >> Piotrek >> On 8 Nov 2017, at 12:54, ebru > <[hidden email]> wrote: > >> Hi Javier, >> It would be helpful if you share your test job with us. >> Which configurations did you try? >> -Ebru >> On 8 Nov 2017, at 14:43, Javier Lopez > <[hidden email]> > >> wrote: >> Hi, >> We have been facing a similar problem. We have tried some > different > >> configurations, as proposed in other email thread by Flavio > and > >> Kien, but it didn't work. We have a workaround similar to > the one > >> that Flavio has, we restart the taskmanagers once they reach > a > >> memory threshold. We created a small test to remove all of > our > >> dependencies and leave only flink native libraries. This > test reads > >> data from a Kafka topic and writes it back to another topic > in > >> Kafka. We cancel the job and start another every 5 seconds. > After > >> ~30 minutes of doing this process, the cluster reaches the > OS memory > >> limit and dies. >> Currently, we have a test cluster with 8 workers and 8 task > slots > >> per node. We have one job that uses 56 slots, and we cannot > execute > >> that job 5 times in a row because the whole cluster dies. If > you > >> want, we can publish our test job. >> Regards, >> On 8 November 2017 at 11:20, Aljoscha Krettek > <[hidden email]> > >> wrote: >> @Nico & @Piotr Could you please have a look at this? You > both > >> recently worked on the network stack and might be most > familiar with > >> this. >> On 8. Nov 2017, at 10:25, Flavio Pompermaier > <[hidden email]> > >> wrote: >> We also have the same problem in production. At the moment > the > >> solution is to restart the entire Flink cluster after every > job.. > >> We've tried to reproduce this problem with a test (see >> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we > don't > >> know whether the error produced by the test and the leak are >> correlated.. >> Best, >> Flavio >> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA > EBRU > >> <[hidden email]> wrote: >> On 2017-11-07 16:53, Ufuk Celebi wrote: >> Do you use any windowing? If yes, could you please share > that code? > >> If >> there is no stateful operation at all, it's strange where > the list > >> state instances are coming from. >> On Tue, Nov 7, 2017 at 2:35 PM, ebru > <[hidden email]> > >> wrote: >> Hi Ufuk, >> We don’t explicitly define any state descriptor. We only > use map > >> and filters >> operator. We thought that gc handle clearing the flink’s > internal > >> states. >> So how can we manage the memory if it is always increasing? >> - Ebru >> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >> Hey Ebru, the memory usage might be increasing as long as a > job is > >> running. >> This is expected (also in the case of multiple running > jobs). The > >> screenshots are not helpful in that regard. :-( >> What kind of stateful operations are you using? Depending on > your > >> use case, >> you have to manually call `clear()` on the state instance in > order > >> to >> release the managed state. >> Best, >> Ufuk >> On Tue, Nov 7, 2017 at 12:43 PM, ebru >> <[hidden email]> wrote: >> Begin forwarded message: >> From: ebru <[hidden email]> >> Subject: Re: Flink memory leak >> Date: 7 November 2017 at 14:09:17 GMT+3 >> To: Ufuk Celebi <[hidden email]> >> Hi Ufuk, >> There are there snapshots of htop output. >> 1. snapshot is initial state. >> 2. snapshot is after submitted one job. >> 3. Snapshot is the output of the one job with 15000 EPS. And > the > >> memory >> usage is always increasing over time. >> <1.png><2.png><3.png> >> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >> Hey Ebru, >> let me pull in Aljoscha (CC'd) who might have an idea what's > causing > >> this. >> Since multiple jobs are running, it will be hard to > understand to > >> which job the state descriptors from the heap snapshot > belong to. > >> - Is it possible to isolate the problem and reproduce the > behaviour > >> with only a single job? >> – Ufuk >> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU > ÇETİNKAYA EBRU > >> <[hidden email]> wrote: >> Hi, >> We are using Flink 1.3.1 in production, we have one job > manager and > >> 3 task >> managers in standalone mode. Recently, we've noticed that we > have > >> memory >> related problems. We use docker container to serve Flink > cluster. We > >> have >> 300 slots and 20 jobs are running with parallelism of 10. > Also the > >> job >> count >> may be change over time. Taskmanager memory usage always > increases. > >> After >> job cancelation this memory usage doesn't decrease. We've > tried to > >> investigate the problem and we've got the task manager jvm > heap > >> snapshot. >> According to the jam heap analysis, possible memory leak was > Flink > >> list >> state descriptor. But we are not sure that is the cause of > our > >> memory >> problem. How can we solve the problem? >> We have two types of Flink job. One has no state full > operator > >> contains only maps and filters and the other has time window > with > >> count trigger. >> * We've analysed the jvm heaps again in different > conditions. First > >> we analysed the snapshot when no flink jobs running on > cluster. (image > >> 1) >> * Then, we analysed the jvm heap snapshot when the flink job > that has > >> no state full operator is running. And according to the > results, leak > >> suspect was NetworkBufferPool (image 2) >> * Last analys, there were both two types of jobs running > and leak > >> suspect was again NetworkBufferPool. (image 3) >> In our system jobs are regularly cancelled and resubmitted so > we > >> noticed that when job is submitted some amount of memory > allocated and > >> after cancelation this allocated memory never freed. So over > time > >> memory usage is always increasing and exceeded the limits. > Links: > ------ > [1] https://issues.apache.org/jira/browse/FLINK-7845 > Hi Piotr, > There are two types of jobs. > In first, we use Kafka source and Kafka sink, there isn't any > window operator. > >> In second job, we use Kafka source, filesystem sink and > elastic search sink and window operator for buffering. > >>> > Hi Piotrek, > > Thanks for your reply. > > We've tested our link cluster again. We have 360 slots, and our > cluster configuration is like this; > > jobmanager.rpc.address: %JOBMANAGER% > jobmanager.rpc.port: 6123 > jobmanager.heap.mb: 1536 > taskmanager.heap.mb: 1536 > taskmanager.numberOfTaskSlots: 120 > taskmanager.memory.preallocate: false > parallelism.default: 1 > jobmanager.web.port: 8081 > state.backend: filesystem > state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% > state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% > taskmanager.network.numberOfBuffers: 5000 > > We are using docker based Flink cluster. > WE submitted 36 jobs with the parallelism of 10. After all slots > became full. Memory usage were increasing by the time and one by one > task managers start to die. And the exception was like this; > Taskmanager1 log: > Uncaught error from thread [flink-akka.actor.default-dispatcher-17] > shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for > ActorSystem[flink] > java.lang.NoClassDefFoundError: > org/apache/kafka/common/metrics/stats/Rate$1 > at > org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) > at > org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > at > org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) > at > org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) > at > org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.common.metrics.stats.Rate$1 > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 22 more > > Taskmanager2 log: > Uncaught error from thread [flink-akka.actor.default-dispatcher-17] > shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for > ActorSystem[flink] > Java.lang.NoClassDefFoundError: > org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) > at > org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 18 more > > -Ebru We attached the full log of the taskmanager1. This may not be a dependency issue because until all of the task slots is full, we didn't get any No Class Def Found exception, when there is available memory jobs can run without exception for days. Also there is Kafka Instance Already Exist exception in full log, but this not relevant and doesn't effect jobs or task managers. -Ebru taskmanager1.log.zip (723K) Download Attachment |
Hi,
Thanks for the logs, however I do not see before mentioned exceptions in it. It ends with java.lang.InterruptedException Is it the correct log file? Also, could you attach the std output file of the failing TaskManager? Piotrek > On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: > > On 2017-11-09 20:08, Piotr Nowojski wrote: >> Hi, >> Could you attach full logs from those task managers? At first glance I >> don’t see a connection between those exceptions and any memory issue >> that you might had. It looks like a dependency issue in one (some? >> All?) of your jobs. >> Did you build your jars with -Pbuild-jar profile as described here: >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >> ? >> If that doesn’t help. Can you binary search which job is causing the >> problem? There might be some Flink incompatibility between different >> versions and rebuilding a job’s jar with a version matching to the >> cluster version might help. >> Piotrek >>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>> <[hidden email]> wrote: >>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>> Btw, Ebru: >>> I don’t agree that the main suspect is NetworkBufferPool. On your >>> screenshots it’s memory consumption was reasonable and stable: >>> 596MB >>> -> 602MB -> 597MB. >>> PoolThreadCache memory usage ~120MB is also reasonable. >>> Do you experience any problems, like Out Of Memory >>> errors/crashes/long >>> GC pauses? Or just JVM process is using more memory over time? You >>> are >>> aware that JVM doesn’t like to release memory back to OS once it >>> was >>> used? So increasing memory usage until hitting some limit (for >>> example >>> JVM max heap size) is expected behaviour. >>> Piotrek >>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>> wrote: >>> I don’t know if this is relevant to this issue, but I was >>> constantly getting failures trying to reproduce this leak using your >>> Job, because you were using non deterministic getKey function: >>> @Override >>> public Integer getKey(Integer event) { >>> Random randomGen = new Random((new Date()).getTime()); >>> return randomGen.nextInt() % 8; >>> } >>> And quoting Java doc of KeySelector: >>> "If invoked multiple times on the same object, the returned key must >>> be the same.” >>> I’m trying to reproduce this issue with following job: >>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>> Where IntegerSource is just an infinite source, DisardingSink is >>> well just discarding incoming data. I’m cancelling the job every 5 >>> seconds and so far (after ~15 minutes) my memory consumption is >>> stable, well below maximum java heap size. >>> Piotrek >>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>> wrote: >>> Yes, I tested with just printing the stream. But it could take a >>> lot of time to fail. >>> On Wednesday, 8 November 2017, Piotr Nowojski >>> <[hidden email]> wrote: >>> Thanks for quick answer. >>> So it will also fail after some time with `fromElements` source >>> instead of Kafka, right? >>> Did you try it also without a Kafka producer? >>> Piotrek >>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>> wrote: >>> Hi, >>> You don't need data. With data it will die faster. I tested as >>> well with a small data set, using the fromElements source, but it >>> will take some time to die. It's better with some data. >>> On 8 November 2017 at 14:54, Piotr Nowojski >>> <[hidden email]> wrote: >>> Hi, >>> Thanks for sharing this job. >>> Do I need to feed some data to the Kafka to reproduce this >> issue with your script? >>>> Does this OOM issue also happen when you are not using the >> Kafka source/sink? >>>> Piotrek >>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >> wrote: >>>> Hi, >>>> This is the test flink job we created to trigger this leak >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>> And this is the python script we are using to execute the job >> thousands of times to get the OOM problem >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>> The cluster we used for this has this configuration: >>>> Instance type: t2.large >>>> Number of workers: 2 >>>> HeapMemory: 5500 >>>> Number of task slots per node: 4 >>>> TaskMangMemFraction: 0.5 >>>> NumberOfNetworkBuffers: 2000 >>>> We have tried several things, increasing the heap, reducing the >> heap, more memory fraction, changes this value in the >> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >> work. >>>> Thanks for your help. >>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>> Hi Ebru and Javier, >>> Yes, if you could share this example job it would be helpful. >>> Ebru: could you explain in a little more details how does >> your Job(s) >>> look like? Could you post some code? If you are just using >> maps and >>> filters there shouldn’t be any network transfers involved, >> aside >>> from Source and Sink functions. >>> Piotrek >>> On 8 Nov 2017, at 12:54, ebru >> <[hidden email]> wrote: >>> Hi Javier, >>> It would be helpful if you share your test job with us. >>> Which configurations did you try? >>> -Ebru >>> On 8 Nov 2017, at 14:43, Javier Lopez >> <[hidden email]> >>> wrote: >>> Hi, >>> We have been facing a similar problem. We have tried some >> different >>> configurations, as proposed in other email thread by Flavio >> and >>> Kien, but it didn't work. We have a workaround similar to >> the one >>> that Flavio has, we restart the taskmanagers once they reach >> a >>> memory threshold. We created a small test to remove all of >> our >>> dependencies and leave only flink native libraries. This >> test reads >>> data from a Kafka topic and writes it back to another topic >> in >>> Kafka. We cancel the job and start another every 5 seconds. >> After >>> ~30 minutes of doing this process, the cluster reaches the >> OS memory >>> limit and dies. >>> Currently, we have a test cluster with 8 workers and 8 task >> slots >>> per node. We have one job that uses 56 slots, and we cannot >> execute >>> that job 5 times in a row because the whole cluster dies. If >> you >>> want, we can publish our test job. >>> Regards, >>> On 8 November 2017 at 11:20, Aljoscha Krettek >> <[hidden email]> >>> wrote: >>> @Nico & @Piotr Could you please have a look at this? You >> both >>> recently worked on the network stack and might be most >> familiar with >>> this. >>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >> <[hidden email]> >>> wrote: >>> We also have the same problem in production. At the moment >> the >>> solution is to restart the entire Flink cluster after every >> job.. >>> We've tried to reproduce this problem with a test (see >>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >> don't >>> know whether the error produced by the test and the leak are >>> correlated.. >>> Best, >>> Flavio >>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >> EBRU >>> <[hidden email]> wrote: >>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>> Do you use any windowing? If yes, could you please share >> that code? >>> If >>> there is no stateful operation at all, it's strange where >> the list >>> state instances are coming from. >>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >> <[hidden email]> >>> wrote: >>> Hi Ufuk, >>> We don’t explicitly define any state descriptor. We only >> use map >>> and filters >>> operator. We thought that gc handle clearing the flink’s >> internal >>> states. >>> So how can we manage the memory if it is always increasing? >>> - Ebru >>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>> Hey Ebru, the memory usage might be increasing as long as a >> job is >>> running. >>> This is expected (also in the case of multiple running >> jobs). The >>> screenshots are not helpful in that regard. :-( >>> What kind of stateful operations are you using? Depending on >> your >>> use case, >>> you have to manually call `clear()` on the state instance in >> order >>> to >>> release the managed state. >>> Best, >>> Ufuk >>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>> <[hidden email]> wrote: >>> Begin forwarded message: >>> From: ebru <[hidden email]> >>> Subject: Re: Flink memory leak >>> Date: 7 November 2017 at 14:09:17 GMT+3 >>> To: Ufuk Celebi <[hidden email]> >>> Hi Ufuk, >>> There are there snapshots of htop output. >>> 1. snapshot is initial state. >>> 2. snapshot is after submitted one job. >>> 3. Snapshot is the output of the one job with 15000 EPS. And >> the >>> memory >>> usage is always increasing over time. >>> <1.png><2.png><3.png> >>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>> Hey Ebru, >>> let me pull in Aljoscha (CC'd) who might have an idea what's >> causing >>> this. >>> Since multiple jobs are running, it will be hard to >> understand to >>> which job the state descriptors from the heap snapshot >> belong to. >>> - Is it possible to isolate the problem and reproduce the >> behaviour >>> with only a single job? >>> – Ufuk >>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >> ÇETİNKAYA EBRU >>> <[hidden email]> wrote: >>> Hi, >>> We are using Flink 1.3.1 in production, we have one job >> manager and >>> 3 task >>> managers in standalone mode. Recently, we've noticed that we >> have >>> memory >>> related problems. We use docker container to serve Flink >> cluster. We >>> have >>> 300 slots and 20 jobs are running with parallelism of 10. >> Also the >>> job >>> count >>> may be change over time. Taskmanager memory usage always >> increases. >>> After >>> job cancelation this memory usage doesn't decrease. We've >> tried to >>> investigate the problem and we've got the task manager jvm >> heap >>> snapshot. >>> According to the jam heap analysis, possible memory leak was >> Flink >>> list >>> state descriptor. But we are not sure that is the cause of >> our >>> memory >>> problem. How can we solve the problem? >>> We have two types of Flink job. One has no state full >> operator >>> contains only maps and filters and the other has time window >> with >>> count trigger. >>> * We've analysed the jvm heaps again in different >> conditions. First >>> we analysed the snapshot when no flink jobs running on >> cluster. (image >>> 1) >>> * Then, we analysed the jvm heap snapshot when the flink job >> that has >>> no state full operator is running. And according to the >> results, leak >>> suspect was NetworkBufferPool (image 2) >>> * Last analys, there were both two types of jobs running >> and leak >>> suspect was again NetworkBufferPool. (image 3) >>> In our system jobs are regularly cancelled and resubmitted so >> we >>> noticed that when job is submitted some amount of memory >> allocated and >>> after cancelation this allocated memory never freed. So over >> time >>> memory usage is always increasing and exceeded the limits. >> Links: >> ------ >> [1] https://issues.apache.org/jira/browse/FLINK-7845 >> Hi Piotr, >> There are two types of jobs. >> In first, we use Kafka source and Kafka sink, there isn't any >> window operator. >>> In second job, we use Kafka source, filesystem sink and >> elastic search sink and window operator for buffering. >> Hi Piotrek, >> Thanks for your reply. >> We've tested our link cluster again. We have 360 slots, and our >> cluster configuration is like this; >> jobmanager.rpc.address: %JOBMANAGER% >> jobmanager.rpc.port: 6123 >> jobmanager.heap.mb: 1536 >> taskmanager.heap.mb: 1536 >> taskmanager.numberOfTaskSlots: 120 >> taskmanager.memory.preallocate: false >> parallelism.default: 1 >> jobmanager.web.port: 8081 >> state.backend: filesystem >> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >> taskmanager.network.numberOfBuffers: 5000 >> We are using docker based Flink cluster. >> WE submitted 36 jobs with the parallelism of 10. After all slots >> became full. Memory usage were increasing by the time and one by one >> task managers start to die. And the exception was like this; >> Taskmanager1 log: >> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >> ActorSystem[flink] >> java.lang.NoClassDefFoundError: >> org/apache/kafka/common/metrics/stats/Rate$1 >> at >> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >> at >> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >> at >> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >> at >> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >> at >> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >> at >> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >> at >> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >> at >> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> at >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> Caused by: java.lang.ClassNotFoundException: >> org.apache.kafka.common.metrics.stats.Rate$1 >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> ... 22 more >> Taskmanager2 log: >> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >> ActorSystem[flink] >> Java.lang.NoClassDefFoundError: >> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >> at >> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >> at >> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >> at >> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >> at >> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >> at >> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> at >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> Caused by: java.lang.ClassNotFoundException: >> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> ... 18 more >> -Ebru > Hi Piotrek, > > We attached the full log of the taskmanager1. > This may not be a dependency issue because until all of the task slots is full, we didn't get any No Class Def Found exception, when there is available memory jobs can run without exception for days. > Also there is Kafka Instance Already Exist exception in full log, but this not relevant and doesn't effect jobs or task managers. > > -Ebru<taskmanager1.log.zip> |
On 2017-11-10 11:04, Piotr Nowojski wrote:
> Hi, > > Thanks for the logs, however I do not see before mentioned exceptions > in it. It ends with java.lang.InterruptedException > > Is it the correct log file? Also, could you attach the std output file > of the failing TaskManager? > > Piotrek > >> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >> >> On 2017-11-09 20:08, Piotr Nowojski wrote: >>> Hi, >>> Could you attach full logs from those task managers? At first glance >>> I >>> don’t see a connection between those exceptions and any memory issue >>> that you might had. It looks like a dependency issue in one (some? >>> All?) of your jobs. >>> Did you build your jars with -Pbuild-jar profile as described here: >>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>> ? >>> If that doesn’t help. Can you binary search which job is causing the >>> problem? There might be some Flink incompatibility between different >>> versions and rebuilding a job’s jar with a version matching to the >>> cluster version might help. >>> Piotrek >>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>> Btw, Ebru: >>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>> screenshots it’s memory consumption was reasonable and stable: >>>> 596MB >>>> -> 602MB -> 597MB. >>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>> Do you experience any problems, like Out Of Memory >>>> errors/crashes/long >>>> GC pauses? Or just JVM process is using more memory over time? You >>>> are >>>> aware that JVM doesn’t like to release memory back to OS once it >>>> was >>>> used? So increasing memory usage until hitting some limit (for >>>> example >>>> JVM max heap size) is expected behaviour. >>>> Piotrek >>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>> wrote: >>>> I don’t know if this is relevant to this issue, but I was >>>> constantly getting failures trying to reproduce this leak using your >>>> Job, because you were using non deterministic getKey function: >>>> @Override >>>> public Integer getKey(Integer event) { >>>> Random randomGen = new Random((new Date()).getTime()); >>>> return randomGen.nextInt() % 8; >>>> } >>>> And quoting Java doc of KeySelector: >>>> "If invoked multiple times on the same object, the returned key must >>>> be the same.” >>>> I’m trying to reproduce this issue with following job: >>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>> Where IntegerSource is just an infinite source, DisardingSink is >>>> well just discarding incoming data. I’m cancelling the job every 5 >>>> seconds and so far (after ~15 minutes) my memory consumption is >>>> stable, well below maximum java heap size. >>>> Piotrek >>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>> wrote: >>>> Yes, I tested with just printing the stream. But it could take a >>>> lot of time to fail. >>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>> <[hidden email]> wrote: >>>> Thanks for quick answer. >>>> So it will also fail after some time with `fromElements` source >>>> instead of Kafka, right? >>>> Did you try it also without a Kafka producer? >>>> Piotrek >>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>> wrote: >>>> Hi, >>>> You don't need data. With data it will die faster. I tested as >>>> well with a small data set, using the fromElements source, but it >>>> will take some time to die. It's better with some data. >>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>> <[hidden email]> wrote: >>>> Hi, >>>> Thanks for sharing this job. >>>> Do I need to feed some data to the Kafka to reproduce this >>> issue with your script? >>>>> Does this OOM issue also happen when you are not using the >>> Kafka source/sink? >>>>> Piotrek >>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>> wrote: >>>>> Hi, >>>>> This is the test flink job we created to trigger this leak >>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>> And this is the python script we are using to execute the job >>> thousands of times to get the OOM problem >>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>> The cluster we used for this has this configuration: >>>>> Instance type: t2.large >>>>> Number of workers: 2 >>>>> HeapMemory: 5500 >>>>> Number of task slots per node: 4 >>>>> TaskMangMemFraction: 0.5 >>>>> NumberOfNetworkBuffers: 2000 >>>>> We have tried several things, increasing the heap, reducing the >>> heap, more memory fraction, changes this value in the >>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>> work. >>>>> Thanks for your help. >>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>> <[hidden email]> wrote: >>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>> Hi Ebru and Javier, >>>> Yes, if you could share this example job it would be helpful. >>>> Ebru: could you explain in a little more details how does >>> your Job(s) >>>> look like? Could you post some code? If you are just using >>> maps and >>>> filters there shouldn’t be any network transfers involved, >>> aside >>>> from Source and Sink functions. >>>> Piotrek >>>> On 8 Nov 2017, at 12:54, ebru >>> <[hidden email]> wrote: >>>> Hi Javier, >>>> It would be helpful if you share your test job with us. >>>> Which configurations did you try? >>>> -Ebru >>>> On 8 Nov 2017, at 14:43, Javier Lopez >>> <[hidden email]> >>>> wrote: >>>> Hi, >>>> We have been facing a similar problem. We have tried some >>> different >>>> configurations, as proposed in other email thread by Flavio >>> and >>>> Kien, but it didn't work. We have a workaround similar to >>> the one >>>> that Flavio has, we restart the taskmanagers once they reach >>> a >>>> memory threshold. We created a small test to remove all of >>> our >>>> dependencies and leave only flink native libraries. This >>> test reads >>>> data from a Kafka topic and writes it back to another topic >>> in >>>> Kafka. We cancel the job and start another every 5 seconds. >>> After >>>> ~30 minutes of doing this process, the cluster reaches the >>> OS memory >>>> limit and dies. >>>> Currently, we have a test cluster with 8 workers and 8 task >>> slots >>>> per node. We have one job that uses 56 slots, and we cannot >>> execute >>>> that job 5 times in a row because the whole cluster dies. If >>> you >>>> want, we can publish our test job. >>>> Regards, >>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>> <[hidden email]> >>>> wrote: >>>> @Nico & @Piotr Could you please have a look at this? You >>> both >>>> recently worked on the network stack and might be most >>> familiar with >>>> this. >>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>> <[hidden email]> >>>> wrote: >>>> We also have the same problem in production. At the moment >>> the >>>> solution is to restart the entire Flink cluster after every >>> job.. >>>> We've tried to reproduce this problem with a test (see >>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>> don't >>>> know whether the error produced by the test and the leak are >>>> correlated.. >>>> Best, >>>> Flavio >>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>> EBRU >>>> <[hidden email]> wrote: >>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>> Do you use any windowing? If yes, could you please share >>> that code? >>>> If >>>> there is no stateful operation at all, it's strange where >>> the list >>>> state instances are coming from. >>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>> <[hidden email]> >>>> wrote: >>>> Hi Ufuk, >>>> We don’t explicitly define any state descriptor. We only >>> use map >>>> and filters >>>> operator. We thought that gc handle clearing the flink’s >>> internal >>>> states. >>>> So how can we manage the memory if it is always increasing? >>>> - Ebru >>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>> Hey Ebru, the memory usage might be increasing as long as a >>> job is >>>> running. >>>> This is expected (also in the case of multiple running >>> jobs). The >>>> screenshots are not helpful in that regard. :-( >>>> What kind of stateful operations are you using? Depending on >>> your >>>> use case, >>>> you have to manually call `clear()` on the state instance in >>> order >>>> to >>>> release the managed state. >>>> Best, >>>> Ufuk >>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>> <[hidden email]> wrote: >>>> Begin forwarded message: >>>> From: ebru <[hidden email]> >>>> Subject: Re: Flink memory leak >>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>> To: Ufuk Celebi <[hidden email]> >>>> Hi Ufuk, >>>> There are there snapshots of htop output. >>>> 1. snapshot is initial state. >>>> 2. snapshot is after submitted one job. >>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>> the >>>> memory >>>> usage is always increasing over time. >>>> <1.png><2.png><3.png> >>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>> Hey Ebru, >>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>> causing >>>> this. >>>> Since multiple jobs are running, it will be hard to >>> understand to >>>> which job the state descriptors from the heap snapshot >>> belong to. >>>> - Is it possible to isolate the problem and reproduce the >>> behaviour >>>> with only a single job? >>>> – Ufuk >>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>> ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>> Hi, >>>> We are using Flink 1.3.1 in production, we have one job >>> manager and >>>> 3 task >>>> managers in standalone mode. Recently, we've noticed that we >>> have >>>> memory >>>> related problems. We use docker container to serve Flink >>> cluster. We >>>> have >>>> 300 slots and 20 jobs are running with parallelism of 10. >>> Also the >>>> job >>>> count >>>> may be change over time. Taskmanager memory usage always >>> increases. >>>> After >>>> job cancelation this memory usage doesn't decrease. We've >>> tried to >>>> investigate the problem and we've got the task manager jvm >>> heap >>>> snapshot. >>>> According to the jam heap analysis, possible memory leak was >>> Flink >>>> list >>>> state descriptor. But we are not sure that is the cause of >>> our >>>> memory >>>> problem. How can we solve the problem? >>>> We have two types of Flink job. One has no state full >>> operator >>>> contains only maps and filters and the other has time window >>> with >>>> count trigger. >>>> * We've analysed the jvm heaps again in different >>> conditions. First >>>> we analysed the snapshot when no flink jobs running on >>> cluster. (image >>>> 1) >>>> * Then, we analysed the jvm heap snapshot when the flink job >>> that has >>>> no state full operator is running. And according to the >>> results, leak >>>> suspect was NetworkBufferPool (image 2) >>>> * Last analys, there were both two types of jobs running >>> and leak >>>> suspect was again NetworkBufferPool. (image 3) >>>> In our system jobs are regularly cancelled and resubmitted so >>> we >>>> noticed that when job is submitted some amount of memory >>> allocated and >>>> after cancelation this allocated memory never freed. So over >>> time >>>> memory usage is always increasing and exceeded the limits. >>> Links: >>> ------ >>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>> Hi Piotr, >>> There are two types of jobs. >>> In first, we use Kafka source and Kafka sink, there isn't any >>> window operator. >>>> In second job, we use Kafka source, filesystem sink and >>> elastic search sink and window operator for buffering. >>> Hi Piotrek, >>> Thanks for your reply. >>> We've tested our link cluster again. We have 360 slots, and our >>> cluster configuration is like this; >>> jobmanager.rpc.address: %JOBMANAGER% >>> jobmanager.rpc.port: 6123 >>> jobmanager.heap.mb: 1536 >>> taskmanager.heap.mb: 1536 >>> taskmanager.numberOfTaskSlots: 120 >>> taskmanager.memory.preallocate: false >>> parallelism.default: 1 >>> jobmanager.web.port: 8081 >>> state.backend: filesystem >>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>> taskmanager.network.numberOfBuffers: 5000 >>> We are using docker based Flink cluster. >>> WE submitted 36 jobs with the parallelism of 10. After all slots >>> became full. Memory usage were increasing by the time and one by one >>> task managers start to die. And the exception was like this; >>> Taskmanager1 log: >>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>> ActorSystem[flink] >>> java.lang.NoClassDefFoundError: >>> org/apache/kafka/common/metrics/stats/Rate$1 >>> at >>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>> at >>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>> at >>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>> at >>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>> at >>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>> at >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>> at >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> Caused by: java.lang.ClassNotFoundException: >>> org.apache.kafka.common.metrics.stats.Rate$1 >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>> ... 22 more >>> Taskmanager2 log: >>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>> ActorSystem[flink] >>> Java.lang.NoClassDefFoundError: >>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>> at >>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>> at >>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>> at >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>> at >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> Caused by: java.lang.ClassNotFoundException: >>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>> ... 18 more >>> -Ebru >> Hi Piotrek, >> >> We attached the full log of the taskmanager1. >> This may not be a dependency issue because until all of the task slots >> is full, we didn't get any No Class Def Found exception, when there is >> available memory jobs can run without exception for days. >> Also there is Kafka Instance Already Exist exception in full log, but >> this not relevant and doesn't effect jobs or task managers. >> >> -Ebru<taskmanager1.log.zip> Sorry we attached wrong log file. I've attached all task managers and job manager's log. All task managers and job manager was killed. logs.zip (5M) Download Attachment |
jobmanager1.log and taskmanager2.log are the same. Can you also submit files containing std output?
Piotrek > On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: > > On 2017-11-10 11:04, Piotr Nowojski wrote: >> Hi, >> Thanks for the logs, however I do not see before mentioned exceptions >> in it. It ends with java.lang.InterruptedException >> Is it the correct log file? Also, could you attach the std output file >> of the failing TaskManager? >> Piotrek >>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>> Hi, >>>> Could you attach full logs from those task managers? At first glance I >>>> don’t see a connection between those exceptions and any memory issue >>>> that you might had. It looks like a dependency issue in one (some? >>>> All?) of your jobs. >>>> Did you build your jars with -Pbuild-jar profile as described here: >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>> ? >>>> If that doesn’t help. Can you binary search which job is causing the >>>> problem? There might be some Flink incompatibility between different >>>> versions and rebuilding a job’s jar with a version matching to the >>>> cluster version might help. >>>> Piotrek >>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>> <[hidden email]> wrote: >>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>> Btw, Ebru: >>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>> screenshots it’s memory consumption was reasonable and stable: >>>>> 596MB >>>>> -> 602MB -> 597MB. >>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>> Do you experience any problems, like Out Of Memory >>>>> errors/crashes/long >>>>> GC pauses? Or just JVM process is using more memory over time? You >>>>> are >>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>> was >>>>> used? So increasing memory usage until hitting some limit (for >>>>> example >>>>> JVM max heap size) is expected behaviour. >>>>> Piotrek >>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>>> wrote: >>>>> I don’t know if this is relevant to this issue, but I was >>>>> constantly getting failures trying to reproduce this leak using your >>>>> Job, because you were using non deterministic getKey function: >>>>> @Override >>>>> public Integer getKey(Integer event) { >>>>> Random randomGen = new Random((new Date()).getTime()); >>>>> return randomGen.nextInt() % 8; >>>>> } >>>>> And quoting Java doc of KeySelector: >>>>> "If invoked multiple times on the same object, the returned key must >>>>> be the same.” >>>>> I’m trying to reproduce this issue with following job: >>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>> well just discarding incoming data. I’m cancelling the job every 5 >>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>> stable, well below maximum java heap size. >>>>> Piotrek >>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>> wrote: >>>>> Yes, I tested with just printing the stream. But it could take a >>>>> lot of time to fail. >>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>> <[hidden email]> wrote: >>>>> Thanks for quick answer. >>>>> So it will also fail after some time with `fromElements` source >>>>> instead of Kafka, right? >>>>> Did you try it also without a Kafka producer? >>>>> Piotrek >>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>> wrote: >>>>> Hi, >>>>> You don't need data. With data it will die faster. I tested as >>>>> well with a small data set, using the fromElements source, but it >>>>> will take some time to die. It's better with some data. >>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>> <[hidden email]> wrote: >>>>> Hi, >>>>> Thanks for sharing this job. >>>>> Do I need to feed some data to the Kafka to reproduce this >>>> issue with your script? >>>>>> Does this OOM issue also happen when you are not using the >>>> Kafka source/sink? >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>> wrote: >>>>>> Hi, >>>>>> This is the test flink job we created to trigger this leak >>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>> And this is the python script we are using to execute the job >>>> thousands of times to get the OOM problem >>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>> The cluster we used for this has this configuration: >>>>>> Instance type: t2.large >>>>>> Number of workers: 2 >>>>>> HeapMemory: 5500 >>>>>> Number of task slots per node: 4 >>>>>> TaskMangMemFraction: 0.5 >>>>>> NumberOfNetworkBuffers: 2000 >>>>>> We have tried several things, increasing the heap, reducing the >>>> heap, more memory fraction, changes this value in the >>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>> work. >>>>>> Thanks for your help. >>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>> Hi Ebru and Javier, >>>>> Yes, if you could share this example job it would be helpful. >>>>> Ebru: could you explain in a little more details how does >>>> your Job(s) >>>>> look like? Could you post some code? If you are just using >>>> maps and >>>>> filters there shouldn’t be any network transfers involved, >>>> aside >>>>> from Source and Sink functions. >>>>> Piotrek >>>>> On 8 Nov 2017, at 12:54, ebru >>>> <[hidden email]> wrote: >>>>> Hi Javier, >>>>> It would be helpful if you share your test job with us. >>>>> Which configurations did you try? >>>>> -Ebru >>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>> <[hidden email]> >>>>> wrote: >>>>> Hi, >>>>> We have been facing a similar problem. We have tried some >>>> different >>>>> configurations, as proposed in other email thread by Flavio >>>> and >>>>> Kien, but it didn't work. We have a workaround similar to >>>> the one >>>>> that Flavio has, we restart the taskmanagers once they reach >>>> a >>>>> memory threshold. We created a small test to remove all of >>>> our >>>>> dependencies and leave only flink native libraries. This >>>> test reads >>>>> data from a Kafka topic and writes it back to another topic >>>> in >>>>> Kafka. We cancel the job and start another every 5 seconds. >>>> After >>>>> ~30 minutes of doing this process, the cluster reaches the >>>> OS memory >>>>> limit and dies. >>>>> Currently, we have a test cluster with 8 workers and 8 task >>>> slots >>>>> per node. We have one job that uses 56 slots, and we cannot >>>> execute >>>>> that job 5 times in a row because the whole cluster dies. If >>>> you >>>>> want, we can publish our test job. >>>>> Regards, >>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>> <[hidden email]> >>>>> wrote: >>>>> @Nico & @Piotr Could you please have a look at this? You >>>> both >>>>> recently worked on the network stack and might be most >>>> familiar with >>>>> this. >>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>> <[hidden email]> >>>>> wrote: >>>>> We also have the same problem in production. At the moment >>>> the >>>>> solution is to restart the entire Flink cluster after every >>>> job.. >>>>> We've tried to reproduce this problem with a test (see >>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>> don't >>>>> know whether the error produced by the test and the leak are >>>>> correlated.. >>>>> Best, >>>>> Flavio >>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>> EBRU >>>>> <[hidden email]> wrote: >>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>> Do you use any windowing? If yes, could you please share >>>> that code? >>>>> If >>>>> there is no stateful operation at all, it's strange where >>>> the list >>>>> state instances are coming from. >>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>> <[hidden email]> >>>>> wrote: >>>>> Hi Ufuk, >>>>> We don’t explicitly define any state descriptor. We only >>>> use map >>>>> and filters >>>>> operator. We thought that gc handle clearing the flink’s >>>> internal >>>>> states. >>>>> So how can we manage the memory if it is always increasing? >>>>> - Ebru >>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>> Hey Ebru, the memory usage might be increasing as long as a >>>> job is >>>>> running. >>>>> This is expected (also in the case of multiple running >>>> jobs). The >>>>> screenshots are not helpful in that regard. :-( >>>>> What kind of stateful operations are you using? Depending on >>>> your >>>>> use case, >>>>> you have to manually call `clear()` on the state instance in >>>> order >>>>> to >>>>> release the managed state. >>>>> Best, >>>>> Ufuk >>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>> <[hidden email]> wrote: >>>>> Begin forwarded message: >>>>> From: ebru <[hidden email]> >>>>> Subject: Re: Flink memory leak >>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>> To: Ufuk Celebi <[hidden email]> >>>>> Hi Ufuk, >>>>> There are there snapshots of htop output. >>>>> 1. snapshot is initial state. >>>>> 2. snapshot is after submitted one job. >>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>> the >>>>> memory >>>>> usage is always increasing over time. >>>>> <1.png><2.png><3.png> >>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>> Hey Ebru, >>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>> causing >>>>> this. >>>>> Since multiple jobs are running, it will be hard to >>>> understand to >>>>> which job the state descriptors from the heap snapshot >>>> belong to. >>>>> - Is it possible to isolate the problem and reproduce the >>>> behaviour >>>>> with only a single job? >>>>> – Ufuk >>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>> ÇETİNKAYA EBRU >>>>> <[hidden email]> wrote: >>>>> Hi, >>>>> We are using Flink 1.3.1 in production, we have one job >>>> manager and >>>>> 3 task >>>>> managers in standalone mode. Recently, we've noticed that we >>>> have >>>>> memory >>>>> related problems. We use docker container to serve Flink >>>> cluster. We >>>>> have >>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>> Also the >>>>> job >>>>> count >>>>> may be change over time. Taskmanager memory usage always >>>> increases. >>>>> After >>>>> job cancelation this memory usage doesn't decrease. We've >>>> tried to >>>>> investigate the problem and we've got the task manager jvm >>>> heap >>>>> snapshot. >>>>> According to the jam heap analysis, possible memory leak was >>>> Flink >>>>> list >>>>> state descriptor. But we are not sure that is the cause of >>>> our >>>>> memory >>>>> problem. How can we solve the problem? >>>>> We have two types of Flink job. One has no state full >>>> operator >>>>> contains only maps and filters and the other has time window >>>> with >>>>> count trigger. >>>>> * We've analysed the jvm heaps again in different >>>> conditions. First >>>>> we analysed the snapshot when no flink jobs running on >>>> cluster. (image >>>>> 1) >>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>> that has >>>>> no state full operator is running. And according to the >>>> results, leak >>>>> suspect was NetworkBufferPool (image 2) >>>>> * Last analys, there were both two types of jobs running >>>> and leak >>>>> suspect was again NetworkBufferPool. (image 3) >>>>> In our system jobs are regularly cancelled and resubmitted so >>>> we >>>>> noticed that when job is submitted some amount of memory >>>> allocated and >>>>> after cancelation this allocated memory never freed. So over >>>> time >>>>> memory usage is always increasing and exceeded the limits. >>>> Links: >>>> ------ >>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>> Hi Piotr, >>>> There are two types of jobs. >>>> In first, we use Kafka source and Kafka sink, there isn't any >>>> window operator. >>>>> In second job, we use Kafka source, filesystem sink and >>>> elastic search sink and window operator for buffering. >>>> Hi Piotrek, >>>> Thanks for your reply. >>>> We've tested our link cluster again. We have 360 slots, and our >>>> cluster configuration is like this; >>>> jobmanager.rpc.address: %JOBMANAGER% >>>> jobmanager.rpc.port: 6123 >>>> jobmanager.heap.mb: 1536 >>>> taskmanager.heap.mb: 1536 >>>> taskmanager.numberOfTaskSlots: 120 >>>> taskmanager.memory.preallocate: false >>>> parallelism.default: 1 >>>> jobmanager.web.port: 8081 >>>> state.backend: filesystem >>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>> taskmanager.network.numberOfBuffers: 5000 >>>> We are using docker based Flink cluster. >>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>> became full. Memory usage were increasing by the time and one by one >>>> task managers start to die. And the exception was like this; >>>> Taskmanager1 log: >>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>> ActorSystem[flink] >>>> java.lang.NoClassDefFoundError: >>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>> at >>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>> at >>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>> at >>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>> at >>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>> at >>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>> at >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>> Caused by: java.lang.ClassNotFoundException: >>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>> ... 22 more >>>> Taskmanager2 log: >>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>> ActorSystem[flink] >>>> Java.lang.NoClassDefFoundError: >>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>> at >>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>> at >>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>> at >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>> Caused by: java.lang.ClassNotFoundException: >>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>> ... 18 more >>>> -Ebru >>> Hi Piotrek, >>> We attached the full log of the taskmanager1. >>> This may not be a dependency issue because until all of the task slots is full, we didn't get any No Class Def Found exception, when there is available memory jobs can run without exception for days. >>> Also there is Kafka Instance Already Exist exception in full log, but this not relevant and doesn't effect jobs or task managers. >>> -Ebru<taskmanager1.log.zip> > Hi, > > Sorry we attached wrong log file. I've attached all task managers and job manager's log. All task managers and job manager was killed.<logs.zip> |
On 2017-11-10 13:14, Piotr Nowojski wrote:
> jobmanager1.log and taskmanager2.log are the same. Can you also submit > files containing std output? > > Piotrek > >> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >> >> On 2017-11-10 11:04, Piotr Nowojski wrote: >>> Hi, >>> Thanks for the logs, however I do not see before mentioned exceptions >>> in it. It ends with java.lang.InterruptedException >>> Is it the correct log file? Also, could you attach the std output >>> file >>> of the failing TaskManager? >>> Piotrek >>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>> Hi, >>>>> Could you attach full logs from those task managers? At first >>>>> glance I >>>>> don’t see a connection between those exceptions and any memory >>>>> issue >>>>> that you might had. It looks like a dependency issue in one (some? >>>>> All?) of your jobs. >>>>> Did you build your jars with -Pbuild-jar profile as described here: >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>> ? >>>>> If that doesn’t help. Can you binary search which job is causing >>>>> the >>>>> problem? There might be some Flink incompatibility between >>>>> different >>>>> versions and rebuilding a job’s jar with a version matching to the >>>>> cluster version might help. >>>>> Piotrek >>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>> Btw, Ebru: >>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>> 596MB >>>>>> -> 602MB -> 597MB. >>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>> Do you experience any problems, like Out Of Memory >>>>>> errors/crashes/long >>>>>> GC pauses? Or just JVM process is using more memory over time? You >>>>>> are >>>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>>> was >>>>>> used? So increasing memory usage until hitting some limit (for >>>>>> example >>>>>> JVM max heap size) is expected behaviour. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>>>> wrote: >>>>>> I don’t know if this is relevant to this issue, but I was >>>>>> constantly getting failures trying to reproduce this leak using >>>>>> your >>>>>> Job, because you were using non deterministic getKey function: >>>>>> @Override >>>>>> public Integer getKey(Integer event) { >>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>> return randomGen.nextInt() % 8; >>>>>> } >>>>>> And quoting Java doc of KeySelector: >>>>>> "If invoked multiple times on the same object, the returned key >>>>>> must >>>>>> be the same.” >>>>>> I’m trying to reproduce this issue with following job: >>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>>> well just discarding incoming data. I’m cancelling the job every 5 >>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>> stable, well below maximum java heap size. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>> wrote: >>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>> lot of time to fail. >>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>> <[hidden email]> wrote: >>>>>> Thanks for quick answer. >>>>>> So it will also fail after some time with `fromElements` source >>>>>> instead of Kafka, right? >>>>>> Did you try it also without a Kafka producer? >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>> wrote: >>>>>> Hi, >>>>>> You don't need data. With data it will die faster. I tested as >>>>>> well with a small data set, using the fromElements source, but it >>>>>> will take some time to die. It's better with some data. >>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>> <[hidden email]> wrote: >>>>>> Hi, >>>>>> Thanks for sharing this job. >>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>> issue with your script? >>>>>>> Does this OOM issue also happen when you are not using the >>>>> Kafka source/sink? >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>>> wrote: >>>>>>> Hi, >>>>>>> This is the test flink job we created to trigger this leak >>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>> And this is the python script we are using to execute the job >>>>> thousands of times to get the OOM problem >>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>> The cluster we used for this has this configuration: >>>>>>> Instance type: t2.large >>>>>>> Number of workers: 2 >>>>>>> HeapMemory: 5500 >>>>>>> Number of task slots per node: 4 >>>>>>> TaskMangMemFraction: 0.5 >>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>> We have tried several things, increasing the heap, reducing the >>>>> heap, more memory fraction, changes this value in the >>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>> work. >>>>>>> Thanks for your help. >>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>> <[hidden email]> wrote: >>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>> Hi Ebru and Javier, >>>>>> Yes, if you could share this example job it would be helpful. >>>>>> Ebru: could you explain in a little more details how does >>>>> your Job(s) >>>>>> look like? Could you post some code? If you are just using >>>>> maps and >>>>>> filters there shouldn’t be any network transfers involved, >>>>> aside >>>>>> from Source and Sink functions. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 12:54, ebru >>>>> <[hidden email]> wrote: >>>>>> Hi Javier, >>>>>> It would be helpful if you share your test job with us. >>>>>> Which configurations did you try? >>>>>> -Ebru >>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>> <[hidden email]> >>>>>> wrote: >>>>>> Hi, >>>>>> We have been facing a similar problem. We have tried some >>>>> different >>>>>> configurations, as proposed in other email thread by Flavio >>>>> and >>>>>> Kien, but it didn't work. We have a workaround similar to >>>>> the one >>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>> a >>>>>> memory threshold. We created a small test to remove all of >>>>> our >>>>>> dependencies and leave only flink native libraries. This >>>>> test reads >>>>>> data from a Kafka topic and writes it back to another topic >>>>> in >>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>> After >>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>> OS memory >>>>>> limit and dies. >>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>> slots >>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>> execute >>>>>> that job 5 times in a row because the whole cluster dies. If >>>>> you >>>>>> want, we can publish our test job. >>>>>> Regards, >>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>> <[hidden email]> >>>>>> wrote: >>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>> both >>>>>> recently worked on the network stack and might be most >>>>> familiar with >>>>>> this. >>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>> <[hidden email]> >>>>>> wrote: >>>>>> We also have the same problem in production. At the moment >>>>> the >>>>>> solution is to restart the entire Flink cluster after every >>>>> job.. >>>>>> We've tried to reproduce this problem with a test (see >>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>> don't >>>>>> know whether the error produced by the test and the leak are >>>>>> correlated.. >>>>>> Best, >>>>>> Flavio >>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>> EBRU >>>>>> <[hidden email]> wrote: >>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>> Do you use any windowing? If yes, could you please share >>>>> that code? >>>>>> If >>>>>> there is no stateful operation at all, it's strange where >>>>> the list >>>>>> state instances are coming from. >>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>> <[hidden email]> >>>>>> wrote: >>>>>> Hi Ufuk, >>>>>> We don’t explicitly define any state descriptor. We only >>>>> use map >>>>>> and filters >>>>>> operator. We thought that gc handle clearing the flink’s >>>>> internal >>>>>> states. >>>>>> So how can we manage the memory if it is always increasing? >>>>>> - Ebru >>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>> job is >>>>>> running. >>>>>> This is expected (also in the case of multiple running >>>>> jobs). The >>>>>> screenshots are not helpful in that regard. :-( >>>>>> What kind of stateful operations are you using? Depending on >>>>> your >>>>>> use case, >>>>>> you have to manually call `clear()` on the state instance in >>>>> order >>>>>> to >>>>>> release the managed state. >>>>>> Best, >>>>>> Ufuk >>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>> <[hidden email]> wrote: >>>>>> Begin forwarded message: >>>>>> From: ebru <[hidden email]> >>>>>> Subject: Re: Flink memory leak >>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>> To: Ufuk Celebi <[hidden email]> >>>>>> Hi Ufuk, >>>>>> There are there snapshots of htop output. >>>>>> 1. snapshot is initial state. >>>>>> 2. snapshot is after submitted one job. >>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>> the >>>>>> memory >>>>>> usage is always increasing over time. >>>>>> <1.png><2.png><3.png> >>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>> Hey Ebru, >>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>> causing >>>>>> this. >>>>>> Since multiple jobs are running, it will be hard to >>>>> understand to >>>>>> which job the state descriptors from the heap snapshot >>>>> belong to. >>>>>> - Is it possible to isolate the problem and reproduce the >>>>> behaviour >>>>>> with only a single job? >>>>>> – Ufuk >>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>> ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>> Hi, >>>>>> We are using Flink 1.3.1 in production, we have one job >>>>> manager and >>>>>> 3 task >>>>>> managers in standalone mode. Recently, we've noticed that we >>>>> have >>>>>> memory >>>>>> related problems. We use docker container to serve Flink >>>>> cluster. We >>>>>> have >>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>> Also the >>>>>> job >>>>>> count >>>>>> may be change over time. Taskmanager memory usage always >>>>> increases. >>>>>> After >>>>>> job cancelation this memory usage doesn't decrease. We've >>>>> tried to >>>>>> investigate the problem and we've got the task manager jvm >>>>> heap >>>>>> snapshot. >>>>>> According to the jam heap analysis, possible memory leak was >>>>> Flink >>>>>> list >>>>>> state descriptor. But we are not sure that is the cause of >>>>> our >>>>>> memory >>>>>> problem. How can we solve the problem? >>>>>> We have two types of Flink job. One has no state full >>>>> operator >>>>>> contains only maps and filters and the other has time window >>>>> with >>>>>> count trigger. >>>>>> * We've analysed the jvm heaps again in different >>>>> conditions. First >>>>>> we analysed the snapshot when no flink jobs running on >>>>> cluster. (image >>>>>> 1) >>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>> that has >>>>>> no state full operator is running. And according to the >>>>> results, leak >>>>>> suspect was NetworkBufferPool (image 2) >>>>>> * Last analys, there were both two types of jobs running >>>>> and leak >>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>> we >>>>>> noticed that when job is submitted some amount of memory >>>>> allocated and >>>>>> after cancelation this allocated memory never freed. So over >>>>> time >>>>>> memory usage is always increasing and exceeded the limits. >>>>> Links: >>>>> ------ >>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>> Hi Piotr, >>>>> There are two types of jobs. >>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>> window operator. >>>>>> In second job, we use Kafka source, filesystem sink and >>>>> elastic search sink and window operator for buffering. >>>>> Hi Piotrek, >>>>> Thanks for your reply. >>>>> We've tested our link cluster again. We have 360 slots, and our >>>>> cluster configuration is like this; >>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>> jobmanager.rpc.port: 6123 >>>>> jobmanager.heap.mb: 1536 >>>>> taskmanager.heap.mb: 1536 >>>>> taskmanager.numberOfTaskSlots: 120 >>>>> taskmanager.memory.preallocate: false >>>>> parallelism.default: 1 >>>>> jobmanager.web.port: 8081 >>>>> state.backend: filesystem >>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>> taskmanager.network.numberOfBuffers: 5000 >>>>> We are using docker based Flink cluster. >>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>> became full. Memory usage were increasing by the time and one by >>>>> one >>>>> task managers start to die. And the exception was like this; >>>>> Taskmanager1 log: >>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>> for >>>>> ActorSystem[flink] >>>>> java.lang.NoClassDefFoundError: >>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>> at >>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>> at >>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>> at >>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>> at >>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>> at >>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>> at >>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>> Caused by: java.lang.ClassNotFoundException: >>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>> ... 22 more >>>>> Taskmanager2 log: >>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>> for >>>>> ActorSystem[flink] >>>>> Java.lang.NoClassDefFoundError: >>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>> at >>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>> at >>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>> Caused by: java.lang.ClassNotFoundException: >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>> ... 18 more >>>>> -Ebru >>>> Hi Piotrek, >>>> We attached the full log of the taskmanager1. >>>> This may not be a dependency issue because until all of the task >>>> slots is full, we didn't get any No Class Def Found exception, when >>>> there is available memory jobs can run without exception for days. >>>> Also there is Kafka Instance Already Exist exception in full log, >>>> but this not relevant and doesn't effect jobs or task managers. >>>> -Ebru<taskmanager1.log.zip> >> Hi, >> >> Sorry we attached wrong log file. I've attached all task managers and >> job manager's log. All task managers and job manager was >> killed.<logs.zip> time, it start using swap. I will send part-by-part because of size limit issues. logs2-1.zip (6M) Download Attachment |
In reply to this post by Piotr Nowojski
On 2017-11-10 13:14, Piotr Nowojski wrote:
> jobmanager1.log and taskmanager2.log are the same. Can you also submit > files containing std output? > > Piotrek > >> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >> >> On 2017-11-10 11:04, Piotr Nowojski wrote: >>> Hi, >>> Thanks for the logs, however I do not see before mentioned exceptions >>> in it. It ends with java.lang.InterruptedException >>> Is it the correct log file? Also, could you attach the std output >>> file >>> of the failing TaskManager? >>> Piotrek >>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>> Hi, >>>>> Could you attach full logs from those task managers? At first >>>>> glance I >>>>> don’t see a connection between those exceptions and any memory >>>>> issue >>>>> that you might had. It looks like a dependency issue in one (some? >>>>> All?) of your jobs. >>>>> Did you build your jars with -Pbuild-jar profile as described here: >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>> ? >>>>> If that doesn’t help. Can you binary search which job is causing >>>>> the >>>>> problem? There might be some Flink incompatibility between >>>>> different >>>>> versions and rebuilding a job’s jar with a version matching to the >>>>> cluster version might help. >>>>> Piotrek >>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>> Btw, Ebru: >>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>> 596MB >>>>>> -> 602MB -> 597MB. >>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>> Do you experience any problems, like Out Of Memory >>>>>> errors/crashes/long >>>>>> GC pauses? Or just JVM process is using more memory over time? You >>>>>> are >>>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>>> was >>>>>> used? So increasing memory usage until hitting some limit (for >>>>>> example >>>>>> JVM max heap size) is expected behaviour. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>>>> wrote: >>>>>> I don’t know if this is relevant to this issue, but I was >>>>>> constantly getting failures trying to reproduce this leak using >>>>>> your >>>>>> Job, because you were using non deterministic getKey function: >>>>>> @Override >>>>>> public Integer getKey(Integer event) { >>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>> return randomGen.nextInt() % 8; >>>>>> } >>>>>> And quoting Java doc of KeySelector: >>>>>> "If invoked multiple times on the same object, the returned key >>>>>> must >>>>>> be the same.” >>>>>> I’m trying to reproduce this issue with following job: >>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>>> well just discarding incoming data. I’m cancelling the job every 5 >>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>> stable, well below maximum java heap size. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>> wrote: >>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>> lot of time to fail. >>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>> <[hidden email]> wrote: >>>>>> Thanks for quick answer. >>>>>> So it will also fail after some time with `fromElements` source >>>>>> instead of Kafka, right? >>>>>> Did you try it also without a Kafka producer? >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>> wrote: >>>>>> Hi, >>>>>> You don't need data. With data it will die faster. I tested as >>>>>> well with a small data set, using the fromElements source, but it >>>>>> will take some time to die. It's better with some data. >>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>> <[hidden email]> wrote: >>>>>> Hi, >>>>>> Thanks for sharing this job. >>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>> issue with your script? >>>>>>> Does this OOM issue also happen when you are not using the >>>>> Kafka source/sink? >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>>> wrote: >>>>>>> Hi, >>>>>>> This is the test flink job we created to trigger this leak >>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>> And this is the python script we are using to execute the job >>>>> thousands of times to get the OOM problem >>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>> The cluster we used for this has this configuration: >>>>>>> Instance type: t2.large >>>>>>> Number of workers: 2 >>>>>>> HeapMemory: 5500 >>>>>>> Number of task slots per node: 4 >>>>>>> TaskMangMemFraction: 0.5 >>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>> We have tried several things, increasing the heap, reducing the >>>>> heap, more memory fraction, changes this value in the >>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>> work. >>>>>>> Thanks for your help. >>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>> <[hidden email]> wrote: >>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>> Hi Ebru and Javier, >>>>>> Yes, if you could share this example job it would be helpful. >>>>>> Ebru: could you explain in a little more details how does >>>>> your Job(s) >>>>>> look like? Could you post some code? If you are just using >>>>> maps and >>>>>> filters there shouldn’t be any network transfers involved, >>>>> aside >>>>>> from Source and Sink functions. >>>>>> Piotrek >>>>>> On 8 Nov 2017, at 12:54, ebru >>>>> <[hidden email]> wrote: >>>>>> Hi Javier, >>>>>> It would be helpful if you share your test job with us. >>>>>> Which configurations did you try? >>>>>> -Ebru >>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>> <[hidden email]> >>>>>> wrote: >>>>>> Hi, >>>>>> We have been facing a similar problem. We have tried some >>>>> different >>>>>> configurations, as proposed in other email thread by Flavio >>>>> and >>>>>> Kien, but it didn't work. We have a workaround similar to >>>>> the one >>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>> a >>>>>> memory threshold. We created a small test to remove all of >>>>> our >>>>>> dependencies and leave only flink native libraries. This >>>>> test reads >>>>>> data from a Kafka topic and writes it back to another topic >>>>> in >>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>> After >>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>> OS memory >>>>>> limit and dies. >>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>> slots >>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>> execute >>>>>> that job 5 times in a row because the whole cluster dies. If >>>>> you >>>>>> want, we can publish our test job. >>>>>> Regards, >>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>> <[hidden email]> >>>>>> wrote: >>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>> both >>>>>> recently worked on the network stack and might be most >>>>> familiar with >>>>>> this. >>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>> <[hidden email]> >>>>>> wrote: >>>>>> We also have the same problem in production. At the moment >>>>> the >>>>>> solution is to restart the entire Flink cluster after every >>>>> job.. >>>>>> We've tried to reproduce this problem with a test (see >>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>> don't >>>>>> know whether the error produced by the test and the leak are >>>>>> correlated.. >>>>>> Best, >>>>>> Flavio >>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>> EBRU >>>>>> <[hidden email]> wrote: >>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>> Do you use any windowing? If yes, could you please share >>>>> that code? >>>>>> If >>>>>> there is no stateful operation at all, it's strange where >>>>> the list >>>>>> state instances are coming from. >>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>> <[hidden email]> >>>>>> wrote: >>>>>> Hi Ufuk, >>>>>> We don’t explicitly define any state descriptor. We only >>>>> use map >>>>>> and filters >>>>>> operator. We thought that gc handle clearing the flink’s >>>>> internal >>>>>> states. >>>>>> So how can we manage the memory if it is always increasing? >>>>>> - Ebru >>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>> job is >>>>>> running. >>>>>> This is expected (also in the case of multiple running >>>>> jobs). The >>>>>> screenshots are not helpful in that regard. :-( >>>>>> What kind of stateful operations are you using? Depending on >>>>> your >>>>>> use case, >>>>>> you have to manually call `clear()` on the state instance in >>>>> order >>>>>> to >>>>>> release the managed state. >>>>>> Best, >>>>>> Ufuk >>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>> <[hidden email]> wrote: >>>>>> Begin forwarded message: >>>>>> From: ebru <[hidden email]> >>>>>> Subject: Re: Flink memory leak >>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>> To: Ufuk Celebi <[hidden email]> >>>>>> Hi Ufuk, >>>>>> There are there snapshots of htop output. >>>>>> 1. snapshot is initial state. >>>>>> 2. snapshot is after submitted one job. >>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>> the >>>>>> memory >>>>>> usage is always increasing over time. >>>>>> <1.png><2.png><3.png> >>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>> Hey Ebru, >>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>> causing >>>>>> this. >>>>>> Since multiple jobs are running, it will be hard to >>>>> understand to >>>>>> which job the state descriptors from the heap snapshot >>>>> belong to. >>>>>> - Is it possible to isolate the problem and reproduce the >>>>> behaviour >>>>>> with only a single job? >>>>>> – Ufuk >>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>> ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>> Hi, >>>>>> We are using Flink 1.3.1 in production, we have one job >>>>> manager and >>>>>> 3 task >>>>>> managers in standalone mode. Recently, we've noticed that we >>>>> have >>>>>> memory >>>>>> related problems. We use docker container to serve Flink >>>>> cluster. We >>>>>> have >>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>> Also the >>>>>> job >>>>>> count >>>>>> may be change over time. Taskmanager memory usage always >>>>> increases. >>>>>> After >>>>>> job cancelation this memory usage doesn't decrease. We've >>>>> tried to >>>>>> investigate the problem and we've got the task manager jvm >>>>> heap >>>>>> snapshot. >>>>>> According to the jam heap analysis, possible memory leak was >>>>> Flink >>>>>> list >>>>>> state descriptor. But we are not sure that is the cause of >>>>> our >>>>>> memory >>>>>> problem. How can we solve the problem? >>>>>> We have two types of Flink job. One has no state full >>>>> operator >>>>>> contains only maps and filters and the other has time window >>>>> with >>>>>> count trigger. >>>>>> * We've analysed the jvm heaps again in different >>>>> conditions. First >>>>>> we analysed the snapshot when no flink jobs running on >>>>> cluster. (image >>>>>> 1) >>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>> that has >>>>>> no state full operator is running. And according to the >>>>> results, leak >>>>>> suspect was NetworkBufferPool (image 2) >>>>>> * Last analys, there were both two types of jobs running >>>>> and leak >>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>> we >>>>>> noticed that when job is submitted some amount of memory >>>>> allocated and >>>>>> after cancelation this allocated memory never freed. So over >>>>> time >>>>>> memory usage is always increasing and exceeded the limits. >>>>> Links: >>>>> ------ >>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>> Hi Piotr, >>>>> There are two types of jobs. >>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>> window operator. >>>>>> In second job, we use Kafka source, filesystem sink and >>>>> elastic search sink and window operator for buffering. >>>>> Hi Piotrek, >>>>> Thanks for your reply. >>>>> We've tested our link cluster again. We have 360 slots, and our >>>>> cluster configuration is like this; >>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>> jobmanager.rpc.port: 6123 >>>>> jobmanager.heap.mb: 1536 >>>>> taskmanager.heap.mb: 1536 >>>>> taskmanager.numberOfTaskSlots: 120 >>>>> taskmanager.memory.preallocate: false >>>>> parallelism.default: 1 >>>>> jobmanager.web.port: 8081 >>>>> state.backend: filesystem >>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>> taskmanager.network.numberOfBuffers: 5000 >>>>> We are using docker based Flink cluster. >>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>> became full. Memory usage were increasing by the time and one by >>>>> one >>>>> task managers start to die. And the exception was like this; >>>>> Taskmanager1 log: >>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>> for >>>>> ActorSystem[flink] >>>>> java.lang.NoClassDefFoundError: >>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>> at >>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>> at >>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>> at >>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>> at >>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>> at >>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>> at >>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>> Caused by: java.lang.ClassNotFoundException: >>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>> ... 22 more >>>>> Taskmanager2 log: >>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>> for >>>>> ActorSystem[flink] >>>>> Java.lang.NoClassDefFoundError: >>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>> at >>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>> at >>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>> at >>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>> at >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>> Caused by: java.lang.ClassNotFoundException: >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>> ... 18 more >>>>> -Ebru >>>> Hi Piotrek, >>>> We attached the full log of the taskmanager1. >>>> This may not be a dependency issue because until all of the task >>>> slots is full, we didn't get any No Class Def Found exception, when >>>> there is available memory jobs can run without exception for days. >>>> Also there is Kafka Instance Already Exist exception in full log, >>>> but this not relevant and doesn't effect jobs or task managers. >>>> -Ebru<taskmanager1.log.zip> >> Hi, >> >> Sorry we attached wrong log file. I've attached all task managers and >> job manager's log. All task managers and job manager was >> killed.<logs.zip> logs2-2.zip (8M) Download Attachment |
In reply to this post by Piotr Nowojski
I do not see anything abnormal in the logs before this error :(
What are your JVM settings and which java version are you running? What happens if you limit the heap size so that the swap is never used? Piotrek > On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: > > On 2017-11-10 13:14, Piotr Nowojski wrote: >> jobmanager1.log and taskmanager2.log are the same. Can you also submit >> files containing std output? >> Piotrek >>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>> On 2017-11-10 11:04, Piotr Nowojski wrote: >>>> Hi, >>>> Thanks for the logs, however I do not see before mentioned exceptions >>>> in it. It ends with java.lang.InterruptedException >>>> Is it the correct log file? Also, could you attach the std output file >>>> of the failing TaskManager? >>>> Piotrek >>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>>> Hi, >>>>>> Could you attach full logs from those task managers? At first glance I >>>>>> don’t see a connection between those exceptions and any memory issue >>>>>> that you might had. It looks like a dependency issue in one (some? >>>>>> All?) of your jobs. >>>>>> Did you build your jars with -Pbuild-jar profile as described here: >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>>> ? >>>>>> If that doesn’t help. Can you binary search which job is causing the >>>>>> problem? There might be some Flink incompatibility between different >>>>>> versions and rebuilding a job’s jar with a version matching to the >>>>>> cluster version might help. >>>>>> Piotrek >>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>> <[hidden email]> wrote: >>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>>> Btw, Ebru: >>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>>> 596MB >>>>>>> -> 602MB -> 597MB. >>>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>>> Do you experience any problems, like Out Of Memory >>>>>>> errors/crashes/long >>>>>>> GC pauses? Or just JVM process is using more memory over time? You >>>>>>> are >>>>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>>>> was >>>>>>> used? So increasing memory usage until hitting some limit (for >>>>>>> example >>>>>>> JVM max heap size) is expected behaviour. >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>>>>> wrote: >>>>>>> I don’t know if this is relevant to this issue, but I was >>>>>>> constantly getting failures trying to reproduce this leak using your >>>>>>> Job, because you were using non deterministic getKey function: >>>>>>> @Override >>>>>>> public Integer getKey(Integer event) { >>>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>>> return randomGen.nextInt() % 8; >>>>>>> } >>>>>>> And quoting Java doc of KeySelector: >>>>>>> "If invoked multiple times on the same object, the returned key must >>>>>>> be the same.” >>>>>>> I’m trying to reproduce this issue with following job: >>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>>>> well just discarding incoming data. I’m cancelling the job every 5 >>>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>>> stable, well below maximum java heap size. >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>>> wrote: >>>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>>> lot of time to fail. >>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>>> <[hidden email]> wrote: >>>>>>> Thanks for quick answer. >>>>>>> So it will also fail after some time with `fromElements` source >>>>>>> instead of Kafka, right? >>>>>>> Did you try it also without a Kafka producer? >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>>> wrote: >>>>>>> Hi, >>>>>>> You don't need data. With data it will die faster. I tested as >>>>>>> well with a small data set, using the fromElements source, but it >>>>>>> will take some time to die. It's better with some data. >>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>>> <[hidden email]> wrote: >>>>>>> Hi, >>>>>>> Thanks for sharing this job. >>>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>>> issue with your script? >>>>>>>> Does this OOM issue also happen when you are not using the >>>>>> Kafka source/sink? >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>>>> wrote: >>>>>>>> Hi, >>>>>>>> This is the test flink job we created to trigger this leak >>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>>> And this is the python script we are using to execute the job >>>>>> thousands of times to get the OOM problem >>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>>> The cluster we used for this has this configuration: >>>>>>>> Instance type: t2.large >>>>>>>> Number of workers: 2 >>>>>>>> HeapMemory: 5500 >>>>>>>> Number of task slots per node: 4 >>>>>>>> TaskMangMemFraction: 0.5 >>>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>>> We have tried several things, increasing the heap, reducing the >>>>>> heap, more memory fraction, changes this value in the >>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>>> work. >>>>>>>> Thanks for your help. >>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>>> Hi Ebru and Javier, >>>>>>> Yes, if you could share this example job it would be helpful. >>>>>>> Ebru: could you explain in a little more details how does >>>>>> your Job(s) >>>>>>> look like? Could you post some code? If you are just using >>>>>> maps and >>>>>>> filters there shouldn’t be any network transfers involved, >>>>>> aside >>>>>>> from Source and Sink functions. >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 12:54, ebru >>>>>> <[hidden email]> wrote: >>>>>>> Hi Javier, >>>>>>> It would be helpful if you share your test job with us. >>>>>>> Which configurations did you try? >>>>>>> -Ebru >>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>>> <[hidden email]> >>>>>>> wrote: >>>>>>> Hi, >>>>>>> We have been facing a similar problem. We have tried some >>>>>> different >>>>>>> configurations, as proposed in other email thread by Flavio >>>>>> and >>>>>>> Kien, but it didn't work. We have a workaround similar to >>>>>> the one >>>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>>> a >>>>>>> memory threshold. We created a small test to remove all of >>>>>> our >>>>>>> dependencies and leave only flink native libraries. This >>>>>> test reads >>>>>>> data from a Kafka topic and writes it back to another topic >>>>>> in >>>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>>> After >>>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>>> OS memory >>>>>>> limit and dies. >>>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>>> slots >>>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>>> execute >>>>>>> that job 5 times in a row because the whole cluster dies. If >>>>>> you >>>>>>> want, we can publish our test job. >>>>>>> Regards, >>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>>> <[hidden email]> >>>>>>> wrote: >>>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>>> both >>>>>>> recently worked on the network stack and might be most >>>>>> familiar with >>>>>>> this. >>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>>> <[hidden email]> >>>>>>> wrote: >>>>>>> We also have the same problem in production. At the moment >>>>>> the >>>>>>> solution is to restart the entire Flink cluster after every >>>>>> job.. >>>>>>> We've tried to reproduce this problem with a test (see >>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>> don't >>>>>>> know whether the error produced by the test and the leak are >>>>>>> correlated.. >>>>>>> Best, >>>>>>> Flavio >>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>>> EBRU >>>>>>> <[hidden email]> wrote: >>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>> Do you use any windowing? If yes, could you please share >>>>>> that code? >>>>>>> If >>>>>>> there is no stateful operation at all, it's strange where >>>>>> the list >>>>>>> state instances are coming from. >>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>> <[hidden email]> >>>>>>> wrote: >>>>>>> Hi Ufuk, >>>>>>> We don’t explicitly define any state descriptor. We only >>>>>> use map >>>>>>> and filters >>>>>>> operator. We thought that gc handle clearing the flink’s >>>>>> internal >>>>>>> states. >>>>>>> So how can we manage the memory if it is always increasing? >>>>>>> - Ebru >>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>> job is >>>>>>> running. >>>>>>> This is expected (also in the case of multiple running >>>>>> jobs). The >>>>>>> screenshots are not helpful in that regard. :-( >>>>>>> What kind of stateful operations are you using? Depending on >>>>>> your >>>>>>> use case, >>>>>>> you have to manually call `clear()` on the state instance in >>>>>> order >>>>>>> to >>>>>>> release the managed state. >>>>>>> Best, >>>>>>> Ufuk >>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>> <[hidden email]> wrote: >>>>>>> Begin forwarded message: >>>>>>> From: ebru <[hidden email]> >>>>>>> Subject: Re: Flink memory leak >>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>> To: Ufuk Celebi <[hidden email]> >>>>>>> Hi Ufuk, >>>>>>> There are there snapshots of htop output. >>>>>>> 1. snapshot is initial state. >>>>>>> 2. snapshot is after submitted one job. >>>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>>> the >>>>>>> memory >>>>>>> usage is always increasing over time. >>>>>>> <1.png><2.png><3.png> >>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>>> Hey Ebru, >>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>> causing >>>>>>> this. >>>>>>> Since multiple jobs are running, it will be hard to >>>>>> understand to >>>>>>> which job the state descriptors from the heap snapshot >>>>>> belong to. >>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>> behaviour >>>>>>> with only a single job? >>>>>>> – Ufuk >>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>>> ÇETİNKAYA EBRU >>>>>>> <[hidden email]> wrote: >>>>>>> Hi, >>>>>>> We are using Flink 1.3.1 in production, we have one job >>>>>> manager and >>>>>>> 3 task >>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>> have >>>>>>> memory >>>>>>> related problems. We use docker container to serve Flink >>>>>> cluster. We >>>>>>> have >>>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>>> Also the >>>>>>> job >>>>>>> count >>>>>>> may be change over time. Taskmanager memory usage always >>>>>> increases. >>>>>>> After >>>>>>> job cancelation this memory usage doesn't decrease. We've >>>>>> tried to >>>>>>> investigate the problem and we've got the task manager jvm >>>>>> heap >>>>>>> snapshot. >>>>>>> According to the jam heap analysis, possible memory leak was >>>>>> Flink >>>>>>> list >>>>>>> state descriptor. But we are not sure that is the cause of >>>>>> our >>>>>>> memory >>>>>>> problem. How can we solve the problem? >>>>>>> We have two types of Flink job. One has no state full >>>>>> operator >>>>>>> contains only maps and filters and the other has time window >>>>>> with >>>>>>> count trigger. >>>>>>> * We've analysed the jvm heaps again in different >>>>>> conditions. First >>>>>>> we analysed the snapshot when no flink jobs running on >>>>>> cluster. (image >>>>>>> 1) >>>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>>> that has >>>>>>> no state full operator is running. And according to the >>>>>> results, leak >>>>>>> suspect was NetworkBufferPool (image 2) >>>>>>> * Last analys, there were both two types of jobs running >>>>>> and leak >>>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>>> we >>>>>>> noticed that when job is submitted some amount of memory >>>>>> allocated and >>>>>>> after cancelation this allocated memory never freed. So over >>>>>> time >>>>>>> memory usage is always increasing and exceeded the limits. >>>>>> Links: >>>>>> ------ >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>> Hi Piotr, >>>>>> There are two types of jobs. >>>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>>> window operator. >>>>>>> In second job, we use Kafka source, filesystem sink and >>>>>> elastic search sink and window operator for buffering. >>>>>> Hi Piotrek, >>>>>> Thanks for your reply. >>>>>> We've tested our link cluster again. We have 360 slots, and our >>>>>> cluster configuration is like this; >>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>> jobmanager.rpc.port: 6123 >>>>>> jobmanager.heap.mb: 1536 >>>>>> taskmanager.heap.mb: 1536 >>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>> taskmanager.memory.preallocate: false >>>>>> parallelism.default: 1 >>>>>> jobmanager.web.port: 8081 >>>>>> state.backend: filesystem >>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>> We are using docker based Flink cluster. >>>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>>> became full. Memory usage were increasing by the time and one by one >>>>>> task managers start to die. And the exception was like this; >>>>>> Taskmanager1 log: >>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>> ActorSystem[flink] >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>> at >>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>> at >>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>> at >>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>> at >>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>> at >>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> at >>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> ... 22 more >>>>>> Taskmanager2 log: >>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>> ActorSystem[flink] >>>>>> Java.lang.NoClassDefFoundError: >>>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>> at >>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> at >>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> ... 18 more >>>>>> -Ebru >>>>> Hi Piotrek, >>>>> We attached the full log of the taskmanager1. >>>>> This may not be a dependency issue because until all of the task slots is full, we didn't get any No Class Def Found exception, when there is available memory jobs can run without exception for days. >>>>> Also there is Kafka Instance Already Exist exception in full log, but this not relevant and doesn't effect jobs or task managers. >>>>> -Ebru<taskmanager1.log.zip> >>> Hi, >>> Sorry we attached wrong log file. I've attached all task managers and job manager's log. All task managers and job manager was killed.<logs.zip> > > We were lost the std output files so we've reproduced the problem. I attached task managers and job manager log and also std output files. And after some time, it start using swap, the screenshot of http output is also attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png> |
On 2017-11-10 17:50, Piotr Nowojski wrote:
> I do not see anything abnormal in the logs before this error :( > > What are your JVM settings and which java version are you running? > What happens if you limit the heap size so that the swap is never > used? > > Piotrek > >> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >> <[hidden email]> wrote: >> >> On 2017-11-10 13:14, Piotr Nowojski wrote: >>> jobmanager1.log and taskmanager2.log are the same. Can you also >>> submit >>> files containing std output? >>> Piotrek >>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>> <[hidden email]> wrote: >>>> On 2017-11-10 11:04, Piotr Nowojski wrote: >>>>> Hi, >>>>> Thanks for the logs, however I do not see before mentioned >>>>> exceptions >>>>> in it. It ends with java.lang.InterruptedException >>>>> Is it the correct log file? Also, could you attach the std output >>>>> file >>>>> of the failing TaskManager? >>>>> Piotrek >>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>> <[hidden email]> wrote: >>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>>>> Hi, >>>>>>> Could you attach full logs from those task managers? At first >>>>>>> glance I >>>>>>> don’t see a connection between those exceptions and any memory >>>>>>> issue >>>>>>> that you might had. It looks like a dependency issue in one >>>>>>> (some? >>>>>>> All?) of your jobs. >>>>>>> Did you build your jars with -Pbuild-jar profile as described >>>>>>> here: >>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>>>> ? >>>>>>> If that doesn’t help. Can you binary search which job is causing >>>>>>> the >>>>>>> problem? There might be some Flink incompatibility between >>>>>>> different >>>>>>> versions and rebuilding a job’s jar with a version matching to >>>>>>> the >>>>>>> cluster version might help. >>>>>>> Piotrek >>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>>> <[hidden email]> wrote: >>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>>>> Btw, Ebru: >>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On >>>>>>>> your >>>>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>>>> 596MB >>>>>>>> -> 602MB -> 597MB. >>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>>>> Do you experience any problems, like Out Of Memory >>>>>>>> errors/crashes/long >>>>>>>> GC pauses? Or just JVM process is using more memory over time? >>>>>>>> You >>>>>>>> are >>>>>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>>>>> was >>>>>>>> used? So increasing memory usage until hitting some limit (for >>>>>>>> example >>>>>>>> JVM max heap size) is expected behaviour. >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski >>>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>> I don’t know if this is relevant to this issue, but I was >>>>>>>> constantly getting failures trying to reproduce this leak using >>>>>>>> your >>>>>>>> Job, because you were using non deterministic getKey function: >>>>>>>> @Override >>>>>>>> public Integer getKey(Integer event) { >>>>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>>>> return randomGen.nextInt() % 8; >>>>>>>> } >>>>>>>> And quoting Java doc of KeySelector: >>>>>>>> "If invoked multiple times on the same object, the returned key >>>>>>>> must >>>>>>>> be the same.” >>>>>>>> I’m trying to reproduce this issue with following job: >>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>>>>> well just discarding incoming data. I’m cancelling the job every >>>>>>>> 5 >>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>>>> stable, well below maximum java heap size. >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>>>> wrote: >>>>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>>>> lot of time to fail. >>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>>>> <[hidden email]> wrote: >>>>>>>> Thanks for quick answer. >>>>>>>> So it will also fail after some time with `fromElements` source >>>>>>>> instead of Kafka, right? >>>>>>>> Did you try it also without a Kafka producer? >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>>>> wrote: >>>>>>>> Hi, >>>>>>>> You don't need data. With data it will die faster. I tested as >>>>>>>> well with a small data set, using the fromElements source, but >>>>>>>> it >>>>>>>> will take some time to die. It's better with some data. >>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>>>> <[hidden email]> wrote: >>>>>>>> Hi, >>>>>>>> Thanks for sharing this job. >>>>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>>>> issue with your script? >>>>>>>>> Does this OOM issue also happen when you are not using the >>>>>>> Kafka source/sink? >>>>>>>>> Piotrek >>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>>>>> wrote: >>>>>>>>> Hi, >>>>>>>>> This is the test flink job we created to trigger this leak >>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>>>> And this is the python script we are using to execute the job >>>>>>> thousands of times to get the OOM problem >>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>>>> The cluster we used for this has this configuration: >>>>>>>>> Instance type: t2.large >>>>>>>>> Number of workers: 2 >>>>>>>>> HeapMemory: 5500 >>>>>>>>> Number of task slots per node: 4 >>>>>>>>> TaskMangMemFraction: 0.5 >>>>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>>>> We have tried several things, increasing the heap, reducing the >>>>>>> heap, more memory fraction, changes this value in the >>>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>>>> work. >>>>>>>>> Thanks for your help. >>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>> <[hidden email]> wrote: >>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>>>> Hi Ebru and Javier, >>>>>>>> Yes, if you could share this example job it would be helpful. >>>>>>>> Ebru: could you explain in a little more details how does >>>>>>> your Job(s) >>>>>>>> look like? Could you post some code? If you are just using >>>>>>> maps and >>>>>>>> filters there shouldn’t be any network transfers involved, >>>>>>> aside >>>>>>>> from Source and Sink functions. >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 12:54, ebru >>>>>>> <[hidden email]> wrote: >>>>>>>> Hi Javier, >>>>>>>> It would be helpful if you share your test job with us. >>>>>>>> Which configurations did you try? >>>>>>>> -Ebru >>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>> Hi, >>>>>>>> We have been facing a similar problem. We have tried some >>>>>>> different >>>>>>>> configurations, as proposed in other email thread by Flavio >>>>>>> and >>>>>>>> Kien, but it didn't work. We have a workaround similar to >>>>>>> the one >>>>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>>>> a >>>>>>>> memory threshold. We created a small test to remove all of >>>>>>> our >>>>>>>> dependencies and leave only flink native libraries. This >>>>>>> test reads >>>>>>>> data from a Kafka topic and writes it back to another topic >>>>>>> in >>>>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>>>> After >>>>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>>>> OS memory >>>>>>>> limit and dies. >>>>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>>>> slots >>>>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>>>> execute >>>>>>>> that job 5 times in a row because the whole cluster dies. If >>>>>>> you >>>>>>>> want, we can publish our test job. >>>>>>>> Regards, >>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>>>> both >>>>>>>> recently worked on the network stack and might be most >>>>>>> familiar with >>>>>>>> this. >>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>> We also have the same problem in production. At the moment >>>>>>> the >>>>>>>> solution is to restart the entire Flink cluster after every >>>>>>> job.. >>>>>>>> We've tried to reproduce this problem with a test (see >>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>>> don't >>>>>>>> know whether the error produced by the test and the leak are >>>>>>>> correlated.. >>>>>>>> Best, >>>>>>>> Flavio >>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>>>> EBRU >>>>>>>> <[hidden email]> wrote: >>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>>> Do you use any windowing? If yes, could you please share >>>>>>> that code? >>>>>>>> If >>>>>>>> there is no stateful operation at all, it's strange where >>>>>>> the list >>>>>>>> state instances are coming from. >>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>> Hi Ufuk, >>>>>>>> We don’t explicitly define any state descriptor. We only >>>>>>> use map >>>>>>>> and filters >>>>>>>> operator. We thought that gc handle clearing the flink’s >>>>>>> internal >>>>>>>> states. >>>>>>>> So how can we manage the memory if it is always increasing? >>>>>>>> - Ebru >>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>>> job is >>>>>>>> running. >>>>>>>> This is expected (also in the case of multiple running >>>>>>> jobs). The >>>>>>>> screenshots are not helpful in that regard. :-( >>>>>>>> What kind of stateful operations are you using? Depending on >>>>>>> your >>>>>>>> use case, >>>>>>>> you have to manually call `clear()` on the state instance in >>>>>>> order >>>>>>>> to >>>>>>>> release the managed state. >>>>>>>> Best, >>>>>>>> Ufuk >>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>>> <[hidden email]> wrote: >>>>>>>> Begin forwarded message: >>>>>>>> From: ebru <[hidden email]> >>>>>>>> Subject: Re: Flink memory leak >>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>>> To: Ufuk Celebi <[hidden email]> >>>>>>>> Hi Ufuk, >>>>>>>> There are there snapshots of htop output. >>>>>>>> 1. snapshot is initial state. >>>>>>>> 2. snapshot is after submitted one job. >>>>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>>>> the >>>>>>>> memory >>>>>>>> usage is always increasing over time. >>>>>>>> <1.png><2.png><3.png> >>>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>>>> Hey Ebru, >>>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>>> causing >>>>>>>> this. >>>>>>>> Since multiple jobs are running, it will be hard to >>>>>>> understand to >>>>>>>> which job the state descriptors from the heap snapshot >>>>>>> belong to. >>>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>>> behaviour >>>>>>>> with only a single job? >>>>>>>> – Ufuk >>>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>>>> ÇETİNKAYA EBRU >>>>>>>> <[hidden email]> wrote: >>>>>>>> Hi, >>>>>>>> We are using Flink 1.3.1 in production, we have one job >>>>>>> manager and >>>>>>>> 3 task >>>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>>> have >>>>>>>> memory >>>>>>>> related problems. We use docker container to serve Flink >>>>>>> cluster. We >>>>>>>> have >>>>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>>>> Also the >>>>>>>> job >>>>>>>> count >>>>>>>> may be change over time. Taskmanager memory usage always >>>>>>> increases. >>>>>>>> After >>>>>>>> job cancelation this memory usage doesn't decrease. We've >>>>>>> tried to >>>>>>>> investigate the problem and we've got the task manager jvm >>>>>>> heap >>>>>>>> snapshot. >>>>>>>> According to the jam heap analysis, possible memory leak was >>>>>>> Flink >>>>>>>> list >>>>>>>> state descriptor. But we are not sure that is the cause of >>>>>>> our >>>>>>>> memory >>>>>>>> problem. How can we solve the problem? >>>>>>>> We have two types of Flink job. One has no state full >>>>>>> operator >>>>>>>> contains only maps and filters and the other has time window >>>>>>> with >>>>>>>> count trigger. >>>>>>>> * We've analysed the jvm heaps again in different >>>>>>> conditions. First >>>>>>>> we analysed the snapshot when no flink jobs running on >>>>>>> cluster. (image >>>>>>>> 1) >>>>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>>>> that has >>>>>>>> no state full operator is running. And according to the >>>>>>> results, leak >>>>>>>> suspect was NetworkBufferPool (image 2) >>>>>>>> * Last analys, there were both two types of jobs running >>>>>>> and leak >>>>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>>>> we >>>>>>>> noticed that when job is submitted some amount of memory >>>>>>> allocated and >>>>>>>> after cancelation this allocated memory never freed. So over >>>>>>> time >>>>>>>> memory usage is always increasing and exceeded the limits. >>>>>>> Links: >>>>>>> ------ >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>>> Hi Piotr, >>>>>>> There are two types of jobs. >>>>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>>>> window operator. >>>>>>>> In second job, we use Kafka source, filesystem sink and >>>>>>> elastic search sink and window operator for buffering. >>>>>>> Hi Piotrek, >>>>>>> Thanks for your reply. >>>>>>> We've tested our link cluster again. We have 360 slots, and our >>>>>>> cluster configuration is like this; >>>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>>> jobmanager.rpc.port: 6123 >>>>>>> jobmanager.heap.mb: 1536 >>>>>>> taskmanager.heap.mb: 1536 >>>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>>> taskmanager.memory.preallocate: false >>>>>>> parallelism.default: 1 >>>>>>> jobmanager.web.port: 8081 >>>>>>> state.backend: filesystem >>>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>>> We are using docker based Flink cluster. >>>>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>>>> became full. Memory usage were increasing by the time and one by >>>>>>> one >>>>>>> task managers start to die. And the exception was like this; >>>>>>> Taskmanager1 log: >>>>>>> Uncaught error from thread >>>>>>> [flink-akka.actor.default-dispatcher-17] >>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>>>> for >>>>>>> ActorSystem[flink] >>>>>>> java.lang.NoClassDefFoundError: >>>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>>> at >>>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>>> at >>>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>>> at >>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>>> at >>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>>> at >>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>>>> at >>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>> at >>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>> at >>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>> ... 22 more >>>>>>> Taskmanager2 log: >>>>>>> Uncaught error from thread >>>>>>> [flink-akka.actor.default-dispatcher-17] >>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled >>>>>>> for >>>>>>> ActorSystem[flink] >>>>>>> Java.lang.NoClassDefFoundError: >>>>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>>> at >>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>>>> at >>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>> at >>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>> at >>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>> at >>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>> at >>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>> ... 18 more >>>>>>> -Ebru >>>>>> Hi Piotrek, >>>>>> We attached the full log of the taskmanager1. >>>>>> This may not be a dependency issue because until all of the task >>>>>> slots is full, we didn't get any No Class Def Found exception, >>>>>> when there is available memory jobs can run without exception for >>>>>> days. >>>>>> Also there is Kafka Instance Already Exist exception in full log, >>>>>> but this not relevant and doesn't effect jobs or task managers. >>>>>> -Ebru<taskmanager1.log.zip> >>>> Hi, >>>> Sorry we attached wrong log file. I've attached all task managers >>>> and job manager's log. All task managers and job manager was >>>> killed.<logs.zip> >> >> We were lost the std output files so we've reproduced the problem. I >> attached task managers and job manager log and also std output files. >> And after some time, it start using swap, the screenshot of http >> output is also >> attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png> -XX:+UseG1GC \ -XX:+UseStringDeduplication \ java version "1.8.0_131 How can we limit the heap size, we've already set job manager.heap.mb and task manager.heap.mb configs as 1536. Did you mean the limit the docker containers heap size? |
On 2017-11-10 18:01, ÇETİNKAYA EBRU ÇETİNKAYA EBRU wrote:
> On 2017-11-10 17:50, Piotr Nowojski wrote: >> I do not see anything abnormal in the logs before this error :( >> >> What are your JVM settings and which java version are you running? >> What happens if you limit the heap size so that the swap is never >> used? >> >> Piotrek >> >>> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>> <[hidden email]> wrote: >>> >>> On 2017-11-10 13:14, Piotr Nowojski wrote: >>>> jobmanager1.log and taskmanager2.log are the same. Can you also >>>> submit >>>> files containing std output? >>>> Piotrek >>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>> <[hidden email]> wrote: >>>>> On 2017-11-10 11:04, Piotr Nowojski wrote: >>>>>> Hi, >>>>>> Thanks for the logs, however I do not see before mentioned >>>>>> exceptions >>>>>> in it. It ends with java.lang.InterruptedException >>>>>> Is it the correct log file? Also, could you attach the std output >>>>>> file >>>>>> of the failing TaskManager? >>>>>> Piotrek >>>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>> <[hidden email]> wrote: >>>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>>>>> Hi, >>>>>>>> Could you attach full logs from those task managers? At first >>>>>>>> glance I >>>>>>>> don’t see a connection between those exceptions and any memory >>>>>>>> issue >>>>>>>> that you might had. It looks like a dependency issue in one >>>>>>>> (some? >>>>>>>> All?) of your jobs. >>>>>>>> Did you build your jars with -Pbuild-jar profile as described >>>>>>>> here: >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>>>>> ? >>>>>>>> If that doesn’t help. Can you binary search which job is causing >>>>>>>> the >>>>>>>> problem? There might be some Flink incompatibility between >>>>>>>> different >>>>>>>> versions and rebuilding a job’s jar with a version matching to >>>>>>>> the >>>>>>>> cluster version might help. >>>>>>>> Piotrek >>>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>>>>> Btw, Ebru: >>>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On >>>>>>>>> your >>>>>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>>>>> 596MB >>>>>>>>> -> 602MB -> 597MB. >>>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>>>>> Do you experience any problems, like Out Of Memory >>>>>>>>> errors/crashes/long >>>>>>>>> GC pauses? Or just JVM process is using more memory over time? >>>>>>>>> You >>>>>>>>> are >>>>>>>>> aware that JVM doesn’t like to release memory back to OS once >>>>>>>>> it >>>>>>>>> was >>>>>>>>> used? So increasing memory usage until hitting some limit (for >>>>>>>>> example >>>>>>>>> JVM max heap size) is expected behaviour. >>>>>>>>> Piotrek >>>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski >>>>>>>>> <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> I don’t know if this is relevant to this issue, but I was >>>>>>>>> constantly getting failures trying to reproduce this leak using >>>>>>>>> your >>>>>>>>> Job, because you were using non deterministic getKey function: >>>>>>>>> @Override >>>>>>>>> public Integer getKey(Integer event) { >>>>>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>>>>> return randomGen.nextInt() % 8; >>>>>>>>> } >>>>>>>>> And quoting Java doc of KeySelector: >>>>>>>>> "If invoked multiple times on the same object, the returned key >>>>>>>>> must >>>>>>>>> be the same.” >>>>>>>>> I’m trying to reproduce this issue with following job: >>>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>>>>> Where IntegerSource is just an infinite source, DisardingSink >>>>>>>>> is >>>>>>>>> well just discarding incoming data. I’m cancelling the job >>>>>>>>> every 5 >>>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>>>>> stable, well below maximum java heap size. >>>>>>>>> Piotrek >>>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> Yes, I tested with just printing the stream. But it could take >>>>>>>>> a >>>>>>>>> lot of time to fail. >>>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> Thanks for quick answer. >>>>>>>>> So it will also fail after some time with `fromElements` source >>>>>>>>> instead of Kafka, right? >>>>>>>>> Did you try it also without a Kafka producer? >>>>>>>>> Piotrek >>>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> Hi, >>>>>>>>> You don't need data. With data it will die faster. I tested as >>>>>>>>> well with a small data set, using the fromElements source, but >>>>>>>>> it >>>>>>>>> will take some time to die. It's better with some data. >>>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> Hi, >>>>>>>>> Thanks for sharing this job. >>>>>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>>>>> issue with your script? >>>>>>>>>> Does this OOM issue also happen when you are not using the >>>>>>>> Kafka source/sink? >>>>>>>>>> Piotrek >>>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez >>>>>>>>>> <[hidden email]> >>>>>>>> wrote: >>>>>>>>>> Hi, >>>>>>>>>> This is the test flink job we created to trigger this leak >>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>>>>> And this is the python script we are using to execute the job >>>>>>>> thousands of times to get the OOM problem >>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>>>>> The cluster we used for this has this configuration: >>>>>>>>>> Instance type: t2.large >>>>>>>>>> Number of workers: 2 >>>>>>>>>> HeapMemory: 5500 >>>>>>>>>> Number of task slots per node: 4 >>>>>>>>>> TaskMangMemFraction: 0.5 >>>>>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>>>>> We have tried several things, increasing the heap, reducing >>>>>>>>>> the >>>>>>>> heap, more memory fraction, changes this value in the >>>>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>>>>> work. >>>>>>>>>> Thanks for your help. >>>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>>> <[hidden email]> wrote: >>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>>>>> Hi Ebru and Javier, >>>>>>>>> Yes, if you could share this example job it would be helpful. >>>>>>>>> Ebru: could you explain in a little more details how does >>>>>>>> your Job(s) >>>>>>>>> look like? Could you post some code? If you are just using >>>>>>>> maps and >>>>>>>>> filters there shouldn’t be any network transfers involved, >>>>>>>> aside >>>>>>>>> from Source and Sink functions. >>>>>>>>> Piotrek >>>>>>>>> On 8 Nov 2017, at 12:54, ebru >>>>>>>> <[hidden email]> wrote: >>>>>>>>> Hi Javier, >>>>>>>>> It would be helpful if you share your test job with us. >>>>>>>>> Which configurations did you try? >>>>>>>>> -Ebru >>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>>>>> <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> Hi, >>>>>>>>> We have been facing a similar problem. We have tried some >>>>>>>> different >>>>>>>>> configurations, as proposed in other email thread by Flavio >>>>>>>> and >>>>>>>>> Kien, but it didn't work. We have a workaround similar to >>>>>>>> the one >>>>>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>>>>> a >>>>>>>>> memory threshold. We created a small test to remove all of >>>>>>>> our >>>>>>>>> dependencies and leave only flink native libraries. This >>>>>>>> test reads >>>>>>>>> data from a Kafka topic and writes it back to another topic >>>>>>>> in >>>>>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>>>>> After >>>>>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>>>>> OS memory >>>>>>>>> limit and dies. >>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>>>>> slots >>>>>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>>>>> execute >>>>>>>>> that job 5 times in a row because the whole cluster dies. If >>>>>>>> you >>>>>>>>> want, we can publish our test job. >>>>>>>>> Regards, >>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>>>>> <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>>>>> both >>>>>>>>> recently worked on the network stack and might be most >>>>>>>> familiar with >>>>>>>>> this. >>>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>>>>> <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> We also have the same problem in production. At the moment >>>>>>>> the >>>>>>>>> solution is to restart the entire Flink cluster after every >>>>>>>> job.. >>>>>>>>> We've tried to reproduce this problem with a test (see >>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>>>> don't >>>>>>>>> know whether the error produced by the test and the leak are >>>>>>>>> correlated.. >>>>>>>>> Best, >>>>>>>>> Flavio >>>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>>>>> EBRU >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>>>> Do you use any windowing? If yes, could you please share >>>>>>>> that code? >>>>>>>>> If >>>>>>>>> there is no stateful operation at all, it's strange where >>>>>>>> the list >>>>>>>>> state instances are coming from. >>>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>>>> <[hidden email]> >>>>>>>>> wrote: >>>>>>>>> Hi Ufuk, >>>>>>>>> We don’t explicitly define any state descriptor. We only >>>>>>>> use map >>>>>>>>> and filters >>>>>>>>> operator. We thought that gc handle clearing the flink’s >>>>>>>> internal >>>>>>>>> states. >>>>>>>>> So how can we manage the memory if it is always increasing? >>>>>>>>> - Ebru >>>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>>>> job is >>>>>>>>> running. >>>>>>>>> This is expected (also in the case of multiple running >>>>>>>> jobs). The >>>>>>>>> screenshots are not helpful in that regard. :-( >>>>>>>>> What kind of stateful operations are you using? Depending on >>>>>>>> your >>>>>>>>> use case, >>>>>>>>> you have to manually call `clear()` on the state instance in >>>>>>>> order >>>>>>>>> to >>>>>>>>> release the managed state. >>>>>>>>> Best, >>>>>>>>> Ufuk >>>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> Begin forwarded message: >>>>>>>>> From: ebru <[hidden email]> >>>>>>>>> Subject: Re: Flink memory leak >>>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>>>> To: Ufuk Celebi <[hidden email]> >>>>>>>>> Hi Ufuk, >>>>>>>>> There are there snapshots of htop output. >>>>>>>>> 1. snapshot is initial state. >>>>>>>>> 2. snapshot is after submitted one job. >>>>>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>>>>> the >>>>>>>>> memory >>>>>>>>> usage is always increasing over time. >>>>>>>>> <1.png><2.png><3.png> >>>>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>>>>> Hey Ebru, >>>>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>>>> causing >>>>>>>>> this. >>>>>>>>> Since multiple jobs are running, it will be hard to >>>>>>>> understand to >>>>>>>>> which job the state descriptors from the heap snapshot >>>>>>>> belong to. >>>>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>>>> behaviour >>>>>>>>> with only a single job? >>>>>>>>> – Ufuk >>>>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>>>>> ÇETİNKAYA EBRU >>>>>>>>> <[hidden email]> wrote: >>>>>>>>> Hi, >>>>>>>>> We are using Flink 1.3.1 in production, we have one job >>>>>>>> manager and >>>>>>>>> 3 task >>>>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>>>> have >>>>>>>>> memory >>>>>>>>> related problems. We use docker container to serve Flink >>>>>>>> cluster. We >>>>>>>>> have >>>>>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>>>>> Also the >>>>>>>>> job >>>>>>>>> count >>>>>>>>> may be change over time. Taskmanager memory usage always >>>>>>>> increases. >>>>>>>>> After >>>>>>>>> job cancelation this memory usage doesn't decrease. We've >>>>>>>> tried to >>>>>>>>> investigate the problem and we've got the task manager jvm >>>>>>>> heap >>>>>>>>> snapshot. >>>>>>>>> According to the jam heap analysis, possible memory leak was >>>>>>>> Flink >>>>>>>>> list >>>>>>>>> state descriptor. But we are not sure that is the cause of >>>>>>>> our >>>>>>>>> memory >>>>>>>>> problem. How can we solve the problem? >>>>>>>>> We have two types of Flink job. One has no state full >>>>>>>> operator >>>>>>>>> contains only maps and filters and the other has time window >>>>>>>> with >>>>>>>>> count trigger. >>>>>>>>> * We've analysed the jvm heaps again in different >>>>>>>> conditions. First >>>>>>>>> we analysed the snapshot when no flink jobs running on >>>>>>>> cluster. (image >>>>>>>>> 1) >>>>>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>>>>> that has >>>>>>>>> no state full operator is running. And according to the >>>>>>>> results, leak >>>>>>>>> suspect was NetworkBufferPool (image 2) >>>>>>>>> * Last analys, there were both two types of jobs running >>>>>>>> and leak >>>>>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>>>>> we >>>>>>>>> noticed that when job is submitted some amount of memory >>>>>>>> allocated and >>>>>>>>> after cancelation this allocated memory never freed. So over >>>>>>>> time >>>>>>>>> memory usage is always increasing and exceeded the limits. >>>>>>>> Links: >>>>>>>> ------ >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>>>> Hi Piotr, >>>>>>>> There are two types of jobs. >>>>>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>>>>> window operator. >>>>>>>>> In second job, we use Kafka source, filesystem sink and >>>>>>>> elastic search sink and window operator for buffering. >>>>>>>> Hi Piotrek, >>>>>>>> Thanks for your reply. >>>>>>>> We've tested our link cluster again. We have 360 slots, and our >>>>>>>> cluster configuration is like this; >>>>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>>>> jobmanager.rpc.port: 6123 >>>>>>>> jobmanager.heap.mb: 1536 >>>>>>>> taskmanager.heap.mb: 1536 >>>>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>>>> taskmanager.memory.preallocate: false >>>>>>>> parallelism.default: 1 >>>>>>>> jobmanager.web.port: 8081 >>>>>>>> state.backend: filesystem >>>>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>>>> We are using docker based Flink cluster. >>>>>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>>>>> became full. Memory usage were increasing by the time and one by >>>>>>>> one >>>>>>>> task managers start to die. And the exception was like this; >>>>>>>> Taskmanager1 log: >>>>>>>> Uncaught error from thread >>>>>>>> [flink-akka.actor.default-dispatcher-17] >>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is >>>>>>>> enabled for >>>>>>>> ActorSystem[flink] >>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>>>> at >>>>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>>>> at >>>>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>>>> at >>>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>>>> at >>>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>>> at >>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>>> at >>>>>>>> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>>> at >>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>>> ... 22 more >>>>>>>> Taskmanager2 log: >>>>>>>> Uncaught error from thread >>>>>>>> [flink-akka.actor.default-dispatcher-17] >>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is >>>>>>>> enabled for >>>>>>>> ActorSystem[flink] >>>>>>>> Java.lang.NoClassDefFoundError: >>>>>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>>>> at >>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>>> at >>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>>> at >>>>>>>> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>>> at >>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>>> ... 18 more >>>>>>>> -Ebru >>>>>>> Hi Piotrek, >>>>>>> We attached the full log of the taskmanager1. >>>>>>> This may not be a dependency issue because until all of the task >>>>>>> slots is full, we didn't get any No Class Def Found exception, >>>>>>> when there is available memory jobs can run without exception for >>>>>>> days. >>>>>>> Also there is Kafka Instance Already Exist exception in full log, >>>>>>> but this not relevant and doesn't effect jobs or task managers. >>>>>>> -Ebru<taskmanager1.log.zip> >>>>> Hi, >>>>> Sorry we attached wrong log file. I've attached all task managers >>>>> and job manager's log. All task managers and job manager was >>>>> killed.<logs.zip> >>> >>> We were lost the std output files so we've reproduced the problem. I >>> attached task managers and job manager log and also std output files. >>> And after some time, it start using swap, the screenshot of http >>> output is also >>> attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png> > We use this jvm options, > -XX:+UseG1GC \ > -XX:+UseStringDeduplication \ > java version "1.8.0_131 > How can we limit the heap size, we've already set job manager.heap.mb > and task manager.heap.mb configs as 1536. > Did you mean the limit the docker containers heap size? killed. I attached the production environment logs. Thanks for your help. production.zip (707K) Download Attachment |
I have a couple of concerns.
1. Your logs seems to be incomplete. There are for example missing at the beginning configuration output (see attached example log). Also output file seems strange to me (like duplicated log file). Please submit full logs. 2. If your heap size is 1.5GB, how is it possible that on your screenshot you are showing memory usage ~40GBs with some process using 10GB? Please analyse what is actually consuming all of that memory and ensure that your machine does not use swap. If the memory consumption comes from Flink, please check jvm's memory pools using jconsole, maybe something off heap is using the memory. Especially pay attention at your PermGen/Metaspace/Code pools, since they can cause class loading issues. 3. 1.5GB for heap is very low value. From your screenshots I assumed that you have set heap to some enormous value (htop screenshot showing ~40GB memory usage on the machine). It might be just too small value and you should increase it. Especially that you are trying to run multiple jobs at the same time with 300 task slots. But increase it only after you solve your issue of other things eating up your memory. Piotrek > On 10 Nov 2017, at 16:05, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: > > On 2017-11-10 18:01, ÇETİNKAYA EBRU ÇETİNKAYA EBRU wrote: >> On 2017-11-10 17:50, Piotr Nowojski wrote: >>> I do not see anything abnormal in the logs before this error :( >>> What are your JVM settings and which java version are you running? >>> What happens if you limit the heap size so that the swap is never >>> used? >>> Piotrek >>>> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>>> On 2017-11-10 13:14, Piotr Nowojski wrote: >>>>> jobmanager1.log and taskmanager2.log are the same. Can you also submit >>>>> files containing std output? >>>>> Piotrek >>>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>>>>> On 2017-11-10 11:04, Piotr Nowojski wrote: >>>>>>> Hi, >>>>>>> Thanks for the logs, however I do not see before mentioned exceptions >>>>>>> in it. It ends with java.lang.InterruptedException >>>>>>> Is it the correct log file? Also, could you attach the std output file >>>>>>> of the failing TaskManager? >>>>>>> Piotrek >>>>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <[hidden email]> wrote: >>>>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>>>>>> Hi, >>>>>>>>> Could you attach full logs from those task managers? At first glance I >>>>>>>>> don’t see a connection between those exceptions and any memory issue >>>>>>>>> that you might had. It looks like a dependency issue in one (some? >>>>>>>>> All?) of your jobs. >>>>>>>>> Did you build your jars with -Pbuild-jar profile as described here: >>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>>>>>> ? >>>>>>>>> If that doesn’t help. Can you binary search which job is causing the >>>>>>>>> problem? There might be some Flink incompatibility between different >>>>>>>>> versions and rebuilding a job’s jar with a version matching to the >>>>>>>>> cluster version might help. >>>>>>>>> Piotrek >>>>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>>>>>> Btw, Ebru: >>>>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>>>>>>> screenshots it’s memory consumption was reasonable and stable: >>>>>>>>>> 596MB >>>>>>>>>> -> 602MB -> 597MB. >>>>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable. >>>>>>>>>> Do you experience any problems, like Out Of Memory >>>>>>>>>> errors/crashes/long >>>>>>>>>> GC pauses? Or just JVM process is using more memory over time? You >>>>>>>>>> are >>>>>>>>>> aware that JVM doesn’t like to release memory back to OS once it >>>>>>>>>> was >>>>>>>>>> used? So increasing memory usage until hitting some limit (for >>>>>>>>>> example >>>>>>>>>> JVM max heap size) is expected behaviour. >>>>>>>>>> Piotrek >>>>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> I don’t know if this is relevant to this issue, but I was >>>>>>>>>> constantly getting failures trying to reproduce this leak using your >>>>>>>>>> Job, because you were using non deterministic getKey function: >>>>>>>>>> @Override >>>>>>>>>> public Integer getKey(Integer event) { >>>>>>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>>>>>> return randomGen.nextInt() % 8; >>>>>>>>>> } >>>>>>>>>> And quoting Java doc of KeySelector: >>>>>>>>>> "If invoked multiple times on the same object, the returned key must >>>>>>>>>> be the same.” >>>>>>>>>> I’m trying to reproduce this issue with following job: >>>>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>>>>>> Where IntegerSource is just an infinite source, DisardingSink is >>>>>>>>>> well just discarding incoming data. I’m cancelling the job every 5 >>>>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is >>>>>>>>>> stable, well below maximum java heap size. >>>>>>>>>> Piotrek >>>>>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>>>>>> lot of time to fail. >>>>>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> Thanks for quick answer. >>>>>>>>>> So it will also fail after some time with `fromElements` source >>>>>>>>>> instead of Kafka, right? >>>>>>>>>> Did you try it also without a Kafka producer? >>>>>>>>>> Piotrek >>>>>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> Hi, >>>>>>>>>> You don't need data. With data it will die faster. I tested as >>>>>>>>>> well with a small data set, using the fromElements source, but it >>>>>>>>>> will take some time to die. It's better with some data. >>>>>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> Hi, >>>>>>>>>> Thanks for sharing this job. >>>>>>>>>> Do I need to feed some data to the Kafka to reproduce this >>>>>>>>> issue with your script? >>>>>>>>>>> Does this OOM issue also happen when you are not using the >>>>>>>>> Kafka source/sink? >>>>>>>>>>> Piotrek >>>>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <[hidden email]> >>>>>>>>> wrote: >>>>>>>>>>> Hi, >>>>>>>>>>> This is the test flink job we created to trigger this leak >>>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>>>>>> And this is the python script we are using to execute the job >>>>>>>>> thousands of times to get the OOM problem >>>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>>>>>> The cluster we used for this has this configuration: >>>>>>>>>>> Instance type: t2.large >>>>>>>>>>> Number of workers: 2 >>>>>>>>>>> HeapMemory: 5500 >>>>>>>>>>> Number of task slots per node: 4 >>>>>>>>>>> TaskMangMemFraction: 0.5 >>>>>>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>>>>>> We have tried several things, increasing the heap, reducing the >>>>>>>>> heap, more memory fraction, changes this value in the >>>>>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>>>>>> work. >>>>>>>>>>> Thanks for your help. >>>>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>>>>>> Hi Ebru and Javier, >>>>>>>>>> Yes, if you could share this example job it would be helpful. >>>>>>>>>> Ebru: could you explain in a little more details how does >>>>>>>>> your Job(s) >>>>>>>>>> look like? Could you post some code? If you are just using >>>>>>>>> maps and >>>>>>>>>> filters there shouldn’t be any network transfers involved, >>>>>>>>> aside >>>>>>>>>> from Source and Sink functions. >>>>>>>>>> Piotrek >>>>>>>>>> On 8 Nov 2017, at 12:54, ebru >>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> Hi Javier, >>>>>>>>>> It would be helpful if you share your test job with us. >>>>>>>>>> Which configurations did you try? >>>>>>>>>> -Ebru >>>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>>>>>> <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> Hi, >>>>>>>>>> We have been facing a similar problem. We have tried some >>>>>>>>> different >>>>>>>>>> configurations, as proposed in other email thread by Flavio >>>>>>>>> and >>>>>>>>>> Kien, but it didn't work. We have a workaround similar to >>>>>>>>> the one >>>>>>>>>> that Flavio has, we restart the taskmanagers once they reach >>>>>>>>> a >>>>>>>>>> memory threshold. We created a small test to remove all of >>>>>>>>> our >>>>>>>>>> dependencies and leave only flink native libraries. This >>>>>>>>> test reads >>>>>>>>>> data from a Kafka topic and writes it back to another topic >>>>>>>>> in >>>>>>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>>>>>> After >>>>>>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>>>>>> OS memory >>>>>>>>>> limit and dies. >>>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>>>>>> slots >>>>>>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>>>>>> execute >>>>>>>>>> that job 5 times in a row because the whole cluster dies. If >>>>>>>>> you >>>>>>>>>> want, we can publish our test job. >>>>>>>>>> Regards, >>>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>>>>>> <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>>>>>> both >>>>>>>>>> recently worked on the network stack and might be most >>>>>>>>> familiar with >>>>>>>>>> this. >>>>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>>>>>> <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> We also have the same problem in production. At the moment >>>>>>>>> the >>>>>>>>>> solution is to restart the entire Flink cluster after every >>>>>>>>> job.. >>>>>>>>>> We've tried to reproduce this problem with a test (see >>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>>>>> don't >>>>>>>>>> know whether the error produced by the test and the leak are >>>>>>>>>> correlated.. >>>>>>>>>> Best, >>>>>>>>>> Flavio >>>>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>>>>>> EBRU >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>>>>> Do you use any windowing? If yes, could you please share >>>>>>>>> that code? >>>>>>>>>> If >>>>>>>>>> there is no stateful operation at all, it's strange where >>>>>>>>> the list >>>>>>>>>> state instances are coming from. >>>>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>>>>> <[hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> Hi Ufuk, >>>>>>>>>> We don’t explicitly define any state descriptor. We only >>>>>>>>> use map >>>>>>>>>> and filters >>>>>>>>>> operator. We thought that gc handle clearing the flink’s >>>>>>>>> internal >>>>>>>>>> states. >>>>>>>>>> So how can we manage the memory if it is always increasing? >>>>>>>>>> - Ebru >>>>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <[hidden email]> wrote: >>>>>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>>>>> job is >>>>>>>>>> running. >>>>>>>>>> This is expected (also in the case of multiple running >>>>>>>>> jobs). The >>>>>>>>>> screenshots are not helpful in that regard. :-( >>>>>>>>>> What kind of stateful operations are you using? Depending on >>>>>>>>> your >>>>>>>>>> use case, >>>>>>>>>> you have to manually call `clear()` on the state instance in >>>>>>>>> order >>>>>>>>>> to >>>>>>>>>> release the managed state. >>>>>>>>>> Best, >>>>>>>>>> Ufuk >>>>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> Begin forwarded message: >>>>>>>>>> From: ebru <[hidden email]> >>>>>>>>>> Subject: Re: Flink memory leak >>>>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>>>>> To: Ufuk Celebi <[hidden email]> >>>>>>>>>> Hi Ufuk, >>>>>>>>>> There are there snapshots of htop output. >>>>>>>>>> 1. snapshot is initial state. >>>>>>>>>> 2. snapshot is after submitted one job. >>>>>>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And >>>>>>>>> the >>>>>>>>>> memory >>>>>>>>>> usage is always increasing over time. >>>>>>>>>> <1.png><2.png><3.png> >>>>>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <[hidden email]> wrote: >>>>>>>>>> Hey Ebru, >>>>>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>>>>> causing >>>>>>>>>> this. >>>>>>>>>> Since multiple jobs are running, it will be hard to >>>>>>>>> understand to >>>>>>>>>> which job the state descriptors from the heap snapshot >>>>>>>>> belong to. >>>>>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>>>>> behaviour >>>>>>>>>> with only a single job? >>>>>>>>>> – Ufuk >>>>>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>>>>>> ÇETİNKAYA EBRU >>>>>>>>>> <[hidden email]> wrote: >>>>>>>>>> Hi, >>>>>>>>>> We are using Flink 1.3.1 in production, we have one job >>>>>>>>> manager and >>>>>>>>>> 3 task >>>>>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>>>>> have >>>>>>>>>> memory >>>>>>>>>> related problems. We use docker container to serve Flink >>>>>>>>> cluster. We >>>>>>>>>> have >>>>>>>>>> 300 slots and 20 jobs are running with parallelism of 10. >>>>>>>>> Also the >>>>>>>>>> job >>>>>>>>>> count >>>>>>>>>> may be change over time. Taskmanager memory usage always >>>>>>>>> increases. >>>>>>>>>> After >>>>>>>>>> job cancelation this memory usage doesn't decrease. We've >>>>>>>>> tried to >>>>>>>>>> investigate the problem and we've got the task manager jvm >>>>>>>>> heap >>>>>>>>>> snapshot. >>>>>>>>>> According to the jam heap analysis, possible memory leak was >>>>>>>>> Flink >>>>>>>>>> list >>>>>>>>>> state descriptor. But we are not sure that is the cause of >>>>>>>>> our >>>>>>>>>> memory >>>>>>>>>> problem. How can we solve the problem? >>>>>>>>>> We have two types of Flink job. One has no state full >>>>>>>>> operator >>>>>>>>>> contains only maps and filters and the other has time window >>>>>>>>> with >>>>>>>>>> count trigger. >>>>>>>>>> * We've analysed the jvm heaps again in different >>>>>>>>> conditions. First >>>>>>>>>> we analysed the snapshot when no flink jobs running on >>>>>>>>> cluster. (image >>>>>>>>>> 1) >>>>>>>>>> * Then, we analysed the jvm heap snapshot when the flink job >>>>>>>>> that has >>>>>>>>>> no state full operator is running. And according to the >>>>>>>>> results, leak >>>>>>>>>> suspect was NetworkBufferPool (image 2) >>>>>>>>>> * Last analys, there were both two types of jobs running >>>>>>>>> and leak >>>>>>>>>> suspect was again NetworkBufferPool. (image 3) >>>>>>>>>> In our system jobs are regularly cancelled and resubmitted so >>>>>>>>> we >>>>>>>>>> noticed that when job is submitted some amount of memory >>>>>>>>> allocated and >>>>>>>>>> after cancelation this allocated memory never freed. So over >>>>>>>>> time >>>>>>>>>> memory usage is always increasing and exceeded the limits. >>>>>>>>> Links: >>>>>>>>> ------ >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>>>>> Hi Piotr, >>>>>>>>> There are two types of jobs. >>>>>>>>> In first, we use Kafka source and Kafka sink, there isn't any >>>>>>>>> window operator. >>>>>>>>>> In second job, we use Kafka source, filesystem sink and >>>>>>>>> elastic search sink and window operator for buffering. >>>>>>>>> Hi Piotrek, >>>>>>>>> Thanks for your reply. >>>>>>>>> We've tested our link cluster again. We have 360 slots, and our >>>>>>>>> cluster configuration is like this; >>>>>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>>>>> jobmanager.rpc.port: 6123 >>>>>>>>> jobmanager.heap.mb: 1536 >>>>>>>>> taskmanager.heap.mb: 1536 >>>>>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>>>>> taskmanager.memory.preallocate: false >>>>>>>>> parallelism.default: 1 >>>>>>>>> jobmanager.web.port: 8081 >>>>>>>>> state.backend: filesystem >>>>>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>>>>> We are using docker based Flink cluster. >>>>>>>>> WE submitted 36 jobs with the parallelism of 10. After all slots >>>>>>>>> became full. Memory usage were increasing by the time and one by one >>>>>>>>> task managers start to die. And the exception was like this; >>>>>>>>> Taskmanager1 log: >>>>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>>>>> ActorSystem[flink] >>>>>>>>> java.lang.NoClassDefFoundError: >>>>>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>>>>> at >>>>>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>>>>> at >>>>>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>>>>> at >>>>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>>>>> at >>>>>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>>>>> at >>>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>>>>>> at >>>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>>>> at >>>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>>>> at >>>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>>>> ... 22 more >>>>>>>>> Taskmanager2 log: >>>>>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>>>>> ActorSystem[flink] >>>>>>>>> Java.lang.NoClassDefFoundError: >>>>>>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>>>>> at >>>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>>>>>> at >>>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>>>>> at >>>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>>>>> at >>>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>>>> at >>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>>>> ... 18 more >>>>>>>>> -Ebru >>>>>>>> Hi Piotrek, >>>>>>>> We attached the full log of the taskmanager1. >>>>>>>> This may not be a dependency issue because until all of the task slots is full, we didn't get any No Class Def Found exception, when there is available memory jobs can run without exception for days. >>>>>>>> Also there is Kafka Instance Already Exist exception in full log, but this not relevant and doesn't effect jobs or task managers. >>>>>>>> -Ebru<taskmanager1.log.zip> >>>>>> Hi, >>>>>> Sorry we attached wrong log file. I've attached all task managers and job manager's log. All task managers and job manager was killed.<logs.zip> >>>> We were lost the std output files so we've reproduced the problem. I attached task managers and job manager log and also std output files. And after some time, it start using swap, the screenshot of http output is also attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png> >> We use this jvm options, >> -XX:+UseG1GC \ >> -XX:+UseStringDeduplication \ >> java version "1.8.0_131 >> How can we limit the heap size, we've already set job manager.heap.mb >> and task manager.heap.mb configs as 1536. >> Did you mean the limit the docker containers heap size? > Btw, we've faced the same problem at our production environment. When we checked the environment, all of the task managers and job manager was killed. I attached the production environment logs. > Thanks for your help.<production.zip> flink-pnowojski-taskmanager-9-piotr-mbp.log (31K) Download Attachment |
In reply to this post by ebru
Ebru, Javier, Flavio:
I tried to reproduce memory leak by submitting a job, that was generating classes with random names. And indeed I have found one. Memory was accumulating in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. OldGen memory pool was growing in size up to the point I got:
java.lang.OutOfMemoryError: Java heap space This seems like an old known “feature” of JDK: Can any of you confirm that this is the issue that you are experiencing? If not, I would really need more help/information from you to track this down. Piotrek
|
What should we do to confirm it? Do you have any github repo start from?
On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <[hidden email]> wrote:
Flavio Pompermaier Development Department OKKAM S.r.l. Tel. <a href="tel:+39%200461%20041809" value="+390461041809" target="_blank">+(39) 0461 041809 |
Best would be to analyse memory usage via some profiler. What I have done was:
1. Run your scenario on the test cluster until memory consumption goes up 2. Stop submitting new jobs, cancel or running jobs 3. Manually triggered GC couple of times via jconsole (other tools can do that as well) 4. Analyse memory consumption via: A) Oracle’s Mission Control (Java Mission Control, jmc) - analyse memory consumption and check which memory pool is growing (OldGen heap? Metaspace? Code Cache? Non heap?) - run flight record with checked all memory options - check which objects were using a lot of memory B) VisualVM - take heap dump and analyse what is using up all of this memory C) jconsole - this can tell you memory pool status of you JVMs, but will not tell you what objects are actually exhausting the pools Couple of remarks: - because of GC memory usage can goes up and down. Important is the trend of local minimums measured just after manually triggered GC - you might have to repeat steps 2, 3, 4 to actually see what has increased between submitting the jobs - by default network buffers are using 10% of heap space in byte[], so you can ignore those - this JDK bug that I have reproduced was visible by huge memory consumption of multiple char[] and ConcurrentHashMap$Node instances Piotrek
|
Hi Piotrek,
We’ve analysed our task managers memory pool with VisualVm and Console, we attached the screenshot of results. Could you help us about evaluating the results? -Ebru
jvm.zip (8M) Download Attachment |
Thank you for those screenshots, they help a lot.
However I do not see any obvious candidate for a memory leak. There is a slight upward trend in "G1 Old Gen”, but this can be misleading. To further analyse what’s going you need to run your test case for a longer time. Also you will need to take two heap dumps to compare histograms from different time points to calculate “delta changes” between them. Please keep in mind that to avoid random noise it would be best to stop submitting jobs and manually perform GC (from jconsole) before collecting heap dump. Otherwise it will be “polluted” by non-leaked objects and you might need to collect more heap dumps to correctly spot where is the upward trend among all of the noise. Piotrek
|
Free forum by Nabble | Edit this page |