Hi All,
ISSUE
------
The Flink application runs for some time, then the CPU suddenly shoots up to its peak, pod memory reaches its peak, the GC count increases, and the old-gen space reaches close to 100%. Full GC does not clean up the heap. At this point I stopped sending data and cancelled the Flink jobs, but the old-gen space still did not come down. I took a heap dump and can see a lot of objects held via java.lang.ref.Finalizer. I have attached the details in a Word document. I do have the heap dump, but it is close to 2 GB compressed. Is it safe to upload it somewhere and share it here?

This issue does not happen with Flink 1.4.0 and Beam release-2.4.0.

WORKING CLUSTER INFO (Flink 1.4.0 and Beam release-2.4.0)
----------------------------------------------------
The application reads from Kafka, does aggregations, and writes back to Kafka. It uses 5-minute windows and builds the pipeline with Beam constructs; reads and writes go through Beam connectors.

Flink version: 1.4.0
Beam version: release-2.4.0
State backend: heap-based, with checkpointing to a distributed file system.
No. of task managers: 1
Heap: 6.4 GB
CPU: 4 cores
Deployment: standalone cluster on a Kubernetes pod

NOT WORKING CLUSTER INFO (Flink 1.8.3 and Beam release-2.15.0)
----------
Application details are the same as above. There is no change in the application or in the rate at which data is injected; only the Flink and Beam versions changed.

Flink version: 1.8.3
Beam version: release-2.15.0
State backend: heap-based, with checkpointing to a distributed file system.
No. of task managers: 1
Heap: 6.5 GB
CPU: 4 cores
Deployment: standalone cluster on a Kubernetes pod

My observations
-------------
1) The CPU flame graphs show that in the working version, the CPU time spent on GC is lower than in the non-working version (see the attached flame graphs: CPU-flame-WORKING.svg for the working cluster and CPU-flame-NOT-working.svg for the non-working one).

2) I have attached a flame graph of native-memory malloc calls taken while the issue was happening (malloc-NOT-working.svg). The pod memory peaks when this issue happens; to me it looks like the GC process is requesting a lot of native memory.

3) While the issue is happening, the GC CPU usage is very high (see the flame graph CPU-graph-at-issuetime.svg).

Note: the SVG files can be opened in any browser and are clickable once opened.

--
Thanks
Josson

malloc-NOT-working.svg (134K)
CPU-flame-NOT-working.svg (1M)
CPU-graph-at-issuetime.svg (1M)
CPU-flame-WORKING.svg (1M)
memory-issue.docx (7M)
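For reference, the pending-finalizer backlog and a heap dump like the one described above can be captured with standard JDK 8 tooling (a sketch; <pid> stands for the task manager process id):

jmap -finalizerinfo <pid>
jmap -dump:live,format=b,file=taskmanager.hprof <pid>

(The live option forces a full GC first, which matches the situation above where even Full GC does not reclaim the old gen.)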
Hi,

Have you tried using a more recent Flink version? 1.8.x is no longer supported, and the latest versions might not have this issue anymore.

Secondly, have you tried backtracking those references to the Finalizers, assuming that Finalizer is indeed the class causing problems?

Also, it may well be a non-Flink issue [1].

Best regards,
Piotrek

[1] https://issues.apache.org/jira/browse/KAFKA-8546

On Thu, 3 Sep 2020 at 04:47, Josson Paul <[hidden email]> wrote:
1) We are in the process of migrating to Flink 1.11, but it is going to take a while before we can make everything work with the latest version. Meanwhile, since this is happening in production, I am trying to solve it now.

2) The Finalizer entries point to org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService, which has a finalize method. I have attached a spreadsheet (Object-explorer.csv) to give you a high-level view; a short sketch of the finalization mechanics follows below this message.

3) The only difference between the working and non-working clusters is the Beam and Flink versions. Hardware, input message rate, application jars, and Kafka are all the same between the two clusters. The working cluster runs Flink 1.4 and Beam 2.4.0.

Any insights into this will help me debug further.

Thanks,
Josson

On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <[hidden email]> wrote:
Object-explorer.csv (68K)
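To illustrate point 2 above: for every instance of a class that overrides finalize(), the JVM registers a java.lang.ref.Finalizer entry at allocation time, and that entry keeps the instance, plus everything it references, strongly reachable until the single finalizer thread has run the method. A minimal sketch with hypothetical class names (not the actual Flink code):

// Hypothetical stand-in for a timer service with a finalize() method that
// holds a reference to a large keyed state backend. Until the finalizer
// thread has processed the corresponding java.lang.ref.Finalizer entry,
// neither the instance nor 'heapStateBackendLike' can be reclaimed by GC.
class TimerServiceLike {

    private final Object heapStateBackendLike; // stands in for HeapKeyedStateBackend

    TimerServiceLike(Object heapStateBackendLike) {
        this.heapStateBackendLike = heapStateBackendLike;
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            // cleanup work would go here; until this method has run, the
            // referenced state backend stays reachable via the Finalizer queue
        } finally {
            super.finalize();
        }
    }
}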
Hi Josson,

2. Are you sure that all, or the vast majority, of those objects point to SystemProcessingTimeService? And are those objects really the problem? Are they taking up that much of the memory?

3. It could still be Kafka's problem, as it's likely that between 1.4 and 1.8.x we bumped the Kafka dependencies.

Frankly, if it's not some other external dependency issue, I would expect the problem to lie somewhere else entirely. Flink's code relying on finalization hasn't changed since 2015/2016. On the other hand, there were quite a few changes between 1.4 and 1.8.x, some of which affect memory usage. Have you read the release notes for versions 1.5, 1.6, 1.7 and 1.8? In particular, both 1.5 [1] and 1.8 [2] have memory-related notes that could be addressed via configuration changes.

Thanks,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-notes/flink-1.8.html

On Thu, 3 Sep 2020 at 18:50, Josson Paul <[hidden email]> wrote:
Hi Piotr,

2) SystemProcessingTimeService holds the HeapKeyedStateBackend, and the HeapKeyedStateBackend holds a lot of objects, which is what is filling the heap.

3) I am not using the Flink Kafka connector; we are using the Apache Beam Kafka connector. The Apache Beam version changed, but the Kafka client we use is the same as in the other cluster, where Flink was 1.4. There is no change in hardware/Java/Kafka/Kafka client/application between the cluster that works and the one that does not.

I am aware of the memory and network buffer changes between 1.4 and 1.8. Flink 1.4 kept network buffers on the heap, while 1.8 keeps them in native memory. I modified the Flink 1.8 code to put them back on the heap, but the issue was not resolved.

Ours is a streaming job, so we set 'taskmanager.memory.fraction' very low and that heap is fully available for user data.

Flink 1.4 did not use credit-based flow control, and Flink 1.8 does. Our setup has only 1 task manager and a parallelism of 4. According to this video, https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (16:21), if tasks are in the same task manager, Flink doesn't use credit-based flow control, so essentially nothing changes between Flink 1.4 and 1.8 in our setup. Still, I tried setting credit-based flow control to false and testing; the problem persists.

What I noticed in Flink 1.4 is that it doesn't read data from Kafka if there is not enough heap memory to process it. Somehow this is not happening in Flink 1.8, and it fills the heap fast enough that objects don't get GCed/finalized. Did anything change around this between Flink 1.4 and Flink 1.8?

My understanding of back pressure is that it is not based on heap memory but on how fast the network buffers fill up. Is this correct? Does Flink use a TCP connection to communicate between tasks if the tasks are in the same task manager?

Thanks,
Josson

On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <[hidden email]> wrote:
Hi Josson,

Thanks for getting back.

What are the JVM settings, and in particular the GC settings, that you are using (G1GC?)? It could also be that in 1.4 you were just slightly below the threshold of GC issues, while in 1.8 something uses a bit more memory, causing the GC issues to appear. Have you tried just increasing the heap size? Have you tried comparing, at job start-up, the usage and size of the JVM's memory pools with Flink 1.4 and 1.8? Maybe that can point us in the right direction.

> My understanding of back pressure is that it is not based on heap memory but on how fast the network buffers are filled. Is this correct?
> Does Flink use a TCP connection to communicate between tasks if the tasks are in the same task manager?

No, local input channels are used then, but memory for network buffers is assigned to tasks regardless of the fraction of local input channels in the task. However, with just a single task manager and a parallelism of 4, the amount of memory used by the network stack should be insignificant, at least as long as you have a reasonably sized job graph (32 KB * (2 * parallelism + 7) * number of tasks).

> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if there is not sufficient heap memory to process data. Somehow this is not happening in Flink 1.8, and it fills the heap soon enough not to get GCed/finalized. Any change around this between Flink 1.4 and Flink 1.8?

No, there were no changes in this part as far as I remember. Tasks producing records serialise them and put them into the network buffers. If no network buffers are available, the task back-pressures and stops processing new records.

Best regards,
Piotrek

On Tue, 8 Sep 2020 at 21:51, Josson Paul <[hidden email]> wrote:
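For a rough sense of scale of that bound under the setup discussed in this thread (parallelism 4; the task count below is only an assumed example): 32 KB * (2 * 4 + 7) = 480 KB per task, so with, say, 5 tasks in the job graph that comes to roughly 2.4 MB of network memory in total.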
Hi Piotr,

JVM start-up for Flink 1.4
-------------------------------
java -server -XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-taskmgr-assurance-1-77d44cf64-z8gd4.heapdump -Xmx6554m -Xms6554m -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError -XX:+UseG1GC -XX:CICompilerCount=4 -XX:MaxGCPauseMillis=1000 -XX:+DisableExplicitGC -XX:ParallelGCThreads=4 -Dsun.net.inetaddr.ttl=60 -XX:OnOutOfMemoryError=kill -9 %p -Dio.netty.eventLoopThreads=3 -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/log4j2.xml -Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3 -Dnetworkaddress.cache.ttl=120 -Dnum.cores=3 -XX:+UseStringDeduplication -Djava.util.concurrent.ForkJoinPool.common.parallelism=3 -XX:ConcGCThreads=4 -Djava.library.path=/usr/local/lib -Djava.net.preferIPv4Stack=true -Dapp.dir=/opt/maglev/sw/apps/pipelineruntime -Dserver.name=pipelineruntime -Dlog.dir=/opt/maglev/var/log/pipelineruntime -cp /opt/maglev/sw/apps/javacontainer/resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pipelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/* com.cisco.maglev.MaglevServer start maglev

1. taskmanager.memory.fraction = 0.7f (this came to about 4.5 GB; I didn't know at the time that we could set the memory fraction to zero because ours is a streaming job, so it was picking up the default)
2. Network buffer pool memory was 646 MB on the heap (I think this was the default, based on some calculations in Flink 1.4)
3. G1GC region size was 4 MB (default)

I tested this setup by reducing the JVM heap by 1 GB. It still worked perfectly, with some lag here and there.

JVM start-up for Flink 1.8
------------------------------------
a) I started with the same configuration as above. The Kubernetes pod went out of memory. At this point I realized that in Flink 1.8 the network buffer pools moved to native memory; based on calculations this came to about 200 MB of native memory. I increased the overall pod memory to accommodate the buffer pool change, keeping the heap the same.

b) Even after I increased the overall pod memory, the pod still crashed. At this point I generated flame graphs to identify the CPU/malloc calls (attached to the initial email) and realized that the CPU usage of G1GC is significantly different from Flink 1.4. Now I made 2 changes:

1. taskmanager.memory.fraction = 0.01f (this gives more heap to user code)
2. Increased the CPU from 3 to 4 cores.

The above changes helped the cluster hold on a little longer, but it still crashed after some time.

c) Now I made the changes below:

1. I came across http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002622.html and changed the G1GC region size to 8 MB instead of the default 4 MB.
2. -XX:MaxGCPauseMillis=2000 (I even tried higher values in later experiments)
3. Played around with G1RSetSparseRegionEntries.

This helped avoid the pod going out of memory, but the old-gen heap issue was now very evident (please see the attached Word document).

d) Allocated an additional 700 MB of heap along with the above changes. This also didn't help; it just delayed the crash. Now I need help from others on which direction to take this.

My worry is that even if I upgrade to Flink 1.11, this issue might persist.

I have attached a screenshot from the heap dump to show you the difference in how HeapKeyedStateBackend is created in Flink 1.4 versus 1.8; I am not sure whether this change has anything to do with the memory issue I am facing. The files are Flink-1.4.jpg for 1.4 and Flink-1.8.jpg for 1.8.

Thanks,
Josson

On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <[hidden email]> wrote:
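Spelled out as JVM options, the G1 adjustments from step (c) above would look roughly like the following (a sketch; the exact G1RSetSparseRegionEntries value tried is not stated in this thread):

-XX:+UseG1GC -XX:G1HeapRegionSize=8m -XX:MaxGCPauseMillis=2000 -XX:G1RSetSparseRegionEntries=<tuned value>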
Hi Josson,

Thanks again for the detailed answer, and sorry that I cannot give you an immediate answer. I presume the JVM args for 1.8 are the same?

Can you maybe post what exactly crashed in your cases a) and b)? Regarding c), in the previously attached Word document it looks like Flink was running without problems for a couple of hours/minutes, everything was stable, with no signs of growing memory consumption or an impending problem, until around 23:15, when the problem started, right? Did something else happen at that time that could explain the spike? A checkpoint? A job crash/restart? A load spike?

A couple of other random guesses:
- Have you been monitoring other memory pools for Flink 1.4 and 1.8, like metaspace? A growing metaspace can sometimes cause problems. It shouldn't be the case here, as you configured -XX:MaxMetaspaceSize, but it might still be worth checking.
- Another random idea: have you tried upgrading the JDK? Maybe that would solve the problem?

Best regards,
Piotrek

On Wed, 9 Sep 2020 at 19:53, Josson Paul <[hidden email]> wrote:
What looks a bit strange to me is that with a running job, the SystemProcessingTimeService should actually not be collected (since it is still in use)!

My guess is that something is indeed happening during that time frame (maybe job restarts?), and I would propose checking your logs for anything suspicious.

When I did experiments with Beam pipelines on our platform [1], I also noticed that the standard fat jars that Beam creates include Flink runtime classes they shouldn't (at least if you are submitting to a separate Flink cluster). This can cause all sorts of problems, and I would recommend removing those from the fat jar as documented in [1].

Nico

[1] https://ververica.zendesk.com/hc/en-us/articles/360014323099

On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
I have attached two Word documents, Flink1.4 and Flink1.8.

I reduced the heap size in the cluster and ran the experiment on both Flink 1.4 and Flink 1.8. My goal was to simulate an ingestion rate of 200 clients/sec (not going into the details here).

In Flink 1.4 I could reach 200 clients/sec and I ran the cluster for 1 hour. You can see the details in the attached Flink1.4 document, including the GC activity and CPU; both hold up well.

In Flink 1.8 I could reach only 160 clients/sec before the issue started happening, within 15 minutes of starting the ingestion. @Piotr, you can see that there is no metaspace-related issue; all the GC-related details are in the doc. In particular, see the difference in the 'Biggest Objects' view of the heap dumps for the two clusters and how Flink 1.4 holds fewer objects in the heap. Is it because Flink 1.4 was inefficient, 1.8 solved that inefficiency, and this behaviour is therefore expected?

@Nico, we are not doing the fat jar stuff.

@Piotr, we are in the process of upgrading to Java 11 and Flink 1.11, but I need at least 2 months.

I am not seeing the Finalizer problem in the latest heap dump. Maybe it was happening only once or twice.

Please let me know if you need additional input.

Thanks,
Josson

On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <[hidden email]> wrote:
Hi Josson,

Have you checked the logs as Nico suggested? At 18:55 there is a dip in non-heap memory, just about when the problems started happening. Maybe you could post the TM logs?

Have you tried updating the JVM to a newer version?

Also, it looks like the heap size is the same between 1.4 and 1.8, but in an earlier message you said you increased it by 700 MB?

Piotrek

On Fri, 11 Sep 2020 at 05:07, Josson Paul <[hidden email]> wrote:
@Piotr Nowojski <[hidden email]> @Nico Kruber <[hidden email]>

I have attached the task manager/GC/thread dumps in a zip file. I don't see any issues in the TM logs.

I tried upgrading to Java 9, but Flink runs on top of another platform that threw errors during the upgrade, so I can't do much for now. We will upgrade to JDK 11 in another 2 months.

Regarding the heap size: the new experiment was run with a 4 GB heap on both Flink 1.4 and Flink 1.8.

The questions I am trying to get answered are:
1) In the Flink 1.4 setup, the data in the heap is throttled. It never goes out of memory whatever the ingestion rate; our windows are 5-minute windows.
2) In the Flink 1.8 setup, the HeapKeyedStateBackend is never throttled and fills up fast. Once the old-gen space goes beyond 60-70%, even a mixed GC or Full GC doesn't reclaim space.

Thanks,
Josson

On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <[hidden email]> wrote:

logs.zip (1M)
Hi Josson,

The TM logs that you attached cover only a 5-minute period. Are you sure they encompass the period before and after the potential failure? It would also be nice if you could provide logs matching the charts (like the ones you provided in previous messages), to correlate events (a spike in latency/GC with some timestamp in the logs).

I was not necessarily asking you to upgrade to Java 9, but to an updated/bug-fixed version of Java 8 [1].

> 1) In the Flink 1.4 setup, the data in the heap is throttled. It never goes out of memory whatever the ingestion rate; our windows are 5-minute windows.
> 2) In the Flink 1.8 setup, the HeapKeyedStateBackend is never throttled and fills up fast. Once the old-gen space goes beyond 60-70%, even a mixed GC or Full GC doesn't reclaim space.

In both cases the backpressure mechanism is the same: if a task's output runs out of buffers to put produced records into, the task blocks. It can be that between 1.4 and 1.8, with the credit-based flow control changes, the number of buffers available to the tasks in your setup has grown, so the tasks backpressure later. This in turn can sometimes mean that at any point in time there is more data buffered in operator state, e.g. in the `WindowOperator`. I'm not sure what the best/easiest way to check this is:

1. The amount of buffered data might be visible via metrics [2][3].
2. If you enable DEBUG logs, it should be visible via the

   LOG.debug("Using a local buffer pool with {}-{} buffers", numberOfRequiredMemorySegments, maxNumberOfMemorySegments);

   entry logged by `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.

Piotrek

[1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network

On Mon, 14 Sep 2020 at 05:04, Josson Paul <[hidden email]> wrote:
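To see that LocalBufferPool line without enabling DEBUG globally, a targeted logger could be added inside the <Loggers> section of the log4j2.xml referenced in the JVM arguments earlier in this thread (a sketch, not taken from the thread):

<Logger name="org.apache.flink.runtime.io.network.buffer.LocalBufferPool" level="DEBUG"/>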
@Piotr Nowojski <[hidden email]> @Nico Kruber <[hidden email]>

An update: I was able to figure out the problem code. A change in the Apache Beam code is causing this problem.
Beam introduced a lock around the "emit" in the unbounded source, and that lock is Flink's checkpoint lock. The same lock is used by Flink's timer service to emit watermarks. Flink's timer service is starved trying to get hold of the lock, and for some reason it never gets it. The after-effect is that the watermark is never emitted by Flink's timer service; because no watermarks flow through the system, the sliding windows are never closed and data accumulates in the windows. A simplified sketch of this lock interaction is shown after this message.
This problem occurs only if external lookup calls (like Redis) happen before the data goes to the sliding window. Something like the pipeline below.
KafkaSource -> Transforms (occasional Redis lookup) -> SlidingWindow -> Transforms -> KafkaSink
https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256 . This is Beam 2.4, and you can see that there is no synchronized block at lines 257 and 270.

https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264 . This is Beam 2.15; see the synchronized blocks introduced at lines 264 and 280. We are using Beam 2.15 and Flink 1.8.

Beam introduced this synchronized block because of this bug: https://issues.apache.org/jira/browse/BEAM-3087
After I removed that synchronized keyword, everything started working fine in my application.

What do you think about this? Why does Beam need a synchronized block there?

Beam is using this lock -> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282

Thanks,
Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <[hidden email]> wrote:
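A simplified, self-contained sketch of the contention described in the message above (illustrative names only, not the actual Beam/Flink classes): the source's emit loop and the processing-time timer both need the same checkpoint lock, and because Java monitors are not fair, a loop that re-acquires the lock per element and does slow work (for example a Redis lookup in a chained operator) while holding it can starve the timer that would emit watermarks.

public class CheckpointLockStarvationSketch {

    private final Object checkpointLock = new Object();

    // Roughly what the Beam 2.15 UnboundedSourceWrapper emit loop does per element (simplified).
    void sourceLoop() throws InterruptedException {
        while (true) {
            synchronized (checkpointLock) {
                // reader.advance(); reader.getCurrent(); emitElement(...):
                // chained operators run here, including the occasional Redis
                // lookup, all while the checkpoint lock is held.
                Thread.sleep(5); // stands in for the per-element processing cost
            }
            // The lock is released only for an instant between elements, so a
            // waiting timer thread may effectively never acquire it.
        }
    }

    // Roughly what a processing-time timer callback needs to do (simplified):
    // take the same lock before invoking the operator, which is where the
    // watermark would be emitted.
    void onProcessingTime() {
        synchronized (checkpointLock) {
            // operator.onProcessingTime(...): watermark emission happens here.
        }
    }
}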
Hi Josson,

Thanks for the great investigation and for coming back to us. Aljoscha, could you help us here? It looks like you were involved in the original BEAM-3087 issue.

Best,
Piotrek

On Fri, 23 Oct 2020 at 07:36, Josson Paul <[hidden email]> wrote:
Hi,
nice work on debugging this!

We need the synchronized block in the source because the call to reader.advance() (via the invoker) and reader.getCurrent() (via emitElement()) must be atomic with respect to state. We cannot advance the reader state, not emit that record, and yet still checkpoint the new reader state. The monitor ensures that no checkpoint can happen in between those two calls.

The basic problem now is that we starve checkpointing because the monitor/lock is not fair. This could be solved by using a fair lock, but that would require Flink proper to be changed to use a fair lock instead of a monitor/synchronized. I don't see this as an immediate solution.

One thing that exacerbates this problem is that too many things happen "under" the synchronized block. All the transforms before a shuffle/rebalance/keyBy are chained to the source, which means they are invoked from the emitElement() call. You can see this by printing/logging a stack trace in your user function that does the Redis lookups.

A possible mitigation would be to disable chaining globally by inserting a `flinkStreamEnv.disableOperatorChaining()` in [1]. A more surgical version would be to disable chaining only for sources. I'm attaching a patch for that in case you're willing to try it out. It is for the latest master, but it's easy enough to apply manually.

Best,
Aljoscha

[1] https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225

On 23.10.20 09:47, Piotr Nowojski wrote:
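For reference, a minimal sketch of the global mitigation mentioned above as it looks in plain Flink (assuming the flink-streaming-java dependency is on the classpath); in the Beam Flink runner the equivalent call would have to be added where the StreamExecutionEnvironment is created, i.e. in the FlinkExecutionEnvironments class referenced in [1]:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Disabling operator chaining means the transforms (including the Redis lookup)
// no longer run inside the source's emitElement() call under the checkpoint lock,
// at the cost of extra serialization between operators.
public class DisableChainingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        // ... build and execute the pipeline as usual ...
    }
}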
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> No of task Managers: 1 >>>>>>>>>>>>>>> Heap: 6.4 GB >>>>>>>>>>>>>>> CPU: 4 Cores >>>>>>>>>>>>>>> Standalone cluster deployment on a Kubernetes pod >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam >>>>>>> version: >>>>>>>>>>>>>>> release-2.15.0) >>>>>>>>>>>>>>> ---------- >>>>>>>>>>>>>>> Application details are same as above >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *No change in application and the rate at which data is >>>>>>> injected. >>>>>>>>>>>>>>> But change in Flink and Beam versions* >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Flink version: 1.8.3 >>>>>>>>>>>>>>> Beam version: release-2.15.0 >>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check >>>>>>> pointing >>>>>>>>>>>>>>> happening to the distributed File System. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> No of task Managers: 1 >>>>>>>>>>>>>>> Heap: 6.5 GB >>>>>>>>>>>>>>> CPU: 4 Cores >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Deployment: Standalone cluster deployment on a Kubernetes >>>>>>> pod >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> My Observations >>>>>>>>>>>>>>> ------------- >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1) CPU flame graph shows that in the working version, the >>>>>>> cpu time >>>>>>>>>>>>>>> on GC is lesser compared to non-working version (Please see >>>>>>> the >>>>>>>>>>>>>>> attached >>>>>>>>>>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster and >>>>>>>>>>>>>>> *CPU-flame-NOT-working.svg*) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) I have attached the flame graph for native memory MALLOC >>>>>>> calls >>>>>>>>>>>>>>> when the issue was happening. Please find the attached SVG >>>>>>> image ( >>>>>>>>>>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this >>>>>>> issue >>>>>>>>>>>>>>> happens. For me, it looks like the GC process is requesting >>>>>>> a lot of >>>>>>>>>>>>>>> native >>>>>>>>>>>>>>> memory. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 3) When the issue is happening the GC cpu usage is very >>>>>>> high. Please >>>>>>>>>>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Note: SVG file can be opened using any browser and it is >>>>>>> clickable >>>>>>>>>>>>>>> while opened. >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>> Josson >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> Josson >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Thanks >>>>>>>>>>> Josson >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Thanks >>>>>>>>> Josson >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> Thanks >>>>>> Josson >>>>>> >>>>> >>>> >>>> -- >>>> Thanks >>>> Josson >>>> >>> >> >> -- >> Thanks >> Josson >> > 0001-BEAM-XXXXX-Don-t-chain-sources-to-avoid-checkpoint-s.patch (3K) Download Attachment |
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251
On 11.11.20 19:09, Aljoscha Krettek wrote:
> Hi,
>
> nice work on debugging this!
>
> We need the synchronized block in the source because the call to
> reader.advance() (via the invoker) and reader.getCurrent() (via
> emitElement()) must be atomic with respect to state. We must not advance
> the reader state and then checkpoint that new state without having emitted
> the record in between. The monitor ensures that no checkpoint can happen
> between those two calls.
>
> The basic problem is now that we starve checkpointing because the
> monitor/lock is not fair. This could be solved by using a fair lock, but
> that would require Flink proper to be changed to use a fair lock instead
> of a monitor/synchronized. I don't see this as an immediate solution.
>
> One thing that exacerbates this problem is that too many things are
> happening "under" the synchronized block. All the transforms before a
> shuffle/rebalance/keyBy are chained to the source, which means that they
> are invoked from the emitElement() call. You could see this by
> printing/logging a stacktrace in your user function that does the Redis
> lookups.
>
> A possible mitigation would be to disable chaining globally by inserting
> a `flinkStreamEnv.disableOperatorChaining()` in [1].
>
> A more surgical version would be to only disable chaining for sources.
> I'm attaching a patch for that in case you're willing to try it out.
> This is for latest master but it's easy enough to apply manually.
>
> Best,
> Aljoscha
>
> [1]
> https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225
>
> On 23.10.20 09:47, Piotr Nowojski wrote:
>> Hi Josson,
>>
>> Thanks for the great investigation and for coming back to us. Aljoscha,
>> could you help us here? It looks like you were involved in the original
>> BEAM-3087 issue.
>>
>> Best,
>> Piotrek
>>
>> pt., 23 paź 2020 o 07:36 Josson Paul <[hidden email]> napisał(a):
>>
>>> @Piotr Nowojski <[hidden email]> @Nico Kruber <[hidden email]>
>>>
>>> An update.
>>>
>>> I was able to figure out the problem code. A change in the Apache Beam
>>> code is causing this problem.
>>>
>>> Beam introduced a lock on the “emit” in the Unbounded Source. The lock
>>> is on Flink's checkpoint lock. Now the same lock is used by Flink's
>>> timer service to emit the Watermarks. Flink's timer service is starved
>>> trying to get hold of the lock and for some reason it never gets it.
>>> The after-effect of this situation is that the 'WaterMark' is never
>>> emitted by Flink's timer service. Because there are no Watermarks
>>> flowing through the system, Sliding Windows are never closed. Data gets
>>> accumulated in the Window.
>>>
>>> This problem occurs only if we have external lookup calls (like Redis)
>>> happening before the data goes to the Sliding Window. Something like
>>> below.
>>>
>>> KafkaSource -> Transforms (occasional Redis lookup) -> SlidingWindow ->
>>> Transforms -> Kafka Sink
>>>
>>> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
>>> This is Beam 2.4 and you can see that there is no synchronized block at
>>> lines 257 and 270.
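To make the failure mode described above concrete, here is a minimal, self-contained Java sketch of the pattern. This is NOT Flink or Beam code: all class and method names are hypothetical and the timings are invented; it only illustrates how an unfair monitor that is re-acquired in a tight loop by the "source" thread can keep the "timer" thread waiting for its turn to emit a watermark.

// Minimal sketch of the starvation pattern discussed above. Not Flink/Beam
// code; names and timings are hypothetical.
public final class CheckpointLockStarvationSketch {

    // Stands in for the checkpoint lock; a plain Java monitor is not fair.
    private static final Object CHECKPOINT_LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        // "Source" thread: advance + emit under the lock, plus everything chained
        // to the source (e.g. an occasional Redis lookup) before releasing it.
        Thread source = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (CHECKPOINT_LOCK) {
                    busyWorkMillis(5); // simulate advance() + getCurrent() + emit + chained transforms
                }
                // The lock is free only for an instant before it is re-acquired,
                // so the waiting "timer" thread rarely gets a chance to run.
            }
        }, "source");

        // "Timer" thread: needs the same lock to emit a watermark once per second.
        Thread timer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (CHECKPOINT_LOCK) {
                    System.out.println("watermark emitted at " + System.currentTimeMillis());
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "timer");

        source.start();
        timer.start();
        Thread.sleep(30_000); // observe how rarely the watermark line is printed
        source.interrupt();
        timer.interrupt();
    }

    private static void busyWorkMillis(long millis) {
        long end = System.nanoTime() + millis * 1_000_000L;
        while (System.nanoTime() < end) {
            // spin to simulate work done while holding the lock
        }
    }
}

Depending on JVM and OS scheduling the watermark line may still appear occasionally; the point is only that nothing guarantees the timer thread timely access to the monitor, which matches the starved timer service and missing watermarks described above.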
>>> >>> >>> >>> >>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264 >>> >>> . This is Beam 2.15. See the synchronized block introduced in line >>> 264 and >>> 280. We are using Beam 2.15 and Flink 1.8. >>> >>> >>> >>> Beam introduced this synchronized block because of this bug. >>> https://issues.apache.org/jira/browse/BEAM-3087 >>> >>> >>> >>> After I removed that synchronized keyword everything started working >>> fine >>> in my application. >>> >>> >>> >>> What do you guys think about this?. Why does Beam need a Synchronized >>> block there? >>> >>> >>> >>> Beam is using this lock -> >>> >>> >>> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282 >>> >>> >>> >>> >>> Thanks, >>> >>> Josson >>> >>> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <[hidden email]> >>> wrote: >>> >>>> Hi Josson, >>>> >>>> The TM logs that you attached are only from a 5 minutes time period. >>>> Are >>>> you sure they are encompassing the period before the potential >>>> failure and >>>> after the potential failure? It would be also nice if you would >>>> provide the >>>> logs matching to the charts (like the one you were providing in the >>>> previous messages), to correlate events (spike in latency/GC with some >>>> timestamp from the logs). >>>> >>>> I was not asking necessarily to upgrade to Java9, but an updated/bug >>>> fixed version of Java8 [1]. >>>> >>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never >>>> goes out of memory whatever be the ingestion rate. our Windows are 5 >>>> minutes windows. >>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and >>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed >>>> GC or >>>> Full GC doesn't reclaim space. >>>> >>>> In both cases there is the same mechanism for the backpressure. If a >>>> task's output runs out of buffers to put produced records, it will >>>> block >>>> the task. It can be that between 1.4 and 1.8, with credit based flow >>>> control changes, the amount of available buffers for the tasks on your >>>> setup has grown, so the tasks are backpressuring later. This in turn >>>> can >>>> sometimes mean that at any point of time there is more data buffered >>>> on the >>>> operator's state, like `WindowOperator`. I'm not sure what's the >>>> best/easiest way how to check this: >>>> >>>> 1. the amount of buffered data might be visible via metrics [2][3] >>>> 2. if you enable DEBUG logs, it should be visible via: >>>> >>>>> LOG.debug("Using a local buffer pool with {}-{} buffers", >>>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments); >>>> >>>> entry logged by >>>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`. >>>> >>>> Piotrek >>>> >>>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates >>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network >>>> >>>> [3] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network >>>> >>>> >>>> pon., 14 wrz 2020 o 05:04 Josson Paul <[hidden email]> >>>> napisał(a): >>>> >>>>> @Piotr Nowojski <[hidden email]> @Nico Kruber >>>>> <[hidden email]> >>>>> I have attached the Taskmanager/GC/thread dumps in a zip file. 
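As a sketch of the mitigation Aljoscha suggests above, disabling operator chaining, the snippet below shows the global switch on a plain Flink StreamExecutionEnvironment. This is only an illustration under assumptions: in the Beam Flink runner the call would have to be wired in where FlinkExecutionEnvironments creates the environment, and the attached patch instead takes the more surgical route of breaking the chain only after sources. The pipeline shown is a trivial stand-in, not the job from this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class DisableChainingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Global switch: downstream transforms run as separate tasks instead of
        // being invoked from inside the source's emit call, so far less work
        // happens while the checkpoint lock is held.
        env.disableOperatorChaining();

        // Trivial stand-in pipeline; in the real job the map would be the place
        // where the occasional Redis lookup happens.
        env.fromElements(1, 2, 3)
                .map(x -> x * 2)
                .print();

        env.execute("disable-chaining-sketch");
    }
}

The trade-off is extra per-record overhead, since records are handed over between tasks instead of via plain method calls, in exchange for much shorter critical sections under the checkpoint lock.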
>>>>> >>>>> I don't see any issues in the TM logs. >>>>> Tried to upgrade to Java 9. Flink is on top of another platform which >>>>> threw errors while upgrading to Java 9. I can't do much for now. We >>>>> will >>>>> upgrade to Jdk 11 in another 2 months. >>>>> >>>>> Regarding the Heap size. The new experiment I did was on 4gb Heap on >>>>> both Flink 1.4 and Flink 1.8. >>>>> >>>>> Questions I am trying to get answered are >>>>> >>>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never >>>>> goes >>>>> out of memory whatever be the ingestion rate. our Windows are 5 >>>>> minutes windows. >>>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and >>>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed >>>>> GC or >>>>> Full GC doesn't reclaim space. >>>>> >>>>> >>>>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <[hidden email]> >>>>> wrote: >>>>> >>>>>> Hi Josson, >>>>>> >>>>>> Have you checked the logs as Nico suggested? At 18:55 there is a >>>>>> dip in >>>>>> non-heap memory, just about when the problems started happening. >>>>>> Maybe you >>>>>> could post the TM logs? >>>>>> Have you tried updating JVM to a newer version? >>>>>> Also it looks like the heap size is the same between 1.4 and 1.8, but >>>>>> in an earlier message you said you increased it by 700MB? >>>>>> >>>>>> Piotrek >>>>>> >>>>>> pt., 11 wrz 2020 o 05:07 Josson Paul <[hidden email]> >>>>>> napisał(a): >>>>>> >>>>>>> I have attached two word documents. >>>>>>> Flink1.4 and Flink1.8 >>>>>>> I reduced the heap size in the cluster and tried the experiment in >>>>>>> both Flink 1.4 and Flink 1.8. >>>>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going >>>>>>> into the details here). >>>>>>> >>>>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster >>>>>>> for 1 >>>>>>> hour. You can see the details in the attached Flink1.4 document >>>>>>> file. You >>>>>>> can see the GC activity and Cpu. Both are holding good. >>>>>>> >>>>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started >>>>>>> happening. Issue started within 15 minutes of starting the >>>>>>> ingestion. @Piotr >>>>>>> Nowojski <[hidden email]> , you can see that there is no meta >>>>>>> space related issue. All the GC related details are available in >>>>>>> the doc. >>>>>>> >>>>>>> Especially see the difference in Heap dump of 'Biggest Objects' in >>>>>>> both clusters. How Flink 1.4 holds lesser objects in Heap. Is it >>>>>>> because >>>>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and >>>>>>> this problem >>>>>>> is expected?. >>>>>>> >>>>>>> @Nicko, We are not doing the fat jar stuff. >>>>>>> >>>>>>> @Piotr Nowojski <[hidden email]> , we are in the process of >>>>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months. >>>>>>> >>>>>>> >>>>>>> I am not getting the Finalizer problem in the latest heap dump. >>>>>>> Maybe >>>>>>> it was happening only 1 or 2 times. >>>>>>> >>>>>>> Please let me know if you need additional input >>>>>>> >>>>>>> >>>>>>> Thanks, >>>>>>> Josson >>>>>>> >>>>>>> >>>>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <[hidden email]> >>>>>>> wrote: >>>>>>> >>>>>>>> What looks a bit strange to me is that with a running job, the >>>>>>>> SystemProcessingTimeService should actually not be collected (since >>>>>>>> it is >>>>>>>> still in use)! >>>>>>>> >>>>>>>> My guess is that something is indeed happening during that time >>>>>>>> frame >>>>>>>> (maybe >>>>>>>> job restarts?) 
and I would propose to check your logs for anything >>>>>>>> suspicious >>>>>>>> in there. >>>>>>>> >>>>>>>> >>>>>>>> When I did experiments with Beam pipelines on our platform [1], I >>>>>>>> also >>>>>>>> noticed, that the standard fat jars that Beam creates include Flink >>>>>>>> runtime >>>>>>>> classes it shouldn't (at least if you are submitting to a separate >>>>>>>> Flink >>>>>>>> cluster). This can cause all sorts of problems and I would >>>>>>>> recommend >>>>>>>> removing >>>>>>>> those from the fat jar as documented in [1]. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Nico >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099 >>>>>>>> >>>>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote: >>>>>>>>> Hi Josson, >>>>>>>>> >>>>>>>>> Thanks again for the detailed answer, and sorry that I can not >>>>>>>>> help >>>>>>>> you >>>>>>>>> with some immediate answer. I presume that jvm args for 1.8 are >>>>>>>>> the >>>>>>>> same? >>>>>>>>> >>>>>>>>> Can you maybe post what exactly has crashed in your cases a) >>>>>>>>> and b)? >>>>>>>>> Re c), in the previously attached word document, it looks like >>>>>>>> Flink was >>>>>>>>> running without problems for a couple of hours/minutes, everything >>>>>>>> was >>>>>>>>> stable, no signs of growing memory consumption, impending problem, >>>>>>>> until >>>>>>>>> around 23:15, when the problem started, right? Has something else >>>>>>>> happened >>>>>>>>> at that time, something that could explain the spike? A >>>>>>>>> checkpoint? >>>>>>>> Job >>>>>>>>> crash/restart? Load spike? >>>>>>>>> >>>>>>>>> A couple of other random guesses: >>>>>>>>> - have you been monitoring other memory pools for Flink 1.4 and >>>>>>>> 1.8? Like >>>>>>>>> meta space? Growing meta space size can sometimes cause >>>>>>>>> problems. It >>>>>>>>> shouldn't be the case here, as you configured XX:MaxMetaspaceSize, >>>>>>>> but it >>>>>>>>> might be still worth checking... >>>>>>>>> - another random idea, have you tried upgrading JDK? Maybe that >>>>>>>> would solve >>>>>>>>> the problem? 
>>>>>>>>> >>>>>>>>> Best regards, >>>>>>>>> Piotrek >>>>>>>>> >>>>>>>>> śr., 9 wrz 2020 o 19:53 Josson Paul <[hidden email]> >>>>>>>> napisał(a): >>>>>>>>>> Hi Piotr, >>>>>>>>>> >>>>>>>>>> *JVM start up for Flink 1.4* >>>>>>>>>> >>>>>>>>>> *-------------------------------* >>>>>>>>>> >>>>>>>>>> >>>>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta >>>>>>>> >>>>>>>>>> skmgr-assurance-1-77d44cf64-z8gd4.heapdump- >>>>>>>>>> *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m* >>>>>>>>>> -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4 >>>>>>>>>> >>>>>>>>>> >>>>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4* >>>>>>>> >>>>>>>>>> -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9 >>>>>>>>>> %p*-Dio.netty.eventLoopThreads=3* >>>>>>>>>> >>>>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo >>>>>>>> >>>>>>>>>> >>>>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw >>>>>>>> >>>>>>>>>> orkaddress.cache.ttl=120-Dnum.cores=3- >>>>>>>>>> >>>>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par >>>>>>>> >>>>>>>>>> allelism=3-XX:ConcGCThreads=4 * >>>>>>>>>> >>>>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di >>>>>>>> >>>>>>>>>> >>>>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di >>>>>>>> >>>>>>>>>> >>>>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/ >>>>>>>> >>>>>>>>>> >>>>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi >>>>>>>> >>>>>>>>>> >>>>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m >>>>>>>> >>>>>>>>>> aglev.MaglevServerstartmaglev> >>>>>>>>>> 1. taskmanager.memory.fraction = 0.7f (This was coming to >>>>>>>> 4.5 GB. I >>>>>>>>>> didn't know at that time that we could set memory fraction to >>>>>>>> zero >>>>>>>>>> because >>>>>>>>>> ours is a streaming job. It was picking up the default ) >>>>>>>>>> 2. Network buffer pool memory was 646MB on the Heap (I >>>>>>>> think this >>>>>>>>>> was the default based on some calculations in the Flink 1.4) >>>>>>>>>> 3. G1GC region size was 4MB (Default) >>>>>>>>>> >>>>>>>>>> I tested this setup by reducing the JVM heap by *1GB.* It still >>>>>>>> worked >>>>>>>>>> perfectly with some lags here and there. >>>>>>>>>> >>>>>>>>>> *JVM start up for Flink 1.8* >>>>>>>>>> *------------------------------------* >>>>>>>>>> a) I started with the same configuration as above. Kubenetis POD >>>>>>>> went out >>>>>>>>>> of memory. At this point I realized that in Flink 1.8 network >>>>>>>> buffer >>>>>>>>>> pools >>>>>>>>>> are moved to native memory. Based on calculations it was coming >>>>>>>> to 200MB >>>>>>>>>> in >>>>>>>>>> native memory. I increased the overall POD memory to accommodate >>>>>>>> the >>>>>>>>>> buffer pool change keeping the *heap the same*. >>>>>>>>>> >>>>>>>>>> b) Even after I modified the overall POD memory, the POD still >>>>>>>> crashed. >>>>>>>>>> At this point I generated Flame graphs to identify the CPU/Malloc >>>>>>>> calls >>>>>>>>>> (Attached as part of the initial email). Realized that cpu usage >>>>>>>> of G1GC >>>>>>>>>> is >>>>>>>>>> significantly different from Flink 1.4. Now I made 2 changes >>>>>>>>>> >>>>>>>>>> 1. taskmanager.memory.fraction = 0.01f (This will give more >>>>>>>> heap for >>>>>>>>>> user code) >>>>>>>>>> 2. Increased cpu from 3 to 4 cores. 
>>>>>>>>>> >>>>>>>>>> Above changes helped to hold the cluster a little >>>>>>>>>> longer. >>>>>>>> But it >>>>>>>>>> >>>>>>>>>> still crashed after sometime. >>>>>>>>>> >>>>>>>>>> c) Now I made the below changes. >>>>>>>>>> >>>>>>>>>> 1. I came across this -> >>>>>>>>>> >>>>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002 >>>>>>>> >>>>>>>>>> 622.html . Now I changed the G1GC region space to *8MB >>>>>>>> *instead of the >>>>>>>>>> default 4MB*.* >>>>>>>>>> 2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later >>>>>>>> experiments) >>>>>>>>>> 3. Played around with G1RSetSparseRegionEntries >>>>>>>>>> >>>>>>>>>> This helped to avoid the POD going out of memory. But the >>>>>>>> Old Gen >>>>>>>>>> >>>>>>>>>> heap issue was very evident now (Please see the attached word >>>>>>>> document). >>>>>>>>>> >>>>>>>>>> d) Allocated additional heap memory of *700 MB *along with the >>>>>>>> above >>>>>>>>>> >>>>>>>>>> changes. This also didn't help. It just prolonged the crash. Now >>>>>>>> I need >>>>>>>>>> help from others to which direction I want to take this to . >>>>>>>>>> >>>>>>>>>> My worry is even if I upgrade to flink 1.11 this issue might >>>>>>>>>> still >>>>>>>>>> persist. >>>>>>>>>> >>>>>>>>>> I have attached a screenshot from Heap dump to show you the >>>>>>>> difference >>>>>>>>>> between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is >>>>>>>> created. Not >>>>>>>>>> sure whether this change has something to do with this memory >>>>>>>> issue that I >>>>>>>>>> am facing. >>>>>>>>>> Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8 >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Josson >>>>>>>>>> >>>>>>>>>> On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski < >>>>>>>> [hidden email]> >>>>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>>> Hi Josson, >>>>>>>>>>> >>>>>>>>>>> Thanks for getting back. >>>>>>>>>>> >>>>>>>>>>> What are the JVM settings and in particular GC settings that you >>>>>>>> are >>>>>>>>>>> using (G1GC?)? >>>>>>>>>>> It could also be an issue that in 1.4 you were just slightly >>>>>>>> below the >>>>>>>>>>> threshold of GC issues, while in 1.8, something is using a bit >>>>>>>> more >>>>>>>>>>> memory, >>>>>>>>>>> causing the GC issues to appear? Have you tried just increasing >>>>>>>> the heap >>>>>>>>>>> size? >>>>>>>>>>> Have you tried to compare on the job start up, what is the usage >>>>>>>> and size >>>>>>>>>>> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can >>>>>>>> point us in >>>>>>>>>>> the right direction. >>>>>>>>>>> >>>>>>>>>>>> My understanding on back pressure is that it is not based on >>>>>>>> Heap >>>>>>>>>>> >>>>>>>>>>> memory but based on how fast the Network buffers are filled. Is >>>>>>>> this >>>>>>>>>>> correct?. >>>>>>>>>>> >>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if >>>>>>>> the tasks >>>>>>>>>>> >>>>>>>>>>> are in the same Task manager?. >>>>>>>>>>> >>>>>>>>>>> No, local input channels are being used then, but memory for >>>>>>>> network >>>>>>>>>>> buffers is assigned to tasks regardless of the fraction of local >>>>>>>> input >>>>>>>>>>> channels in the task. However with just single taskmanager and >>>>>>>>>>> parallelism >>>>>>>>>>> of 4, the amount of the memory used by the network stack should >>>>>>>> be >>>>>>>>>>> insignificant, at least as long as you have a reasonably sized >>>>>>>> job graph >>>>>>>>>>> (32KB * (2 * parallelism + 7) * number of tasks). 
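As a rough worked example of the rule of thumb quoted above, 32KB * (2 * parallelism + 7) * number of tasks, here is the arithmetic for the parallelism of 4 used in this setup; the task count of 10 is an illustrative assumption, not a number taken from the job.

// Back-of-the-envelope estimate quoted above:
//   32 KB * (2 * parallelism + 7) * number of tasks
// Parallelism matches this setup; the task count is an assumption.
public final class NetworkBufferEstimateSketch {

    public static void main(String[] args) {
        final long segmentSizeBytes = 32 * 1024; // 32 KB memory segments
        final int parallelism = 4;
        final int numberOfTasks = 10;            // adjust to the actual job graph

        long perTaskBytes = segmentSizeBytes * (2L * parallelism + 7); // 480 KB
        long totalBytes = perTaskBytes * numberOfTasks;                // ~4.7 MB

        System.out.printf("per task: %d KB%n", perTaskBytes / 1024);
        System.out.printf("total   : %.1f MB%n", totalBytes / (1024.0 * 1024.0));
    }
}

Even with a considerably larger job graph this stays in the tens of megabytes, which is why the network stack by itself is an unlikely explanation for a multi-gigabyte heap filling up.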
>>>>>>>>>>> >>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from >>>>>>>> Kafka if >>>>>>>>>>> >>>>>>>>>>> there is not sufficient heap memory to process data. Somehow >>>>>>>> this is not >>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to >>>>>>>> get >>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and >>>>>>>> Flink 1.8. >>>>>>>>>>> >>>>>>>>>>> No, there were no changes in this part as far as I remember. >>>>>>>> Tasks when >>>>>>>>>>> producing records are serialising them and putting into the >>>>>>>> network >>>>>>>>>>> buffers. If there are no available network buffers, the task is >>>>>>>> back >>>>>>>>>>> pressuring and stops processing new records. >>>>>>>>>>> >>>>>>>>>>> Best regards, >>>>>>>>>>> Piotrek >>>>>>>>>>> >>>>>>>>>>> wt., 8 wrz 2020 o 21:51 Josson Paul <[hidden email]> >>>>>>>> napisał(a): >>>>>>>>>>>> Hi Piotr, >>>>>>>>>>>> >>>>>>>>>>>> 2) SystemProcessingTimeService holds the >>>>>>>> HeapKeyedStateBackend and >>>>>>>>>>>> >>>>>>>>>>>> HeapKeyedStateBackend has lot of Objects and that is filling >>>>>>>> the Heap >>>>>>>>>>>> >>>>>>>>>>>> 3) I am not using Flink Kafka Connector. But we are using >>>>>>>> Apache Beam >>>>>>>>>>>> >>>>>>>>>>>> kafka connector. There is a change in the Apache Beam version. >>>>>>>> But the >>>>>>>>>>>> kafka client we are using is the same as the one which was >>>>>>>> working in >>>>>>>>>>>> the >>>>>>>>>>>> other cluster where Flink was 1.4. >>>>>>>>>>>> >>>>>>>>>>>> *There is no change in Hardware/Java/Kafka/Kafka >>>>>>>> Client/Application >>>>>>>>>>>> >>>>>>>>>>>> between the cluster which is working and not working* >>>>>>>>>>>> >>>>>>>>>>>> I am aware of the memory changes and network buffer changes >>>>>>>> between 1.4 >>>>>>>>>>>> and 1.8. >>>>>>>>>>>> >>>>>>>>>>>> Flink 1.4 had network buffers on Heap and 1.8 network buffers >>>>>>>> are on the >>>>>>>>>>>> native memory. I modified the Flink 1.8 code to put it back to >>>>>>>> Heap >>>>>>>>>>>> memory >>>>>>>>>>>> but the issue didn't get resolved. >>>>>>>>>>>> >>>>>>>>>>>> Mine is a streaming job so we set 'taskmanager.memory.fraction' >>>>>>>> to very >>>>>>>>>>>> minimal and that heap is fully available for user data. >>>>>>>>>>>> >>>>>>>>>>>> Flink 1.4 was not using Credit based Flow control and Flink 1.8 >>>>>>>> uses >>>>>>>>>>>> Credit based Flow control. *Our set up has only 1 task manager >>>>>>>> and 4 >>>>>>>>>>>> parallelisms*. According to this video >>>>>>>>>>>> >>>>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward >>>>>>>> ( >>>>>>>>>>>> *16:21*) if tasks are in same task manager, Flink doesn't use >>>>>>>> Credit >>>>>>>>>>>> Based Flow control. Essentially no change between Flink 1.4 and >>>>>>>> 1.8 in >>>>>>>>>>>> *our >>>>>>>>>>>> set up*. Still I tried to change the Credit Based Flow Control >>>>>>>> to False >>>>>>>>>>>> and test my setup. The problem persists. >>>>>>>>>>>> >>>>>>>>>>>> What I noticed in Flink 1.4 is that it doesn't read data from >>>>>>>> Kafka if >>>>>>>>>>>> there is not sufficient heap memory to process data. Somehow >>>>>>>> this is not >>>>>>>>>>>> happening in Flink 1.8 and it fills the heap soon enough not to >>>>>>>> get >>>>>>>>>>>> GCed/Finalized. Any change around this between Flink 1.4 and >>>>>>>> Flink 1.8. >>>>>>>>>>>> >>>>>>>>>>>> My understanding on back pressure is that it is not based on >>>>>>>> Heap memory >>>>>>>>>>>> but based on how fast the Network buffers are filled. Is this >>>>>>>> correct?. 
>>>>>>>>>>>> Does Flink use TCP connection to communicate between tasks if >>>>>>>> the tasks >>>>>>>>>>>> are in the same Task manager?. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> josson >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski < >>>>>>>> [hidden email]> >>>>>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> Hi Josson, >>>>>>>>>>>>> >>>>>>>>>>>>> 2. Are you sure that all/vast majority of those objects are >>>>>>>> pointing >>>>>>>>>>>>> towards SystemProcessingTimeService? And is this really the >>>>>>>> problem of >>>>>>>>>>>>> those objects? Are they taking that much of the memory? >>>>>>>>>>>>> 3. It still could be Kafka's problem, as it's likely that >>>>>>>> between 1.4 >>>>>>>>>>>>> and 1.8.x we bumped Kafka dependencies. >>>>>>>>>>>>> >>>>>>>>>>>>> Frankly if that's not some other external dependency issue, I >>>>>>>> would >>>>>>>>>>>>> expect that the problem might lie somewhere completely else. >>>>>>>> Flink's >>>>>>>>>>>>> code >>>>>>>>>>>>> relaying on the finalisation hasn't changed since 2015/2016. >>>>>>>> On the >>>>>>>>>>>>> other >>>>>>>>>>>>> hand there were quite a bit of changes between 1.4 and 1.8.x, >>>>>>>> some of >>>>>>>>>>>>> them >>>>>>>>>>>>> were affecting memory usage. Have you read release notes for >>>>>>>> versions >>>>>>>>>>>>> 1.5, >>>>>>>>>>>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have >>>>>>>> memory >>>>>>>>>>>>> related notes that could be addressed via configuration >>>>>>>> changes. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Piotrek >>>>>>>>>>>>> >>>>>>>>>>>>> [1] >>>>>>>>>>>>> >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not >>>>>>>> >>>>>>>>>>>>> es/flink-1.5.html [2] >>>>>>>>>>>>> >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not >>>>>>>> >>>>>>>>>>>>> es/flink-1.8.html>>>> >>>>>>>>>>>>> czw., 3 wrz 2020 o 18:50 Josson Paul <[hidden email]> >>>>>>>> napisał(a): >>>>>>>>>>>>>> 1) We are in the process of migrating to Flink 1.11. But it >>>>>>>> is going >>>>>>>>>>>>>> to take a while before we can make everything work with the >>>>>>>> latest >>>>>>>>>>>>>> version. >>>>>>>>>>>>>> Meanwhile since this is happening in production I am trying >>>>>>>> to solve >>>>>>>>>>>>>> this. >>>>>>>>>>>>>> 2) Finalizae class is pointing >>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService >>>>>>>> >>>>>>>>>>>>>> . >>>>>>>>>>>>>> This class has a finalize method. I have attached spreadsheet >>>>>>>> ( >>>>>>>>>>>>>> *Object-explorer.csv*) to give you a high level view >>>>>>>>>>>>>> 3) The difference between working cluster and NON working >>>>>>>> cluster is >>>>>>>>>>>>>> only on Beam and Flink. Hardware, Input message rate, >>>>>>>> Application >>>>>>>>>>>>>> jars, >>>>>>>>>>>>>> Kafka are all the same between those 2 clusters. Working >>>>>>>> cluster was >>>>>>>>>>>>>> with >>>>>>>>>>>>>> Flink 1.4 and Beam 2.4.0 >>>>>>>>>>>>>> >>>>>>>>>>>>>> Any insights into this will help me to debug further >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> Josson >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski < >>>>>>>> [hidden email]> >>>>>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Have you tried using a more recent Flink version? 1.8.x is >>>>>>>> no longer >>>>>>>>>>>>>>> supported, and latest versions might not have this issue >>>>>>>> anymore. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Secondly, have you tried backtracking those references to >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> Finalizers? Assuming that Finalizer is indeed the class >>>>>>>> causing >>>>>>>>>>>>>>> problems. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Also it may well be a non Flink issue [1]. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>>> Piotrek >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <[hidden email]> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> napisał(a): >>>>>>>>>>>>>>>> Hi All, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *ISSUE* >>>>>>>>>>>>>>>> ------ >>>>>>>>>>>>>>>> Flink application runs for sometime and suddenly the CPU >>>>>>>> shoots up >>>>>>>>>>>>>>>> and touches the peak, POD memory reaches to the peak, GC >>>>>>>> count >>>>>>>>>>>>>>>> increases, >>>>>>>>>>>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean >>>>>>>> up heap >>>>>>>>>>>>>>>> space. At >>>>>>>>>>>>>>>> this point I stopped sending the data and cancelled the >>>>>>>> Flink Jobs. >>>>>>>>>>>>>>>> Still >>>>>>>>>>>>>>>> the Old-Gen space doesn't come down. I took a heap dump and >>>>>>>> can see >>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>> lot of Objects in the java.lang.Finalizer class. I have >>>>>>>> attached the >>>>>>>>>>>>>>>> details in a word document. I do have the heap dump but it >>>>>>>> is close >>>>>>>>>>>>>>>> to 2GB >>>>>>>>>>>>>>>> of compressed size. Is it safe to upload somewhere and >>>>>>>> share it >>>>>>>>>>>>>>>> here?. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam: >>>>>>>> release-2.4.0 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: >>>>>>>> release-2.4.0) >>>>>>>>>>>>>>>> ---------------------------------------------------- >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Application reads from Kafka and does aggregations and >>>>>>>> writes into >>>>>>>>>>>>>>>> Kafka. Application has 5 minutes windows. Application uses >>>>>>>> Beam >>>>>>>>>>>>>>>> constructs >>>>>>>>>>>>>>>> to build the pipeline. To read and write we use Beam >>>>>>>> connectors. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Flink version: 1.4.0 >>>>>>>>>>>>>>>> Beam version: release-2.4.0 >>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check >>>>>>>> pointing >>>>>>>>>>>>>>>> happening to the distributed File System. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> No of task Managers: 1 >>>>>>>>>>>>>>>> Heap: 6.4 GB >>>>>>>>>>>>>>>> CPU: 4 Cores >>>>>>>>>>>>>>>> Standalone cluster deployment on a Kubernetes pod >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam >>>>>>>> version: >>>>>>>>>>>>>>>> release-2.15.0) >>>>>>>>>>>>>>>> ---------- >>>>>>>>>>>>>>>> Application details are same as above >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *No change in application and the rate at which data is >>>>>>>> injected. >>>>>>>>>>>>>>>> But change in Flink and Beam versions* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Flink version: 1.8.3 >>>>>>>>>>>>>>>> Beam version: release-2.15.0 >>>>>>>>>>>>>>>> Backend State: State backend is in the Heap and check >>>>>>>> pointing >>>>>>>>>>>>>>>> happening to the distributed File System. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> No of task Managers: 1 >>>>>>>>>>>>>>>> Heap: 6.5 GB >>>>>>>>>>>>>>>> CPU: 4 Cores >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Deployment: Standalone cluster deployment on a Kubernetes >>>>>>>> pod >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> My Observations >>>>>>>>>>>>>>>> ------------- >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1) CPU flame graph shows that in the working version, the >>>>>>>> cpu time >>>>>>>>>>>>>>>> on GC is lesser compared to non-working version (Please see >>>>>>>> the >>>>>>>>>>>>>>>> attached >>>>>>>>>>>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster >>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> *CPU-flame-NOT-working.svg*) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2) I have attached the flame graph for native memory MALLOC >>>>>>>> calls >>>>>>>>>>>>>>>> when the issue was happening. Please find the attached SVG >>>>>>>> image ( >>>>>>>>>>>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this >>>>>>>> issue >>>>>>>>>>>>>>>> happens. For me, it looks like the GC process is requesting >>>>>>>> a lot of >>>>>>>>>>>>>>>> native >>>>>>>>>>>>>>>> memory. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 3) When the issue is happening the GC cpu usage is very >>>>>>>> high. Please >>>>>>>>>>>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Note: SVG file can be opened using any browser and it is >>>>>>>> clickable >>>>>>>>>>>>>>>> while opened. >>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>> Josson >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>> Josson >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Thanks >>>>>>>>>>>> Josson >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Thanks >>>>>>>>>> Josson >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Thanks >>>>>>> Josson >>>>>>> >>>>>> >>>>> >>>>> -- >>>>> Thanks >>>>> Josson >>>>> >>>> >>> >>> -- >>> Thanks >>> Josson >>> >> > |