Hey Flink users,

I wanted to get some insight into what the heap memory profile of my stream app should look like versus my expectation. My layout consists of a sequence of FlatMaps + Maps, feeding a pair of 5 minute TumblingEventTimeWindows, intervalJoined, into a 24 hour (per 5 minute) SlidingEventTimeWindow, then intervalJoined again, back into the first set of FlatMaps. The data flow works as expected, and the reports I generate in the last join appear to be correct and contain info from the 24 hour sliding window.

My understanding is that while all these windows build their memory state, I can expect heap memory to grow for the 24 hour length of the SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames expire and release back to the JVM. What is actually happening is that, when a constant data source feeds the stream, the heap memory profile grows linearly past the 24 hour mark. Could this be a result of a misunderstanding of how the window’s memory states are kept, or is my assumption correct, and it is more likely I have a leak somewhere?

Thanks as always
Chris
On 15.05.20 15:17, Slotterback, Chris wrote:
> My understanding is that while all these windows build their memory state, I can expect heap memory to grow for the 24 hour length of the SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames expire and release back to the JVM. What is actually happening is when a constant data source feeds the stream, the heap memory profile grows linearly past the 24 hour mark. Could this be a result of a misunderstanding of how the window’s memory states are kept, or is my assumption correct, and it is more likely I have a leak somewhere?

Will memory keep growing indefinitely? That would indicate a bug? What sort of lateness/watermark settings do you have? What window function do you use? ProcessWindowFunction, or something that aggregates?

Side note: with sliding windows of 24h/5min you will have a "write amplification" of 24*60/5=288, each record will be in 288 windows, which will each be kept in separate state?

Best,
Aljoscha
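To make the 288x figure concrete, here is a minimal sketch of how a single timestamp maps onto sliding windows. It mirrors the start-alignment arithmetic as I understand it for sliding event-time windows with a zero offset; the class and method names are hypothetical and it does not use the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: which sliding windows a single event-time timestamp falls into. */
final class SlidingWindowFanOut {

    /** Returns the start (ms) of every window [start, start + size) that contains ts. */
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts, aligned to the slide (zero offset assumed).
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Walk backwards one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // 24 hours
        long slide = 5L * 60 * 1000;      // 5 minutes
        long ts = 1_589_548_620_000L;     // arbitrary example timestamp
        System.out.println(windowStarts(ts, size, slide).size()); // prints 288
    }
}
```

Every record therefore touches 288 window states for its key, regardless of where in the 5 minute slide its timestamp falls.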
What I've noticed is that heap memory ends up growing linearly with time indefinitely (past 24 hours) until it hits the ceiling of the heap allocated to the task manager, which leads me to believe I am leaking somewhere. All of my windows have an allowed lateness of 5 minutes, and my watermarks are pulled from the time embedded in the records using BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use ProcessJoinFunctions.

I expect this app to use a significant amount of memory at scale, since there are 288 5-minute intervals in 24 hours and each record is put into all 288 window states. As the application runs for its first 24 hours, memory would increase as all 288 (* unique keys) windows build with incoming records, but after 24 hours the memory should stop growing, or at least grow at a different rate?

Also of note, we are using a FsStateBackend configuration and plan to move to RocksDBStateBackend, but from what I can tell, this would only reduce memory use and delay hitting the heap memory capacity, not prevent it forever?

Thanks
Chris
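The expectation that window state is released shortly after the 24 hour mark can be put into numbers. The sketch below assumes the cleanup rule as I understand it for event-time windows: state becomes eligible for cleanup once the watermark reaches the window's last timestamp plus the allowed lateness. The class and method names are hypothetical, and the out-of-orderness bound of the watermark extractor is not modeled.

```java
/** Sketch: when the state of one event-time window becomes eligible for cleanup. */
final class WindowCleanup {

    /** Last timestamp (ms) that still belongs to the window [start, start + size). */
    static long maxTimestamp(long startMs, long sizeMs) {
        return startMs + sizeMs - 1;
    }

    /** Watermark value at which the window's state can be dropped (assumed rule). */
    static long cleanupTime(long startMs, long sizeMs, long allowedLatenessMs) {
        return maxTimestamp(startMs, sizeMs) + allowedLatenessMs;
    }

    /** True once the watermark has reached the cleanup time of the window. */
    static boolean isExpired(long watermarkMs, long startMs, long sizeMs, long allowedLatenessMs) {
        return watermarkMs >= cleanupTime(startMs, sizeMs, allowedLatenessMs);
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // 24 hour window
        long lateness = 5L * 60 * 1000;   // 5 minutes allowed lateness
        // A window starting at t = 0 is held until 24h + 5min - 1ms.
        System.out.println(cleanupTime(0L, size, lateness)); // prints 86699999
    }
}
```

Under these assumptions each window lives for roughly 24 hours and 5 minutes of event time, so state that keeps growing well past that point suggests something other than window retention.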
For those who are interested or googling the mail archives in 8 months, the issue was garbage collection related.

The default Java 1.8 JVM garbage collector (Parallel GC) was being lazy in its marking and collection phases and letting the heap build to a level that was causing memory exceptions and stalled TaskManagers. This app has a lot of state, and memory usage is well above 10GB at times. The solution was moving to the G1 collector, which is very aggressive in its young generation collection by default. It costs some CPU and requires some tuning, but it keeps the memory levels much more stable.
Just to double check: the issue was resolved by using a different GC? Because the default GC was too "lazy". ;-)

Best,
Aljoscha
Chris,

What version of Flink are you using? I also have an issue with slow but continual memory growth in a windowing function, but it seems like the taskmanager.sh script I'm using already has the -XX:+UseG1GC flag set: https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/taskmanager.sh#L43
Aljoscha,

Maybe “lazy” isn’t the right term, haha. It’s my interpretation that during mark and sweep of the default GC, memory from older windows wasn’t being fully marked for collection. Since switching to G1, collection seems to be much more aggressive, and whenever the young generation memory exceeds the configured percentage, it reclaims all unreferenced state.

Mitch,

I am running 1.9 at the moment, with plans to upgrade to 1.10 at some point. It looks like, in that TaskManager script, if any other JVM options are set, it won’t append the UseG1GC flag? We had more config, so it wasn’t G1 by default. A quick way to verify that the TaskManagers are actually using G1 is to check the Flink TaskManager garbage collection metrics for G1_Young_Generation and G1_Old_Generation. If it isn’t being set, you can always append the G1 flag manually to the Java options (env.java.opts) in the flink-conf file each host uses at startup.

Chris
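The collector check described in this thread can also be sketched with the standard JVM management API. This is only an illustration: it reports the collectors of the JVM it runs in, so it would have to execute inside the TaskManager process (or be adapted to a remote JMX connection) to say anything about Flink. The class and method names are hypothetical, and the mapping from bean names to the metric names G1_Young_Generation and G1_Old_Generation (spaces replaced by underscores) is my assumption.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Sketch: list the garbage collectors active in the current JVM. */
final class GcCheck {

    /** Names reported by the JVM, e.g. "G1 Young Generation" or "PS Scavenge". */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    /** True if any collector name indicates G1. */
    static boolean usesG1(List<String> names) {
        return names.stream().anyMatch(n -> n.startsWith("G1 "));
    }

    /** Assumed metric-style name: spaces replaced by underscores. */
    static String metricName(String collectorName) {
        return collectorName.replace(' ', '_');
    }

    public static void main(String[] args) {
        List<String> names = collectorNames();
        for (String n : names) {
            System.out.println(metricName(n));
        }
        System.out.println("G1 in use: " + usesG1(names));
    }
}
```

Running it with and without -XX:+UseG1GC on a Java 8 JVM should show the difference between the G1 collectors and the Parallel GC ones.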
Attachment: Screen Shot 2020-05-27 at 12.53.50 PM.png (35K)