Timer & Window Memory Consumption

Navneeth Krishnan
Hi,

I'm seeing frequent young generation garbage collections in my task managers, roughly every few seconds. I have 3 task managers with a 12 GB heap each, configured to use G1GC. My program ingests binary data from a Kafka source at around 4.5k msgs/sec, with each message around 400 bytes. Below are the operators used in the program.
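For scale, here is a back-of-envelope sketch of the input load described above. It assumes the 4.5k msgs/sec are spread evenly over the 3 task managers (the post does not say so) and counts only the raw Kafka payload, not the objects created while deserializing and processing each record. LoadEstimate is a hypothetical helper.

```java
// Back-of-envelope input load, assuming traffic is spread evenly over the
// task managers (an assumption, not a measurement). Raw payload only.
final class LoadEstimate {
    /** Messages per second handled by one task manager. */
    static long messagesPerSecondPerTm(long totalMsgsPerSec, int taskManagers) {
        return totalMsgsPerSec / taskManagers;
    }

    /** Raw payload bytes per second for a given message rate and size. */
    static long bytesPerSecond(long msgsPerSec, long bytesPerMsg) {
        return msgsPerSec * bytesPerMsg;
    }

    public static void main(String[] args) {
        long perTm = messagesPerSecondPerTm(4_500, 3); // 1,500 msgs/sec
        long totalBytes = bytesPerSecond(4_500, 400);  // 1,800,000 B/s
        long perTmBytes = bytesPerSecond(perTm, 400);  // 600,000 B/s
        System.out.printf("per TM: %d msgs/s, %d B/s (total %d B/s)%n",
                perTm, perTmBytes, totalBytes);
    }
}
```

Roughly 600 kB/s of raw input per task manager is small next to a 12 GB heap, which suggests looking at what gets allocated per record rather than at the payload itself.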

kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) -> FlatMap -> Sink

I captured the histograms below at 5-second intervals and analyzed the heap as well. It looks like a lot of InternalTimer and TimeWindow objects are being created.

I also see high heap usage in org.apache.flink.streaming.api.operators.HeapInternalTimerService.

Window code:
dataStream.keyBy(new MessageKeySelector())
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new Aggregate());
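For reference, the window assignment behind TumblingEventTimeWindows.of(Time.seconds(5)) can be mirrored in plain Java. The start formula follows Flink's TimeWindow.getWindowStartWithOffset as we understand it (offset 0 here); TumblingWindowSketch and its methods are hypothetical names, and this is an illustration, not Flink code. It shows that each element maps to exactly one 5-second window, and that the event-time trigger fires at the window's last millisecond (end - 1).

```java
// Plain-Java mirror of tumbling event-time window assignment, for illustration only.
final class TumblingWindowSketch {
    /** Start of the window containing timestamp (same arithmetic as TimeWindow.getWindowStartWithOffset). */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Returns {start, end, fireTime}; the window is [start, end) and fires at end - 1. */
    static long[] assign(long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        long end = start + size;
        return new long[] {start, end, end - 1};
    }

    public static void main(String[] args) {
        long size = 5_000L; // Time.seconds(5) in milliseconds
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            long[] w = assign(ts, size);
            System.out.printf("ts=%d -> window [%d, %d), timer at %d%n", ts, w[0], w[1], w[2]);
        }
    }
}
```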

Captured at time T:

 num     #instances         #bytes  class name
----------------------------------------------
   1:       2074427      481933816  [B
   2:        357192      339368592  [D
   3:      12759222      204147552  java.lang.Integer
   4:         31416       85151832  [I
   5:        900982       83872240  [C
   6:        631888       20220416  java.util.HashMap$Node
   7:        804203       19300872  java.lang.String
   8:        541651       17332832  org.apache.flink.streaming.api.operators.InternalTimer
   9:        540252       17288064  org.apache.flink.streaming.api.windowing.windows.TimeWindow


Captured at T1 (T + 5 seconds):

 num     #instances         #bytes  class name
----------------------------------------------
   1:      12084258     2282849264  [B
   2:       1922018     1828760896  [D
   3:      68261427     1092182832  java.lang.Integer
   4:       2712099      291488736  [C
   5:         54201       98798976  [I
   6:       2028250       48678000  java.lang.String
   7:         66080       43528136  [[B
   8:       1401915       35580168  [Ljava.lang.Object;
   9:        949062       30369984  java.util.HashMap$Node
  10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer
  11:        549979       17599328  org.apache.flink.streaming.api.windowing.windows.TimeWindow


Captured at T2 (T1 + 5 seconds):

 num     #instances         #bytes  class name
----------------------------------------------
   1:       9911982     2920384472  [B
   2:       1584406     1510958520  [D
   3:      56087337      897397392  java.lang.Integer
   4:      26080337      834570784  java.util.HashMap$Node
   5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer
   6:      25740086      823682752  org.apache.flink.streaming.api.windowing.windows.TimeWindow
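One way to compare snapshots like these is to parse the jmap -histo rows and diff them per class. HistoDiff below is a hypothetical helper, not part of any tool, and it assumes the column layout shown above. Note that jmap -histo without the :live option also counts unreachable objects that have not been collected yet, while jmap -histo:live forces a full GC first; the post does not say which variant produced these tables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Parses "rank: instances bytes className" rows from jmap -histo output and diffs snapshots.
final class HistoDiff {
    /** One histogram entry: instance count and shallow bytes for a class. */
    static final class Entry {
        final long instances;
        final long bytes;

        Entry(long instances, long bytes) {
            this.instances = instances;
            this.bytes = bytes;
        }

        /** Shallow size per instance. */
        long bytesPerInstance() {
            return instances == 0 ? 0 : bytes / instances;
        }
    }

    /** Keeps only data rows; header, "-----" separator and blank lines are skipped. */
    static Map<String, Entry> parse(String histogram) {
        Map<String, Entry> result = new LinkedHashMap<>();
        for (String line : histogram.split("\n")) {
            String[] f = line.trim().split("\\s+");
            if (f.length < 4 || !f[0].endsWith(":")) {
                continue;
            }
            result.put(f[3], new Entry(Long.parseLong(f[1]), Long.parseLong(f[2])));
        }
        return result;
    }

    /** Change in instance count of one class between two snapshots (absent counts as 0). */
    static long instanceGrowth(Map<String, Entry> before, Map<String, Entry> after, String cls) {
        long b = before.containsKey(cls) ? before.get(cls).instances : 0;
        long a = after.containsKey(cls) ? after.get(cls).instances : 0;
        return a - b;
    }

    public static void main(String[] args) {
        String tw = "org.apache.flink.streaming.api.windowing.windows.TimeWindow";
        Map<String, Entry> t1 = parse("  11:        549979       17599328  " + tw);
        Map<String, Entry> t2 = parse("   6:      25740086      823682752  " + tw);
        System.out.println("TimeWindow growth T1 -> T2: " + instanceGrowth(t1, t2, tw));
        System.out.println("bytes per TimeWindow: " + t2.get(tw).bytesPerInstance());
    }
}
```

Applied to the rows above, TimeWindow instances grow by 25,190,107 between T1 and T2. InternalTimer, TimeWindow, and HashMap$Node all work out to 32 bytes per instance in every snapshot, so their large byte totals come from the sheer number of objects rather than from large objects.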

Thanks.


Re: Timer & Window Memory Consumption

Fabian Hueske-2
Hi,

TimeWindow objects and timers are created for each window, i.e., every 5 seconds for every distinct key that a task is processing.
Event-time windows are completed and cleaned up when a watermark passes the window end timestamp.
Therefore, depending on how far the watermarks lag behind, more than one window per key may be open at the same time.
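A toy model of this counting argument, under simplifying assumptions: one tumbling window per element, one timer per (key, window) pair with duplicate registrations collapsing into one, and a timer that fires once the watermark reaches the window's end - 1. WindowTimerModel is a hypothetical class, not Flink's implementation.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Toy model of per-key tumbling windows and their pending event-time timers.
final class WindowTimerModel {
    /** A pending timer identified by (key, window end); equal timers collapse in the set. */
    static final class PendingTimer {
        final String key;
        final long windowEnd;

        PendingTimer(String key, long windowEnd) {
            this.key = key;
            this.windowEnd = windowEnd;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof PendingTimer)) return false;
            PendingTimer t = (PendingTimer) o;
            return windowEnd == t.windowEnd && key.equals(t.key);
        }

        @Override public int hashCode() {
            return Objects.hash(key, windowEnd);
        }
    }

    private final long size;
    private final Set<PendingTimer> pending = new HashSet<>();

    WindowTimerModel(long size) {
        this.size = size;
    }

    /** Assigns the element to its single tumbling window and registers that window's timer. */
    void onElement(String key, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        pending.add(new PendingTimer(key, start + size));
    }

    /** Fires (removes) every timer whose time (window end - 1) is <= the watermark; returns how many fired. */
    int onWatermark(long watermark) {
        int before = pending.size();
        pending.removeIf(t -> t.windowEnd - 1 <= watermark);
        return before - pending.size();
    }

    int pendingTimers() {
        return pending.size();
    }

    public static void main(String[] args) {
        WindowTimerModel m = new WindowTimerModel(5_000L);
        // 3 keys, one element per key every second for 15 seconds of event time.
        for (long ts = 0; ts < 15_000; ts += 1_000) {
            for (String key : new String[] {"a", "b", "c"}) {
                m.onElement(key, ts);
            }
        }
        // The watermark lags: only the windows ending at 5000 are complete.
        System.out.println("fired: " + m.onWatermark(4_999L) + ", pending: " + m.pendingTimers());
    }
}
```

With 3 keys and 15 seconds of events, 45 elements produce only 9 timers, and while the watermark sits at 4,999 two windows per key are still open. In this model the number of pending timers scales with distinct keys times open windows, not with the element count.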

Hope this helps,
Fabian

(In reply to Navneeth Krishnan, 2018-01-21 6:48 GMT+01:00)



Re: Timer & Window Memory Consumption

Navneeth Krishnan
Thanks Fabian, but at 1.5k messages per second per TM, several million InternalTimer and TimeWindow objects are created within a period of 5 seconds. Is there a way to debug this issue?
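A quick sanity check on these numbers, assuming each element lands in exactly one tumbling window and adds at most one pending window timer. The bound below is the worst case in which every element opens a new (key, window) pair; the observed growth uses the InternalTimer rows from the T1 and T2 histograms in the first post. TimerGrowthCheck is a hypothetical helper.

```java
// Compares the worst-case number of new window timers per task manager in 5 seconds
// with the InternalTimer growth seen between the T1 and T2 histograms.
final class TimerGrowthCheck {
    /** Worst case: every element opens a new (key, window) pair, i.e. one new timer per element. */
    static long maxNewTimers(long msgsPerSecPerTm, long seconds) {
        return msgsPerSecPerTm * seconds;
    }

    /** Difference in instance count between two snapshots. */
    static long observedGrowth(long before, long after) {
        return after - before;
    }

    public static void main(String[] args) {
        long bound = maxNewTimers(1_500, 5);               // 7,500 elements in 5 s
        long growth = observedGrowth(570_832, 25_756_748); // InternalTimer, T1 -> T2
        System.out.printf("bound=%d observed=%d ratio=%d%n", bound, growth, growth / bound);
    }
}
```

This prints bound=7500 observed=25185916 ratio=3358. Even if the histograms also count short-lived, already-unreachable timer objects, a small constant number per element would not account for a factor in the thousands.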
 
Regards,
Navneeth

(In reply to Fabian Hueske, Tue, Jan 23, 2018 at 2:09 AM)




Re: Timer & Window Memory Consumption

Fabian Hueske-2
Aljoscha (in CC), do you have an idea about this issue?

Thanks,
Fabian

(In reply to Navneeth Krishnan, 2018-01-24 7:06 GMT+01:00)





Re: Timer & Window Memory Consumption

Aljoscha Krettek
You can connect to the TaskManagers with a tool such as jvisualvm to observe where the objects are created. It doesn't sound normal that there are millions of these objects if only a couple thousand elements come in.
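VisualVM's sampler and profiler show where objects are allocated. As a lighter, in-process complement, the HotSpot-specific com.sun.management.ThreadMXBean can report how many bytes the current thread allocates while running a suspect piece of code, for example the logic inside Aggregate. This sketch assumes a HotSpot JVM with thread allocation accounting enabled (the default there); AllocationProbe is a hypothetical helper.

```java
import java.lang.management.ManagementFactory;
import java.util.function.Supplier;

// Measures bytes allocated by the current thread while running a piece of code.
// Relies on the HotSpot-specific com.sun.management.ThreadMXBean extension.
final class AllocationProbe {
    private static final com.sun.management.ThreadMXBean THREADS =
            (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();

    /** Keeps the result reachable so the JIT cannot eliminate the allocation. */
    static volatile Object sink;

    static long bytesAllocatedBy(Supplier<?> work) {
        if (!THREADS.isThreadAllocatedMemorySupported() || !THREADS.isThreadAllocatedMemoryEnabled()) {
            throw new UnsupportedOperationException("thread allocation accounting unavailable");
        }
        long id = Thread.currentThread().getId();
        long before = THREADS.getThreadAllocatedBytes(id);
        sink = work.get();
        return THREADS.getThreadAllocatedBytes(id) - before;
    }

    public static void main(String[] args) {
        long bytes = bytesAllocatedBy(() -> new long[1_000_000]);
        System.out.println("allocated about " + bytes + " bytes");
    }
}
```

It measures how much is allocated, not where, so it is most useful for narrowing down a candidate before profiling it in detail.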

(In reply to Fabian Hueske, 25 Jan 2018, 14:59)