Memory Leak in ProcessingTimeSessionWindow


Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
All,

I have another slow memory leak, this time with a basic ProcessingTimeSessionWindows window (the earlier one was GlobalWindow-related, and Fabian helped clarify it).

I have a very simple data pipeline:

DataStream<PlatformEvent> processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

Initially I didn't even have ProcessingTimePurgeTrigger; the pipeline used the default trigger. In an effort to fix this issue, I created my own trigger based on the default ProcessingTimeTrigger, with a simple override of the onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of hours, but I am not certain). I still see heap utilization creep up slowly until GC starts to take too long, and then comes the dreaded OOM.
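
For readers unfamiliar with the two trigger results, here is a toy model in plain Java (not Flink code; the class, enum, and method names are hypothetical). It mirrors the documented difference: FIRE evaluates the window and keeps its contents, while FIRE_AND_PURGE evaluates it and clears the contents. Purging only concerns the buffered elements; it does not by itself say anything about per-key timers or window metadata, which may be one reason the change made little difference here.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy window buffer contrasting FIRE with FIRE_AND_PURGE (a model, not Flink code). */
class PurgeSketch {

    /** Hypothetical stand-in for the two trigger results discussed above. */
    enum Result { FIRE, FIRE_AND_PURGE }

    private final List<String> contents = new ArrayList<>();

    void add(String element) {
        contents.add(element);
    }

    /** Emits a copy of the buffered elements; FIRE_AND_PURGE also clears the buffer. */
    List<String> onTimer(Result result) {
        List<String> emitted = new ArrayList<>(contents);
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear();
        }
        return emitted;
    }

    int buffered() {
        return contents.size();
    }
}
```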

For completeness, here is my window function (still using the old function interface). It creates a few metrics for reporting and applies its logic by looping over the Iterable. No state is explicitly kept in this function; I needed a RichWindowFunction basically just to register metrics.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IPSLAMetricWindowFn extends RichWindowFunction<NumericFactTuple, BasicFactTuple, String, TimeWindow> {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    // Throughput meters; registered in open(), so they are not part of the serialized function.
    private transient Meter in;
    private transient Meter out;
    private transient Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable<NumericFactTuple> events, Collector<BasicFactTuple> collector) throws Exception {
        // Body omitted in the original post.
    }
}


I would appreciate any pointers on what could be causing the leak here. The pipeline seems pretty straightforward.

Thanks, Ashish


Re: Memory Leak in ProcessingTimeSessionWindow

Stefan Richter
Hi,

Can you take a heap dump from a JVM that runs into the problem and share it with us? That would make finding the cause a lot easier.

Best,
Stefan

(In reply to Ashish's message of June 15, 2018, 23:01.)

Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
Right, that's where I am headed now, but I was wondering whether there are any "gotchas" I am missing before I dig into a few gigabytes of heap dump.


Thanks, Ashish


(In reply to Stefan Richter's message of Monday, June 18, 2018, 3:37 AM.)

Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
All, 

I took a few heap dumps with jmap (one when the app restarted and then at 2-hour intervals); they are 5 GB to 8 GB each. Comparing them, I can see that the counts of data tuples (basically instances of the object that is maintained as state) go up slowly.

The only things I could relate that to were streaming.api.operators.InternalTimer and streaming.api.windowing.windows.TimeWindow, both of which were trending up as well. Far more windows are certainly created than the increments I could see, but those object counts are nevertheless trending up. The input stream has a very consistent sine-wave throughput, so it really doesn't make sense for windows and tuples to keep trending up. There is also no event storm or anything of that sort (i.e., the source stream has been very steady as far as throughput is concerned).

Here is a plot of heap utilization:

[Plot: heap utilization over time; attached as 1529454480422blob.jpg]

It has the typical sine-wave pattern, which is expected because the input stream has the same pattern, but the source does not trend upwards the way the heap utilization in the plot does. The screenshot shows a rise from 60% to 80% utilization, and the trend keeps going up until an issue occurs that resets the app.

Since processing is based on ProcessingTime, I really would have expected memory to reach a steady state and remain sort of flat from a trending perspective. 

Appreciate any pointers anyone might have.

Thanks, Ashish

(In reply to Ashish's message of Monday, June 18, 2018, 12:54:03 PM EDT.)



Attachment: 1529454480422blob.jpg (82K)

Re: Memory Leak in ProcessingTimeSessionWindow

Stefan Richter
Hi,

It is possible that the number of processing-time timers grows, because internal timers are scoped by time, key, and namespace (typically the namespace is the "window", because each key can be part of multiple windows). So if the number of keys in your application is steadily growing, this can happen.
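
As a rough illustration of that scoping, here is a toy model in plain Java (not Flink's actual timer service; all class and method names are hypothetical). Timers are stored as (timestamp, key, namespace) triples, so the number of pending timers tracks the number of live key/window combinations and only drops again once timers fire and are removed.

```java
import java.util.Comparator;
import java.util.TreeSet;

/** Toy timer registry: one entry per (timestamp, key, namespace) triple. */
class TimerScopeSketch {

    private static final class TimerEntry {
        final long timestamp;
        final String key;
        final String namespace;

        TimerEntry(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    // Ordered by firing time first, so the earliest timer is always at the head.
    private final TreeSet<TimerEntry> timers = new TreeSet<>(
            Comparator.comparingLong((TimerEntry t) -> t.timestamp)
                    .thenComparing(t -> t.key)
                    .thenComparing(t -> t.namespace));

    /** Registers a timer; an identical (timestamp, key, namespace) triple is stored only once. */
    void register(long timestamp, String key, String namespace) {
        timers.add(new TimerEntry(timestamp, key, namespace));
    }

    /** Removes every timer due at or before 'now' and returns how many fired. */
    int fireUpTo(long now) {
        int fired = 0;
        while (!timers.isEmpty() && timers.first().timestamp <= now) {
            timers.pollFirst();
            fired++;
        }
        return fired;
    }

    int pending() {
        return timers.size();
    }
}
```

With a steadily growing key space, `register` keeps being called with fresh keys, so `pending()` in this model grows unless `fireUpTo` removes entries at least as fast.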

To analyse the heap dump, I usually take the following approach:
- Obviously, include only reachable objects. If the dumps are very big, try to limit their size, or trigger the OOM earlier by configuring a lower heap size. That should still show the problematic object accumulation, if there is one.
- Look at the statistics of "heavy hitter" classes, i.e. classes whose instances contribute the most to overall heap consumption. Sometimes this will show classes that are also part of classes that rank higher up, e.g. first place could be String and second place char[]. But you can figure that out in the next step.
- Explore the instances of the top heavy-hitter class(es). If there is a leak and you just randomly sample some objects, the likelihood is usually *very* high that you catch an object that is part of the leak (as determined in the next step). Otherwise just repeat and sample another object.
- Inspect the object instance and follow the reference links to the parent objects in the object graph that hold a reference to the leak candidate. You will typically end up in some array where the leak accumulates. Inspect the object holding references to the leaking objects. You can see the field values, and this can help determine whether the collection of objects is justified or whether data is actually leaking. So in your case, you can start from some InternalTimer or Window object and walk backwards through the reference chain to see what class is holding onto them and why (e.g. should they already be gone with respect to their timestamp). Walking through the references should be supported by all major heap analysis tools, including JVisualVM, which comes with your JDK. You can also use OQL[1] to query for timers or windows that should already be gone.
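
For the first step, one way to capture only reachable objects is to request a "live" dump. The sketch below uses the JDK's `HotSpotDiagnosticMXBean.dumpHeap(file, live)`, which is available on HotSpot-based JVMs; the class name `LiveHeapDump` and the default output path are placeholders. It dumps the JVM it runs in, so for a running TaskManager the usual route from outside the process would be `jmap -dump:live,format=b,file=<path> <pid>`.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;

/** Writes a heap dump of the current JVM that contains only reachable objects. */
class LiveHeapDump {

    static File dump(String path) {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        try {
            // live = true restricts the dump to reachable objects.
            // The target file must not exist yet; recent JDKs also expect a .hprof suffix.
            bean.dumpHeap(path, true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new File(path);
    }

    public static void main(String[] args) {
        File out = dump(args.length > 0 ? args[0] : "heap-live.hprof");
        System.out.println("Wrote " + out.getAbsolutePath() + " (" + out.length() + " bytes)");
    }
}
```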

Overall I think it could at least be helpful to see the statistics for heavy hitter classes and screenshots of representative reference chains to objects to figure out the problem cause. If it is not possible to share heap dumps, unfortunately I think giving you this strategy is currently the best I can offer to help. 

Best,
Stefan



(In reply to Ashish's message of June 20, 2018, 02:33.)

Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
Hi Stefan, 

Thanks for outlining the steps; they are similar to what we have been doing for OOM issues.

However, I was looking for something more high-level: whether state/key handling needs some specific cleanup that is not done by default. I am about 99% sure (nothing is certain :)) that if I switch this app to a lower-level API like Process Function and manage my own state and timers, I will not see this issue. When I had the same issue in the past, it was with a Global Window, and Fabian pointed out that new keys were constantly being created. I built a simple Process Function for that and the issue went away. I think your first statement hints at that as well, so let me home in on it.

I am processing time-series data for network elements. Keys are the 10-minute floor of the event time concatenated with the element ID. The idea was to create 10-minute buckets of data, with windows that start with the first event in a bucket and fire when no events arrive for 12 or so minutes. So new keys are definitely being created. So:

1- Am I constantly adding to memory by doing that? It sounds like it, based on your comments.
2- If so, what is the way to clear those keys when windows fire, if any?
3- It seems like a very simple use case, so now I am wondering whether I am even using the right high-level API.
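
To make question 1 concrete, here is a minimal sketch in plain Java (no Flink dependency) of the key scheme described above: the 10-minute floor of the event time concatenated with the element ID. The class and method names and the `|` separator are hypothetical; the real job presumably builds its key differently. It only illustrates that every new 10-minute bucket yields a fresh key per element, so the set of distinct keys keeps growing as time passes.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the key scheme: 10-minute floor of event time plus element ID. */
class BucketKeySketch {

    static final long BUCKET_MILLIS = 10 * 60 * 1000L; // 10 minutes

    /** Floors the event time to its 10-minute bucket and appends the element ID. */
    static String keyFor(long eventTimeMillis, String elementId) {
        long bucketStart = Math.floorDiv(eventTimeMillis, BUCKET_MILLIS) * BUCKET_MILLIS;
        return bucketStart + "|" + elementId;
    }

    /** Counts the distinct keys produced when each element reports once per minute. */
    static int distinctKeys(int elements, int minutes) {
        Set<String> keys = new HashSet<>();
        for (int minute = 0; minute < minutes; minute++) {
            for (int e = 0; e < elements; e++) {
                keys.add(keyFor(minute * 60_000L, "element-" + e));
            }
        }
        return keys.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctKeys(3, 60));  // one hour of data
        System.out.println(distinctKeys(3, 120)); // two hours of data
    }
}
```

In this model, three elements produce 18 distinct keys after one hour and 36 after two, so anything retained per key (state, timers, window metadata) has to be released explicitly or it accumulates.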

Thanks, Ashish


(In reply to Stefan Richter's message of Wednesday, June 20, 2018, 4:17 AM.)

Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
Stefan, All, 

If there are no further thoughts on this, I am going to switch my app to the low-level Process API. I still think there is an easier solution here that I am missing, but I will revisit that after I fix the production issue.
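
A rough sketch of what managing state and timers by hand could look like, modeled in plain Java rather than with Flink's ProcessFunction API (all names are hypothetical, and a real job would use keyed state and registered timers instead of a HashMap). It buffers events per key, and when a key has been idle for the gap it emits the buffer and removes the key's state entirely, which is the cleanup step this thread is about.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of per-key buffering with an inactivity gap and explicit cleanup. */
class ManualSessionSketch {

    private static final class Session {
        final List<String> events = new ArrayList<>();
        long lastSeen;
    }

    private final long gapMillis;
    private final Map<String, Session> sessions = new HashMap<>();

    ManualSessionSketch(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Buffers an event and pushes the key's inactivity deadline forward. */
    void onEvent(String key, String event, long now) {
        Session s = sessions.computeIfAbsent(key, k -> new Session());
        s.events.add(event);
        s.lastSeen = now;
    }

    /** Emits every session idle for at least the gap and drops its state entirely. */
    Map<String, List<String>> onTimer(long now) {
        Map<String, List<String>> fired = new HashMap<>();
        Iterator<Map.Entry<String, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> entry = it.next();
            if (now - entry.getValue().lastSeen >= gapMillis) {
                fired.put(entry.getKey(), entry.getValue().events);
                it.remove(); // the key and its buffer are gone once the session fires
            }
        }
        return fired;
    }

    int liveKeys() {
        return sessions.size();
    }
}
```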

Thanks, Ashish


Sent from Yahoo Mail for iPhone

On Thursday, June 21, 2018, 7:28 AM, ashish pok <[hidden email]> wrote:

Hi Stefan, 

Thanks for outlining the steps and are similar to what we have been doing for OOM issues.

However, I was looking for something more high-level: whether state / key handling needs some sort of cleanup that is not done by default. I am about 99% (nothing is certain :)) sure that if I switch this app to a lower-level API like Process Function and manage my own state and timers, I will not see this issue. When I had the same issue in the past, it was for Global Window, and Fabian pointed out that new keys were constantly being created. I built a simple Process Function for that and the issue went away. I think your first statement sort of hints at that as well, so let me home in on that. I am processing time series data for network elements. Keys are the 10-minute floor of event time concatenated with the element ID. The idea was to create 10-minute buckets of data with windows that start with the first event in a bucket and fire when no events arrive for 12 or so minutes. So new keys are definitely being created. So,

1- Am I adding to the memory constantly by doing that? Sounds like it, based on your comments.
2- If so, what is the way to clear those keys when windows fire, if any?
3- It seems like a very simple use case, so now I am wondering if I am even using the right high-level API?
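
To make question 1 concrete, here is a minimal sketch in plain Java (not Flink code) of the key scheme described above. The class name, the `element-N` IDs, and the one-event-per-minute rate are assumptions for illustration only. It just counts how many distinct composite keys appear over time.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model, not Flink code: counts distinct keys of the form "<10-min floor>|<elementId>". */
final class BucketKeySketch {
    static final long BUCKET_MS = 10 * 60 * 1000L; // 10-minute bucket, as described above

    /** Floors the event time to its bucket and concatenates it with the element ID. */
    static String bucketKey(long eventTimeMs, String elementId) {
        long floor = Math.floorDiv(eventTimeMs, BUCKET_MS) * BUCKET_MS;
        return floor + "|" + elementId;
    }

    /** Distinct keys seen when each of `elements` elements reports once per minute for `minutes` minutes. */
    static int distinctKeys(int elements, int minutes) {
        Set<String> keys = new HashSet<>();
        for (int m = 0; m < minutes; m++) {
            for (int e = 0; e < elements; e++) {
                keys.add(bucketKey(m * 60_000L, "element-" + e)); // hypothetical element IDs
            }
        }
        return keys.size();
    }

    public static void main(String[] args) {
        System.out.println("keys after 1 hour: " + distinctKeys(100, 60));
        System.out.println("keys after 1 day:  " + distinctKeys(100, 1440));
    }
}
```

The count grows linearly with elapsed time even though the input rate is constant, so anything retained per key grows with it unless something removes old keys.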

Thanks, Ashish



On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter <[hidden email]> wrote:

Hi,

it is possible that the number of processing time timers can grow, because internal timers are scoped by time, key, and namespace (typically this means „window“, because each key can be part of multiple windows). So if the number of keys in your application is steadily growing this can happen. 
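
The scoping described above can be pictured with a small plain-Java model. This is only a sketch under the assumption that a timer is identified by the triple (time, key, namespace); `TimerScopeSketch` and `TimerId` are hypothetical names and not Flink's InternalTimer implementation.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Toy model, not Flink's timer service: timers are unique per (time, key, namespace). */
final class TimerScopeSketch {
    static final class TimerId {
        final long time;
        final String key;
        final String namespace; // typically the window

        TimerId(long time, String key, String namespace) {
            this.time = time;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TimerId)) return false;
            TimerId t = (TimerId) o;
            return time == t.time && key.equals(t.key) && namespace.equals(t.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(time, key, namespace);
        }
    }

    private final Set<TimerId> timers = new HashSet<>();

    /** Registering the same triple twice keeps a single timer. */
    void register(long time, String key, String namespace) {
        timers.add(new TimerId(time, key, namespace));
    }

    /** Removes the timer for exactly this triple, if present. */
    void delete(long time, String key, String namespace) {
        timers.remove(new TimerId(time, key, namespace));
    }

    int size() {
        return timers.size();
    }
}
```

In this model every new key (or new window for an existing key) adds an entry, and entries only go away when they are explicitly deleted or fired.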

To analyse the heap dump, I usually take the following approach:
- obviously include only reachable objects. If dumps are very big, try to limit the size or to trigger the OOM earlier by configuring a lower heap size. It should still give you the problematic object accumulation, if there is one.
- look at the statistics of „heavy hitter“ classes, i.e. classes for which the instances contribute the most to the overall heap consumption. Sometimes this will show you classes that are also part of classes that rank higher up, e.g. 1st place could be String, and second place char[]. But you can figure that out in the next step.
- explore the instances of the top heavy hitter class(es). If there is a leak and you just randomly sample some objects, the likelihood is usually *very* high that you catch an object that is part of the leak (as determined in the next step). Otherwise just repeat and sample another object.
- inspect the object instance and follow the reference links to the parent objects in the object graph that hold a reference to the leak object candidate. You will typically end up in some array where the leak accumulates. Inspect the object holding references to the leaking objects. You can see the field values, and this can help to determine if the collection of objects is justified or if data is actually leaking. So in your case, you can start from some InternalTimer or Window object and walk backwards through the reference chain to see what class is holding onto them and why (e.g. should they already be gone w.r.t. their timestamp). Walking through the references should be supported by all major heap analysis tools, including JVisualVM that comes with your JDK. You can also use OQL[1] to query for timers or windows that should already be gone.
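
On the first bullet (reachable objects only): besides jmap, a HotSpot JVM can write such a dump of itself through the HotSpotDiagnosticMXBean. The following is a minimal sketch, assuming a HotSpot-based JDK; the class and method names are hypothetical, and it dumps only the JVM it runs in, so for a TaskManager it would have to run inside that process.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: dump the current JVM's heap, restricted to live (reachable) objects. */
final class HeapDumpSketch {
    /** Writes live.hprof into a fresh temp directory and returns its path. */
    static Path dumpLiveHeapToTempFile() {
        try {
            // The target file must not exist yet, hence the fresh directory.
            Path file = Files.createTempDirectory("heapdump").resolve("live.hprof");
            HotSpotDiagnosticMXBean bean =
                    ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
            bean.dumpHeap(file.toString(), true); // true = only reachable objects
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("heap dump written to " + dumpLiveHeapToTempFile());
    }
}
```

The resulting .hprof file can then be opened in the same heap analysis tools mentioned above.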

Overall I think it could at least be helpful to see the statistics for heavy hitter classes and screenshots of representative reference chains to objects to figure out the problem cause. If it is not possible to share heap dumps, unfortunately I think giving you this strategy is currently the best I can offer to help. 

Best,
Stefan



On 20.06.2018 at 02:33, ashish pok <[hidden email]> wrote:

All, 

I took a few heap dumps (when the app restarts and at 2-hour intervals) using jmap; they are 5GB to 8GB. I compared them, and what I can see is that the counts of data tuples (basically instances of the object that is maintained as state) are going up slowly.

The only things I could possibly relate that to were streaming.api.operators.InternalTimer and streaming.api.windowing.windows.TimeWindow, both of which were trending up as well. There are definitely a lot more windows created than the increments I could notice, but nevertheless those objects are trending up. The input stream has a very consistent sine-wave throughput, so it really doesn't make sense for windows and tuples to keep trending up. There is also no event storm or anything of that sort (i.e. the source stream has been very steady as far as throughput is concerned).

Here is a plot of heap utilization:

<1529454480422blob.jpg>

So it has a typical sine-wave pattern, which is definitely expected as the input stream has the same pattern, but the source doesn't have an upward trend like the heap utilization shown above. The screenshot above shows a spike from 60% utilization to 80%, and the trend keeps going up until an issue occurs that resets the app.

Since processing is based on ProcessingTime, I really would have expected memory to reach a steady state and remain sort of flat from a trending perspective. 

Appreciate any pointers anyone might have.

Thanks, Ashish

On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok <[hidden email]> wrote:


Right, that's where I am headed now, but I was wondering if there are any “gotchas” I am missing before I try to dig into a few gigs of heap dump.


Thanks, Ashish

On Monday, June 18, 2018, 3:37 AM, Stefan Richter <[hidden email]> wrote:

Hi,

can you take a heap dump from a JVM that runs into the problem and share it with us? That would make finding the cause a lot easier.

Best,
Stefan


Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
All,

I have been doing a little digging on this, and to Stefan's point, the growing memory (not necessarily a leak) was essentially caused by a growing number of keys, as I was using time buckets concatenated with the actual key to make unique sessions.

Taking a couple of steps back, the use case is a very simple tumbling window of 15 mins by key. The stream can be viewed simply as:

    <timestamp>|<key>|<value>

We have a few pipelines of this type, and one catch is that we wanted to create an app that can process both historical and current data. Historical data arrives mainly because users make ad hoc requests for "backfill". To keep the processing pipeline easy to manage, we make no distinction between historical and current data, as processing is based on event time.

1) Of course, the easiest way to solve this problem is to create a TumblingWindow of 15 mins with some allowed lateness. One issue here was that watermarks move forward and backfill data then appears to be viewed as late-arriving data, which is correct behavior from Flink's perspective but seems to cause issues in how we are trying to handle streams.

2) Another issue is that our data collectors are highly distributed - we regularly get data from later event time buckets faster than from older buckets. It is also more consistent to create the 15-min buckets using the concept of a Session instead. So I am creating a key of the form <timestamp_floor_15mins>|<key> with a session gap of, say, 10 mins. This works perfectly from a business logic perspective. However, I am now introducing quite a lot of keys, which, based on my heap dumps, seem to be hanging around and causing memory issues.

3) We converted the apps to a Process function and manage all state using the key defined in step (2), registering/unregistering timeouts.
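
The idea behind (3) can be sketched in plain Java. This is a toy model under stated assumptions, not our actual code and not Flink's ProcessFunction API: the class name, the string keys, and summing values as the "aggregation" are all hypothetical. The point is that each key carries a buffer plus an inactivity timeout, and both are dropped when the timeout fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model, not Flink code: per-key buffer with an inactivity timeout and explicit cleanup. */
final class SessionStateSketch {
    private static final class Session {
        final List<Double> values = new ArrayList<>();
        long timeoutAt;
    }

    private final long gapMs;
    private final Map<String, Session> sessions = new HashMap<>();

    SessionStateSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Buffers a value and moves the key's timeout forward (the unregister/register step). */
    void onElement(String key, double value, long nowMs) {
        Session s = sessions.computeIfAbsent(key, k -> new Session());
        s.values.add(value);
        s.timeoutAt = nowMs + gapMs;
    }

    /** Emits "key=sum" for each session whose timeout has passed and removes all state for that key. */
    List<String> onTime(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> e = it.next();
            if (e.getValue().timeoutAt <= nowMs) {
                double sum = 0;
                for (double v : e.getValue().values) {
                    sum += v;
                }
                out.add(e.getKey() + "=" + sum);
                it.remove(); // nothing about this key is retained after firing
            }
        }
        return out;
    }

    int liveKeys() {
        return sessions.size();
    }
}
```

Because a key's entry is removed as soon as its session fires, the number of live keys in this model tracks the number of open sessions rather than the number of keys ever seen.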

Solution (3) seems to be working in a pretty stable manner from a memory perspective. However, it just feels like with so many high-level APIs available, we are not using them properly and keep reverting to the low-level Process APIs - in the last month we have migrated about 5 or 6 apps to Process now :)

For solution (2), it feels like any other Session aggregation use case will have the issue of keys hanging around (e.g. for click streams with user sessions etc.). Isn't there a way to clear those session windows? Sorry, I just feel like we are missing something simple and have been reverting to low-level APIs instead.

Thanks,



Re: Memory Leak in ProcessingTimeSessionWindow

Ashish Pokharel
One more attempt to get some feedback on this. It basically boils down to using the high-level Window API in scenarios where keys are unbounded / infinite but can be expired after a certain time. From what we have observed (solution 2 in my previous message), some properties of keys remain in state (I am guessing the key itself, watermarks, etc.). Is there any way to clean these up, as the FIRE_AND_PURGE trigger doesn’t seem to do it? I am of the opinion that even if we end up using HDFS- or RocksDB-backed state, we would still want to clean those up. Any suggestions on this before we start rewriting our apps to use the low-level Process APIs in general?

Thanks, Ashish


Re: Memory Leak in ProcessingTimeSessionWindow

Stefan Richter
Hi,

for most windows, all state is cleared through FIRE_AND_PURGE, except for windows that are subtypes of merging windows, such as session windows. There, the state still remembers the window itself until the watermark passes the session timeout + allowed lateness. This is done so that elements that fall into the window after firing can still resurrect the window’s information; see WindowOperator.clearAllState(). Only after that is all state from the session window removed. Looping in Aljoscha, who might have more ideas about the best way to implement your use case.
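
As a rough illustration of the retention rule above, here is a simplified plain-Java model. It is not the WindowOperator code; the class and method names are hypothetical, and it ignores details such as how the window's maximum timestamp is defined.

```java
/** Simplified model of the retention rule described above; not Flink's implementation. */
final class SessionCleanupSketch {
    /** A session ends one gap after its last element. */
    static long sessionEnd(long lastElementTs, long gapMs) {
        return lastElementTs + gapMs;
    }

    /** Time after which the window's bookkeeping can be dropped; saturates on overflow. */
    static long cleanupTime(long sessionEnd, long allowedLatenessMs) {
        long t = sessionEnd + allowedLatenessMs;
        return t >= sessionEnd ? t : Long.MAX_VALUE;
    }

    /** True while the clock or watermark has not yet passed the cleanup time. */
    static boolean stillRetained(long currentTime, long sessionEnd, long allowedLatenessMs) {
        return currentTime < cleanupTime(sessionEnd, allowedLatenessMs);
    }
}
```

In this model a purged session window is still remembered for the whole interval between firing and the cleanup time, so with many short-lived keys the bookkeeping for all sessions inside that interval is alive at once.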

Best,
Stefan
