CEP issue

CEP issue

Vishal Santoshi
This is a pretty simple pattern: I have put in barely 1500 elements (across 600 keys at the most).
The range is fairly wide, though, since I am looking at a relaxed pattern (something like 40 true conditions in 6 hours).
Even so, I get the error below. I have event time turned on.


java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:136)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
.
.
.

Has anyone seen this issue?


Re: CEP issue

Vishal Santoshi
The watermark has not advanced far enough for this pattern to succeed (or otherwise). The issue, though, is that this happens very early in the pipeline (within about a minute). I am replaying from a Kafka topic, but the keyed operator has emitted no more than 1500-odd elements to SelectCEPOperator (clearly visible in the UI), so I am sure that not enough elements have been added to the SharedBuffer to create memory stress.

The nature of the input stream is that events are pushed out with a specific timestamp (it is a time series, and the timestamp is the beginning of the time slot), so a bunch of elements share a constant timestamp until the next batch appears.

A batch, though, has no more elements than the number of keys (600).


Re: CEP issue

Vishal Santoshi
It happens when the code is about to throw an exception and calls sharedBuffer.toString(), because of this check:


int id = sharedBuffer.entryId;
Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
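
One detail about the check quoted above: Java evaluates argument expressions before the method is called, so the string concatenation (and with it toString()) runs on every call, whether or not the check fails. The following is a minimal sketch of that behavior. The class EagerMessageDemo, the Noisy type, and both helper methods are hypothetical stand-ins written for this illustration; they are not Flink's Preconditions class.

```java
import java.util.function.Supplier;

class EagerMessageDemo {

    /** Hypothetical object that counts how often toString() is invoked. */
    static class Noisy {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "Noisy(imagine a very large dump here)";
        }
    }

    /** Stand-in for a checkState-style helper that takes a ready-made message. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Variant that only builds the message if the check actually fails. */
    static void checkStateLazy(boolean condition, Supplier<String> message) {
        if (!condition) {
            throw new IllegalStateException(message.get());
        }
    }

    /** The check passes, yet the concatenation has already called toString(). */
    static int eagerCalls() {
        Noisy entry = new Noisy();
        checkState(true, "Could not find id for entry: " + entry);
        return entry.toStringCalls;
    }

    /** The check passes and the supplier is never invoked. */
    static int lazyCalls() {
        Noisy entry = new Noisy();
        checkStateLazy(true, () -> "Could not find id for entry: " + entry);
        return entry.toStringCalls;
    }

    public static void main(String[] args) {
        System.out.println("toString() calls, eager message: " + eagerCalls());
        System.out.println("toString() calls, lazy message:  " + lazyCalls());
    }
}
```

If toString() is expensive, deferring the message construction as in checkStateLazy keeps that cost off the path where the check passes.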



Re: CEP issue

Vishal Santoshi
A new one 

java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:136)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
	at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
.
.
.
It is the toString() on SharedBuffer, no doubt. Some recursive loop?
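
The alternating SharedBufferEntry.toString / SharedBufferEdge.toString frames in the trace suggest two toString() methods calling each other. The sketch below shows how that shape can produce a huge string from very few objects even without a true cycle: when entries are shared between paths, every path prints the shared entry again. Entry, Edge, and buildChain are hypothetical types made up for this illustration; whether Flink's SharedBuffer behaves this way is not confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

class RecursiveToStringDemo {

    /** Hypothetical entry: printing it prints all of its outgoing edges. */
    static class Entry {
        final String name;
        final List<Edge> edges = new ArrayList<>();

        Entry(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(name).append('[');
            for (Edge edge : edges) {
                sb.append(edge); // calls Edge.toString()
            }
            return sb.append(']').toString();
        }
    }

    /** Hypothetical edge: printing it prints the whole target entry. */
    static class Edge {
        final Entry target;

        Edge(Entry target) {
            this.target = target;
        }

        @Override
        public String toString() {
            return "->" + target; // calls Entry.toString()
        }
    }

    /** Builds `depth` entries in a chain; each entry has two edges to the next one. */
    static Entry buildChain(int depth) {
        Entry next = new Entry("e"); // last entry, no edges
        for (int i = 1; i < depth; i++) {
            Entry current = new Entry("e");
            current.edges.add(new Edge(next));
            current.edges.add(new Edge(next)); // second path to the same entry
            next = current;
        }
        return next;
    }

    /** Length of the printed form of a chain with the given number of entries. */
    static int printedLength(int depth) {
        return buildChain(depth).toString().length();
    }

    public static void main(String[] args) {
        // The number of objects grows linearly, the printed length roughly doubles per level.
        for (int depth = 5; depth <= 20; depth += 5) {
            System.out.println(depth + " entries -> " + printedLength(depth) + " chars");
        }
    }
}
```

In this toy structure each extra level roughly doubles the output, so a heap-space error inside StringBuilder can occur long before the object graph itself is large.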



Re: CEP issue

Vishal Santoshi
I have the Flink master CEP library code imported into a 1.4 build.


Re: CEP issue

Dawid Wysakowicz
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the Flink master CEP library, and the runtime (the cluster) is configured to work against the latest and greatest. This does not happen with smaller-range patterns (3 out of 5, 1 out of 3, etc.) but is always an issue with a larger range (20 out of 25 with a range of 8 hours). Does that make sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>

Re: CEP issue

Vishal Santoshi
This is the pattern. Will create a test case. 
/**
 * @param condition a single condition applied as the acceptance criterion
 * @param params    defines the bounds of the pattern
 * @param <U>       the element type in the stream
 * @return the compiled pattern along with the params
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
        SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    // The first matching element starts the pattern.
    Pattern<U, ?> pattern = Pattern
            .<U>begin(START)
            .where(condition);
    // The remaining (elementCount - 1) matches may follow with gaps in between.
    if (params.elementCount > 1) {
        pattern = pattern
                .followedBy(REST)
                .where(condition)
                .times(params.elementCount - 1);
    }

    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId);
}
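
For the test case, the intent of the pattern above can be approximated in plain Java without Flink: at least elementCount matching events whose timestamps span less than the window (seriesLength * period.duration minutes). This is only a sketch for producing expected results to compare against. The class and method names are hypothetical, and treating the window bound as strictly exclusive is an assumption, not confirmed Flink CEP behavior.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RelaxedWindowCheck {

    /**
     * Returns true if at least elementCount of the given timestamps fall into
     * a span shorter than windowMinutes.
     *
     * @param matchingTimestamps timestamps (in minutes, ascending) of events
     *                           that already satisfied the condition
     */
    static boolean matches(long[] matchingTimestamps, int elementCount, long windowMinutes) {
        if (elementCount <= 0 || windowMinutes <= 0) {
            throw new IllegalArgumentException("elementCount and windowMinutes must be positive");
        }
        Deque<Long> window = new ArrayDeque<>();
        for (long ts : matchingTimestamps) {
            window.addLast(ts);
            // Drop events that are too old relative to the newest one.
            // Assumption: an event exactly windowMinutes old no longer counts.
            while (ts - window.peekFirst() >= windowMinutes) {
                window.removeFirst();
            }
            if (window.size() >= elementCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 10, 20, 200, 210};
        System.out.println(matches(timestamps, 3, 30)); // three events within 30 minutes
        System.out.println(matches(timestamps, 4, 30)); // never four within 30 minutes
    }
}
```

Non-matching events are simply left out of the input array, which mirrors the relaxed contiguity of followedBy: gaps between matches are allowed as long as the whole sequence stays inside the window.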




Re: CEP issue

Kostas Kloudas
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas

On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <[hidden email]> wrote:

This is the pattern. Will create a test case. 
/**
*
* @param condition a single condition is applied as a acceptance criteria
* @param params defining the bounds of the pattern.
* @param <U> the element in the stream
* @return compiled pattern alonf with the params.
*/
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(SimpleCondition<U> condition,
RelaxedContiguityWithinTime params,
RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
String patternId) {
assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
Pattern<U, ?> pattern = Pattern.
<U>begin(START).
where(condition);
if (params.elementCount > 1) pattern = pattern.
followedBy(REST).
where(condition).
times(params.elementCount - 1);

return new RelaxedContiguousPattern<U>(
pattern.within(Time.minutes(params.seriesLength * params.period.duration))
,params,
params.elementCount > 1,
params.period.duration,
mapFunc,
patternId
);
}



On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <[hidden email]> wrote:
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the flink master cep library and the runtime ( the cluster ) is configured to work against the latest and greatest. This does not happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is always an issue when it is a larger range ( 20 out of 25 with range of 8 hours ) . Does that makes sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>
> > On 1 Feb 2018, at 23:34, Vishal Santoshi <[hidden email]> wrote:
> >
> > I have flink master CEP library code imported to  a 1.4 build.
> >
> > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <[hidden email]> wrote:
> > A new one
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(
> > Arrays.java:3332)
> >       at java.lang.
> > AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:
> > 124)
> >       at java.lang.
> > AbstractStringBuilder.append(AbstractStringBuilder.java:
> > 448)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4106)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4151)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEntry.toString(
> > SharedBuffer.java:624)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:
> > 673)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4097)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4151)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEntry.toString(
> > SharedBuffer.java:624)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > .
> > .
> > .
> > It is the toString() on
> > SharedBuffer
> > no doubt. Some recursive loop ?
> >
> >
> > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <[hidden email]> wrote:
> > It happens when it looks to throw an exception and calls sharedBuffer.toString(), because of this check:
> >
> >
> > int id = sharedBuffer.entryId;
> > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
> >
> >
> > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <[hidden email]> wrote:
> > The watermark has not moved for this pattern to succeed ( or otherwise ). The issue, though, is that this happens pretty early in the pipe ( like within a minute ). I am replaying from a Kafka topic, but the keyed operator has emitted no more than 1500-plus elements to SelectCEPOperator ( very visible on the UI ), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.
> >
> > The nature of the input stream is that events are pushed out with a specific timestamp ( it is a time series and the timestamp is the beginning of the time slot ), so one will have a bunch of elements that share a constant timestamp until the next batch appears.
> >
> > A batch, though, does not have more elements than the number of keys ( 600 ).
> >
> > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <[hidden email]> wrote:
> > This is a pretty simple pattern, in that I have hardly 1500 elements ( across 600 keys at the max ) put in,
> > and though I have a pretty wide range, in that I am looking at a relaxed pattern ( like 40 true conditions in 6 hours ),
> > I get this. I have EventTime turned on.
> >
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(Arrays.java:3332)
> >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> > .
> > .
> > .
> >
> > Has anyone seen this issue?

Re: CEP issue

Vishal Santoshi
We could not recreate the issue in a controlled setup, but here are a few notes that we have gathered on a simple "times(n), within(..)" pattern.

In the case where the Event does not create a Final or Stop state:

* As the NFA processes an Event, it mutates only if the Event is a true Event ( one that satisfies the condition ). Each computation is a counter that keeps track of a partial match, and each true Event is added to the already existing partial match of that computation unit. Essentially, for n Events where each Event is true, there will be roughly n-1 computations, each starting at one of the Events 1 to n-1 ( so the 1st will have n-1 events in its partial match, the 2nd n-2 events, and so on, and the (n-1)th has only the last event as its partial match ).

* If the watermark ( WM ) progresses beyond the timestamp of the 1st computation, that partial match is pruned.

* The code makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it reduces to 0 ( the internalRemove(), which uses a Stack ), which should happen as the WM keeps progressing to the nth element for unfulfilled patterns. A "null" ( not a fan ) event is used to establish WM progression.


In the case where there is a FinalState ( and we skipToFirstAfterLast ):

* The NFA will prune ( release ) all partial matches, prune the shared buffer, and emit the current match. The computations should now be empty.

There is a lot to it, but is that roughly what is done in that code?



A few questions:

* What we have seen is that the call to the toString() method of SharedBuffer is where the OOM occurs. There is no call to a logger in the code, so we are not sure why, or by whom, that method is called. Surely it is not part of the serialization/deserialization routine, or is it ( very surprising if it is )?
* There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to m matches within a range of ( n * time series slot ), which we do. This is fine, but it does not allow an optimization where, once enough false conditions have been seen, one can prune. Simply put, once more than n-m falses have been seen, there is no way that there will ever be m trues out of n, and thus the SharedBuffer can be pruned to the last true seen ( very akin to skipToFirstAfterLast ).
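
The counting argument behind that pruning idea can be sketched in plain Java, outside Flink. This is only an illustration, not something FlinkCEP provides: the class MOutOfN and its methods are hypothetical names, n is assumed to be the number of time series slots in the window and m the number of true conditions required. A window can be abandoned as soon as it has seen more than n - m false slots, because the remaining slots can no longer supply m trues.

```java
public class MOutOfN {

    /** True while a window of n slots that has seen `falses` misses can still collect m hits. */
    static boolean canStillMatch(int n, int m, int falses) {
        return falses <= n - m;
    }

    /**
     * Walks the slots of one window (index 0 is the slot that opened it) and returns the
     * index of the first slot at which the window can be abandoned, or -1 if it never can.
     */
    static int firstPrunableSlot(boolean[] slots, int n, int m) {
        int falses = 0;
        for (int i = 0; i < slots.length && i < n; i++) {
            if (!slots[i]) {
                falses++;
            }
            if (!canStillMatch(n, m, falses)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // "3 out of 5": after three falses only two trues are possible, so the window is dead.
        boolean[] hopeless = {true, false, false, false};
        // Three trues arrive with a single false in between, so the window is never pruned.
        boolean[] matching = {true, true, false, true};
        System.out.println("prune at slot: " + firstPrunableSlot(hopeless, 5, 3));
        System.out.println("prune at slot: " + firstPrunableSlot(matching, 5, 3));
    }
}
```

For the "20 out of 25" pattern mentioned in this thread, the same rule would give up on a window at its sixth false slot instead of holding its partial matches until the full time range expires.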

We will keep instrumenting the code ( which apart from the null message is easily understandable ) but would love to hear your feedback. 

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <[hidden email]> wrote:
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas


On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <[hidden email]> wrote:

This is the pattern. Will create a test case. 
/**
 *
 * @param condition a single condition is applied as an acceptance criterion
 * @param params defining the bounds of the pattern.
 * @param <U> the element in the stream
 * @return compiled pattern along with the params.
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    // The first true event opens the match.
    Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
    // The remaining elementCount - 1 true events may follow with gaps in between.
    if (params.elementCount > 1) {
        pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
    }

    // The whole match has to complete within seriesLength time slots.
    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId
    );
}



On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <[hidden email]> wrote:
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the Flink master CEP library, and the runtime ( the cluster ) is configured to work against the latest and greatest. This does not happen with smaller range patterns ( 3 out of 5, 1 of 3, etc. ) but is always an issue with a larger range ( 20 out of 25 with a range of 8 hours ). Does that make sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>

Re: CEP issue

Vishal Santoshi
Hello all, there were recent changes to the Flink master that I pulled in, and that seems to have solved our issue.

A few points:

* CEP is heavy: the NFA transition matrix kept as state, which can possibly be n^2 ( worst case ), can easily blow up space requirements. The after-match skip strategy is likely to play a crucial role in keeping the state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy. In our case we do not require partial matches within a match to contribute to another potential match ( noise for us ), and thus SKIP_PAST_LAST_EVENT was used, which on a match will prune the SharedBuffer ( almost reset it ).

* The argument that the pattern events should be lean holds even more strongly in CEP, due to the potential exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow up for you.
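
To make the effect of dropping partial matches on a match concrete, here is a toy model in plain Java. It is not Flink's NFA and uses no Flink API: it assumes that every event satisfies the condition, that a match is simply m consecutive events, and that each event both extends the existing partial matches and starts a new one. The names SkipStrategyToy and countMatches are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipStrategyToy {

    /**
     * Counts emitted matches for a run of `events` true events when a match needs m events.
     * With skipPastLastEvent set, every live partial match is dropped whenever a match is
     * emitted, mimicking (very loosely) what a skip-past-last-event strategy does.
     */
    static int countMatches(int events, int m, boolean skipPastLastEvent) {
        if (m < 2) {
            throw new IllegalArgumentException("m must be at least 2");
        }
        List<Integer> partial = new ArrayList<>(); // lengths of the live partial matches
        int matches = 0;
        for (int e = 0; e < events; e++) {
            List<Integer> next = new ArrayList<>();
            boolean matched = false;
            for (int length : partial) {
                if (length + 1 == m) {
                    matches++;          // this partial match is completed by the current event
                    matched = true;
                } else {
                    next.add(length + 1);
                }
            }
            next.add(1);                // the current event also starts a new partial match
            if (matched && skipPastLastEvent) {
                next.clear();           // drop everything that overlaps the emitted match
            }
            partial = next;
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println("no skip:              " + countMatches(10, 3, false));
        System.out.println("skip past last event: " + countMatches(10, 3, true));
    }
}
```

In this model, 10 events with m = 3 yield 8 overlapping matches without skipping and 3 when partial matches are dropped on each match. Real patterns with relaxed contiguity can keep far more combinations alive than this contiguous toy does, which is where the state growth described above comes from.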

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.
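
One plausible reading of the first trace in this thread: it goes from SharedBufferSerializer.serialize straight into StringBuilder.append and SharedBufferEntry.toString(), which is what Java does when a message such as "Could not find id for entry: " + sharedBuffer is built at the call site of a check. String concatenation in an argument is evaluated before the method is entered, so the message is built even when the check passes. The sketch below illustrates this in plain Java; EagerMessageDemo, Entry, buildLayered and the local checkState are hypothetical stand-ins, not Flink classes, and the layered graph is only an assumed shape in which entries share predecessors.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerMessageDemo {

    static int toStringCalls = 0;

    /** Toy stand-in for a buffer entry: prints itself and, recursively, every predecessor. */
    static final class Entry {
        final String name;
        final List<Entry> predecessors = new ArrayList<>();

        Entry(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            toStringCalls++;
            StringBuilder sb = new StringBuilder(name).append('[');
            for (Entry p : predecessors) {
                sb.append(p); // append(Object) calls String.valueOf(p), i.e. p.toString()
            }
            return sb.append(']').toString();
        }
    }

    /** Hypothetical stand-in for a precondition helper that takes a ready-made message. */
    static void checkState(boolean condition, Object message) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(message));
        }
    }

    /** Builds `depth` layers of two entries; each entry points at both entries of the previous layer. */
    static Entry buildLayered(int depth) {
        Entry a = new Entry("a0");
        Entry b = new Entry("b0");
        for (int i = 1; i < depth; i++) {
            Entry nextA = new Entry("a" + i);
            Entry nextB = new Entry("b" + i);
            nextA.predecessors.add(a);
            nextA.predecessors.add(b);
            nextB.predecessors.add(a);
            nextB.predecessors.add(b);
            a = nextA;
            b = nextB;
        }
        return a;
    }

    public static void main(String[] args) {
        Entry head = buildLayered(16);
        toStringCalls = 0;
        // The condition holds, yet the argument is concatenated before checkState runs.
        checkState(true, "Could not find id for entry: " + head);
        System.out.println("toString() calls although the check passed: " + toStringCalls);
    }
}
```

With depth 16 there are only 32 entries, yet the passing check triggers 65535 toString() calls, because every shared predecessor is printed once per path that reaches it. Handing the object itself to the check and formatting it only on failure avoids building the message at all.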




Re: CEP issue

Kostas Kloudas
Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when 
specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink.
We already have some ideas, but still, as you also said, the bulk of the space consumption
comes from the pattern definition. So it would be nice if more people did the same, i.e. shared
their experience and, why not, compiled a guide of things to avoid and put it alongside the rest
of the FlinkCEP documentation.

What do you think?

Kostas



On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <[hidden email]> wrote:

Hello all,  There were recent changes to the flink master that I pulled in and that seems to have solved our issue.  

Few points 

* CEP is heavy as the NFA  transition  matrix   as state which can be  possibly  n^2 ( worst case )  can easily blow up space requirements.  The after match skip strategy is likely to play a crucial role in keeping the state lean https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy.  In our case we do not require partial matches within a match to contribute to another potential match ( noise for us )  and thus SKIP_PAST_LAST_EVENT was used which on match will prune the SharedBuffer ( almost reset it ) 

* The argument that the pattern events should be lean holds much more in CEP due to the potential exponential increase in space requirements. 

* The nature of the pattern will require consideration if state does blow up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.



On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <[hidden email]> wrote:
We could not recreate in a controlled setup, but here are a few notes that we have gathered on a simple  "times(n),within(..)"

In case where the Event does not create a Final or Stop state

* As an NFA processes an Event, NFA mutates if there is a true Event. Each computation is a counter that keeps track of partial matches with each true Event already existent partial match for that computation unit. Essentially for n Events and if each Event is a true there will be roughly n-1 computations, each with representing an Event from 1 to n-1 ( so 1 or first will have n-1 events in the partial match, 2 has n-1 events and so on and n-1 has the last event as a partial match ).

* If the WM progresses  beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it reduces to 0 ( the internalRemove() which uses a Stack) , which should happen as WM keeps progressing to the nth element for unfulfilled patterns. A "null" ( not a fan ) event is used to establish  a WM progression 


In case there is a FinalState  ( and we skipToFirstAfterLast ) 

* The NFA by will prune ( release )  all partial matches and prune the shared buffer and emit the current match. The computations now should be empty.

There is a lot to it, but is that roughly what is done in that code ?



Few questions. 

* What we have seen is that the call to toString method of SharedBuffer is where OOM occurs. Now in the code there is no call to a Log so we are not sure why the method or who calls that method. Surely that is not part of the Seriazation/DeSer routine or is it ( very surprising if it is ) 
* There is no out of the box implementation of "m out of n"  pattern match. We have to resort to n in range ( m * time series slot ) which we do. This is fine but what it does not allow is an optimization where if n false conditions are seen, one can prune.  Simply speaking if n-m  false have been seen there is no way  that out of n there will be ever m trues and thus SharedBuffer can be pruned to the last true seen ( very akin to skipToFirstAfterLast ).  

We will keep instrumenting the code ( which apart from the null message is easily understandable ) but would love to hear your feedback. 


























 

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <[hidden email]> wrote:
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas


On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <[hidden email]> wrote:

This is the pattern. Will create a test case. 
/**
*
* @param condition a single condition is applied as a acceptance criteria
* @param params defining the bounds of the pattern.
* @param <U> the element in the stream
* @return compiled pattern alonf with the params.
*/
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(SimpleCondition<U> condition,
RelaxedContiguityWithinTime params,
RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
String patternId) {
assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
Pattern<U, ?> pattern = Pattern.
<U>begin(START).
where(condition);
if (params.elementCount > 1) pattern = pattern.
followedBy(REST).
where(condition).
times(params.elementCount - 1);

return new RelaxedContiguousPattern<U>(
pattern.within(Time.minutes(params.seriesLength * params.period.duration))
,params,
params.elementCount > 1,
params.period.duration,
mapFunc,
patternId
);
}



On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <[hidden email]> wrote:
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the flink master cep library and the runtime ( the cluster ) is configured to work against the latest and greatest. This does not happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is always an issue when it is a larger range ( 20 out of 25 with range of 8 hours ) . Does that makes sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>
> > On 1 Feb 2018, at 23:34, Vishal Santoshi <[hidden email]> wrote:
> >
> > I have flink master CEP library code imported to  a 1.4 build.
> >
> > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <[hidden email]> wrote:
> > A new one
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(
> > Arrays.java:3332)
> >       at java.lang.
> > AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:
> > 124)
> >       at java.lang.
> > AbstractStringBuilder.append(AbstractStringBuilder.java:
> > 448)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4106)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4151)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEntry.toString(
> > SharedBuffer.java:624)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:
> > 673)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4097)
> >       at org.apache.commons.lang3.
> > StringUtils.join(StringUtils.
> > java:4151)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEntry.toString(
> > SharedBuffer.java:624)
> >       at java.lang.String.valueOf(
> > String.java:2994)
> >       at java.lang.StringBuilder.
> > append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.
> > SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > .
> > .
> > .
> > It is the toString() on
> > SharedBuffer
> > no doubt. Some recursive loop ?
> >
> >
> > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <[hidden email]> wrote:
> > It happens when it looks to throw an exception and calls shardBuffer.toString. b'coz of the check....
> >
> >
> > int id = sharedBuffer.entryId;
> > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
> >
> >
> > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <[hidden email]> wrote:
> > The watermark has not moved far enough for this pattern to succeed ( or otherwise ). The issue, though, is that this happens pretty early in the pipe ( within a minute ). I am replaying from a Kafka topic, but the keyed operator has emitted no more than 1500-plus elements to SelectCEPOperator ( very visible on the UI ), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.
> >
> > The nature of the input stream is that events are pushed out with a specific timestamp ( it is a time series and the timestamp is the beginning of the time slot ), so there will be a bunch of elements with a constant timestamp until the next batch appears.
> >
> > A batch, though, never has more elements than the number of keys ( 600 ).
> >
> > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <[hidden email]> wrote:
> > This is a pretty simple pattern, as in I hardly have 1500 elements ( across 600 keys at the max ) put in
> > and though I have a pretty wide range , as in I am looking at a relaxed pattern ( like 40 true conditions in 6 hours ),
> > I get this. I have the EventTime turned on.
> >
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(Arrays.java:3332)
> >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> > .
> > .
> > .
> >
> > Has anyone seen this issue?
> >
> >
> >
> >
> >
>
>







Re: CEP issue

Vishal Santoshi
Absolutely. For one, a simple "m out of n true conditions" pattern, where n is defined by the time range, is a little under-optimized: just using times(m) will not short-circuit the partial matches until the time range has elapsed, even when there is no way m true conditions can still be reached ( we have already seen more than n-m false conditions ). That makes sense, as we have defined a within() condition predicated on n.

I think the way to do it would be to use an iterative condition, look at all events ( including the false ones, though that can be expensive ) and stop the pattern. One question I had: an NFA can be in a FinalState or a StopState.

What would constitute a StopState?
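The short-circuit idea above can be sketched outside of Flink. This is a minimal illustration, not FlinkCEP code: the class and method names are made up, and it assumes one true/false outcome per time slot. The cutoff used is "more than n - m falses", since exactly n - m falses still leaves room for m trues within n slots.

```java
final class MOutOfNShortCircuit {

    /**
     * Hypothetical helper: returns the 0-based index of the first outcome after
     * which m trues within a window of n slots can no longer be reached,
     * or -1 if the window never becomes hopeless.
     */
    static int firstHopelessIndex(boolean[] outcomes, int m, int n) {
        int falses = 0;
        for (int i = 0; i < outcomes.length && i < n; i++) {
            if (!outcomes[i]) {
                falses++;
            }
            // With more than n - m falses, fewer than m slots are left for trues.
            if (falses > n - m) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        boolean[] window = {true, false, false, true, false};
        // 3 out of 5: the third false makes the window hopeless.
        System.out.println(firstHopelessIndex(window, 3, 5));
    }
}
```

In a real pattern this bookkeeping would have to live in a condition that also sees the false events, which is the expensive part mentioned above.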

On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when 
specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink.
We already have some ideas but still, as you also said, the bulk of the space consumption
comes from the pattern definition, so it could be nice if more people did the same, i.e. sharing 
their experience, and why not, compiling a guide of things to avoid and put it along the rest
of FlinkCEP documentation.

What do you think?

Kostas



On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <[hidden email]> wrote:

Hello all, there were recent changes to the Flink master that I pulled in, and that seems to have solved our issue.

A few points:

* CEP is heavy: the NFA transition matrix kept as state, which can be n^2 in the worst case, can easily blow up space requirements. The after-match skip strategy is likely to play a crucial role in keeping the state lean: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy. In our case we do not need partial matches within a match to contribute to another potential match ( noise for us ), so SKIP_PAST_LAST_EVENT was used, which on a match will prune the SharedBuffer ( almost reset it ).

* The argument that pattern events should be lean holds even more strongly in CEP, because of the potentially exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow up for you.
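To make the first point concrete, here is a toy simulation of how discarding partial matches on a match keeps state small. It is a deliberately simplified model and an assumption on my part, not how FlinkCEP is implemented: every event is taken to be "true", every event starts one new partial match, and a partial match is just a counter. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SkipStrategySimulation {

    /**
     * Feeds `events` all-true events into a toy "times(m)" matcher.
     * Returns {matches emitted, partial matches still held at the end}.
     */
    static int[] run(int events, int m, boolean skipPastLastEvent) {
        List<Integer> partials = new ArrayList<>(); // events collected per partial match
        int matches = 0;
        for (int e = 0; e < events; e++) {
            partials.add(0); // each true event may start a new partial match
            for (int i = 0; i < partials.size(); i++) {
                partials.set(i, partials.get(i) + 1); // and extends every open one
            }
            // The oldest partial match is always the first to complete.
            if (partials.get(0) == m) {
                matches++;
                if (skipPastLastEvent) {
                    partials.clear();   // drop everything that overlaps the match
                } else {
                    partials.remove(0); // drop only the completed match
                }
            }
        }
        return new int[] {matches, partials.size()};
    }

    public static void main(String[] args) {
        int[] noSkip = run(10, 3, false);
        int[] skip = run(10, 3, true);
        System.out.println("no skip: matches=" + noSkip[0] + " open=" + noSkip[1]);
        System.out.println("skip past last event: matches=" + skip[0] + " open=" + skip[1]);
    }
}
```

In this model the skipping variant emits fewer, non-overlapping matches and starts from an empty set of partial matches after each one, which is the "almost reset" effect described above.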

Apart from that, I am still not sure why toString() on SharedBuffer was called, leading to the OOM to begin with.
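One possible explanation, offered as a hypothesis rather than a confirmed diagnosis: in the check quoted earlier in the thread, Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer), the message string is concatenated before the method is even called, so toString() runs on every call, including the ones where the check passes. That would fit the stack trace, where StringBuilder.append sits directly under serialize(). The sketch below reproduces the effect with stand-in classes; BigBuffer, checkStateEager and checkStateLazy are hypothetical and not Flink APIs.

```java
import java.util.function.Supplier;

final class EagerMessageDemo {

    /** Hypothetical stand-in for a large buffer; counts how often toString() runs. */
    static final class BigBuffer {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "BigBuffer{...}"; // imagine a huge recursive dump here
        }
    }

    /** Eager variant: the caller has already built the message string. */
    static void checkStateEager(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Lazy variant: the message is only built when the check fails. */
    static void checkStateLazy(boolean condition, Supplier<String> message) {
        if (!condition) {
            throw new IllegalStateException(message.get());
        }
    }

    static int eagerCalls() {
        BigBuffer buffer = new BigBuffer();
        checkStateEager(true, "Could not find id for entry: " + buffer);
        return buffer.toStringCalls;
    }

    static int lazyCalls() {
        BigBuffer buffer = new BigBuffer();
        checkStateLazy(true, () -> "Could not find id for entry: " + buffer);
        return buffer.toStringCalls;
    }

    public static void main(String[] args) {
        System.out.println("eager toString() calls: " + eagerCalls());
        System.out.println("lazy toString() calls: " + lazyCalls());
    }
}
```

If that is what happens, a passing check on a large, densely linked buffer would still pay the full cost of rendering it as a string.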



On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <[hidden email]> wrote:
We could not recreate it in a controlled setup, but here are a few notes that we have gathered on a simple "times(n), within(..)" pattern.

In the case where the Event does not create a Final or Stop state:

* As the NFA processes an Event, it mutates if the Event is a true one. Each computation is a counter that keeps track of a partial match, and each true Event is added to the already existing partial match for that computation unit. Essentially, for n Events where every Event is true, there will be roughly n-1 computations, each starting at one of the Events 1 to n-1 ( so the 1st will have n-1 events in its partial match, the 2nd n-2 events and so on, and the (n-1)th has just the last event as a partial match ).

* If the WM progresses beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it drops to 0 ( the internalRemove(), which uses a Stack ), which should happen as the WM keeps progressing to the nth element for unfulfilled patterns. A "null" ( not a fan ) event is used to establish WM progression.


In the case where there is a FinalState ( and we skipToFirstAfterLast ):

* The NFA will prune ( release ) all partial matches, prune the shared buffer and emit the current match. The computations should now be empty.

There is a lot to it, but is that roughly what is done in that code?



A few questions:

* What we have seen is that the OOM occurs in the call to the toString() method of SharedBuffer. There is no call to a Log in the code, so we are not sure why, or by whom, that method is called. Surely that is not part of the serialization/deserialization routine, or is it ( very surprising if it is )?
* There is no out-of-the-box implementation of an "m out of n" pattern match. We have to resort to n in range ( m * time series slot ), which we do. This is fine, but what it does not allow is an optimization where, once enough false conditions are seen, one can prune. Simply speaking, once more than n-m falses have been seen there is no way that out of n there will ever be m trues, and thus the SharedBuffer can be pruned to the last true seen ( very akin to skipToFirstAfterLast ).

We will keep instrumenting the code ( which, apart from the null message, is easily understandable ) but would love to hear your feedback.

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <[hidden email]> wrote:
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas


On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <[hidden email]> wrote:

This is the pattern. Will create a test case. 
/**
 * @param condition a single condition that is applied as the acceptance criterion
 * @param params defining the bounds of the pattern.
 * @param <U> the element in the stream
 * @return compiled pattern along with the params.
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
        SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    // First element of the pattern.
    Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
    // Remaining elementCount - 1 elements, with relaxed contiguity.
    if (params.elementCount > 1) {
        pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
    }

    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId);
}



On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <[hidden email]> wrote:
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the Flink master CEP library, and the runtime ( the cluster ) is configured to work against the latest and greatest. This does not happen with smaller range patterns ( 3 out of 5, 1 of 3 etc. ) but is always an issue with a larger range ( 20 out of 25 with a range of 8 hours ). Does that make sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>
> > On 1 Feb 2018, at 23:34, Vishal Santoshi <[hidden email]> wrote:
> >
> > I have flink master CEP library code imported to  a 1.4 build.
> >








Re: CEP issue

Kostas Kloudas
Hi Vishal,

A stopState is a state that invalidates a partial match, e.g. in a.notFollowedBy(b).followedBy(c):
if you have an “a” and then you see a “b”, the partial match is invalidated.

A finalState is the one where a match has been found.

Kostas
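As a rough illustration of the two terminal outcomes described above, here is a toy automaton for "a, not followed by b, followed by c" that tracks a single partial match. It is only a sketch under that simplification, with made-up names; FlinkCEP's NFA tracks many partial matches at once and is considerably more involved.

```java
final class StopStateDemo {

    enum State { START, SEEN_A, FINAL, STOP }

    /** Advances the single partial match by one event. */
    static State step(State state, char event) {
        switch (state) {
            case START:
                return event == 'a' ? State.SEEN_A : State.START;
            case SEEN_A:
                if (event == 'b') {
                    return State.STOP;  // the forbidden event invalidates the partial match
                }
                if (event == 'c') {
                    return State.FINAL; // a match has been found
                }
                return State.SEEN_A;    // relaxed contiguity: ignore other events
            default:
                return state;           // FINAL and STOP are terminal
        }
    }

    static State run(String events) {
        State state = State.START;
        for (char event : events.toCharArray()) {
            state = step(state, event);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(run("axc")); // a, noise, c
        System.out.println(run("abc")); // a, then the forbidden b
    }
}
```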










Re: CEP issue

Vishal Santoshi
Aah, yes. We never had a sink state, so we never came across a case where it was exercised. When the range expires, it is a prune rather than a stop state ( we were expecting it to be a stop state ), which is somewhat misleading if we take a stop state to be one "that invalidates a partial match", whatever the reason may be.

Again, I would also suggest ( though not a biggy ) that strategic debug statements in the CEP core would help folks see what actually happens. We instrumented the code to follow the construction of the NFA, and that was very helpful.

On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

A stopState is a state that invalidates a partial match, e.g. a.NotFollowedBy(b).followedBy(c). 
If you have an “a” and then you see a “b” then you invalidate the pattern.

A finalState is the one where a match has been found.

Kostas


On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <[hidden email]> wrote:

Absolutely.  For one a simple m out of n true conditions where n is defined by range is a little under optimized as in just using time(m) will not short circuit the partial patterns till the time range is achieved even if there is no way m true conditions can be achieved ( we already have had n-m false conditions ) . That makes sense as we have defined a within() condition predicated on n. 

I think the way one would do it is to iterative condition and look at all  events  ( including the ones with false but that can be expensive ) and stop a pattern. One question I had is that an NFA can be in a FinalState or a StopState. 

What would constitute a StopState ? 

On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when 
specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink.
We already have some ideas but still, as you also said, the bulk of the space consumption
comes from the pattern definition, so it could be nice if more people did the same, i.e. sharing 
their experience, and why not, compiling a guide of things to avoid and put it along the rest
of FlinkCEP documentation.

What do you think?

Kostas



On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <[hidden email]> wrote:

Hello all,  There were recent changes to the flink master that I pulled in and that seems to have solved our issue.  

Few points 

* CEP is heavy as the NFA  transition  matrix   as state which can be  possibly  n^2 ( worst case )  can easily blow up space requirements.  The after match skip strategy is likely to play a crucial role in keeping the state lean https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy.  In our case we do not require partial matches within a match to contribute to another potential match ( noise for us )  and thus SKIP_PAST_LAST_EVENT was used which on match will prune the SharedBuffer ( almost reset it ) 

* The argument that the pattern events should be lean holds much more in CEP due to the potential exponential increase in space requirements. 

* The nature of the pattern will require consideration if state does blow up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.



On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <[hidden email]> wrote:
We could not recreate in a controlled setup, but here are a few notes that we have gathered on a simple  "times(n),within(..)"

In case where the Event does not create a Final or Stop state

* As an NFA processes an Event, NFA mutates if there is a true Event. Each computation is a counter that keeps track of partial matches with each true Event already existent partial match for that computation unit. Essentially for n Events and if each Event is a true there will be roughly n-1 computations, each with representing an Event from 1 to n-1 ( so 1 or first will have n-1 events in the partial match, 2 has n-1 events and so on and n-1 has the last event as a partial match ).

* If the WM progresses  beyond the ts of the 1st computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only if the count of Edges originating from it reduces to 0 ( the internalRemove() which uses a Stack) , which should happen as WM keeps progressing to the nth element for unfulfilled patterns. A "null" ( not a fan ) event is used to establish  a WM progression 


In case there is a FinalState  ( and we skipToFirstAfterLast ) 

* The NFA by will prune ( release )  all partial matches and prune the shared buffer and emit the current match. The computations now should be empty.

There is a lot to it, but is that roughly what is done in that code ?



Few questions. 

* What we have seen is that the call to toString method of SharedBuffer is where OOM occurs. Now in the code there is no call to a Log so we are not sure why the method or who calls that method. Surely that is not part of the Seriazation/DeSer routine or is it ( very surprising if it is ) 
* There is no out of the box implementation of "m out of n"  pattern match. We have to resort to n in range ( m * time series slot ) which we do. This is fine but what it does not allow is an optimization where if n false conditions are seen, one can prune.  Simply speaking if n-m  false have been seen there is no way  that out of n there will be ever m trues and thus SharedBuffer can be pruned to the last true seen ( very akin to skipToFirstAfterLast ).  

We will keep instrumenting the code ( which apart from the null message is easily understandable ) but would love to hear your feedback. 


























 

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <[hidden email]> wrote:
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas


On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <[hidden email]> wrote:

This is the pattern. Will create a test case. 
/**
 * @param condition a single condition applied as the acceptance criterion
 * @param params defining the bounds of the pattern.
 * @param <U> the element in the stream
 * @return compiled pattern along with the params.
 */
public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
        SimpleCondition<U> condition,
        RelaxedContiguityWithinTime params,
        RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
        String patternId) {
    assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
    // The first true event starts the pattern.
    Pattern<U, ?> pattern = Pattern
            .<U>begin(START)
            .where(condition);
    // The remaining ( elementCount - 1 ) true events follow with relaxed contiguity.
    if (params.elementCount > 1) {
        pattern = pattern
                .followedBy(REST)
                .where(condition)
                .times(params.elementCount - 1);
    }

    // The whole match must fit within seriesLength slots of period.duration minutes each.
    return new RelaxedContiguousPattern<U>(
            pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
            params,
            params.elementCount > 1,
            params.period.duration,
            mapFunc,
            patternId
    );
}



On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <[hidden email]> wrote:
Could you provide some example to reproduce the case? Or the Pattern that you are using? It would help track down the issue.

> On 2 Feb 2018, at 13:35, Vishal Santoshi <[hidden email]> wrote:
>
> I have pulled in the flink master cep library and the runtime ( the cluster ) is configured to work against the latest and greatest. This does not happen with smaller range patterns ( 3 out of 5, 1 of 3, etc. ) but is always an issue when it is a larger range ( 20 out of 25 with a range of 8 hours ). Does that make sense?
>
> On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <[hidden email]> wrote:
> This problem sounds very similar to this one that was fixed for 1.4.1 and 1.5.0:
> https://issues.apache.org/jira/browse/FLINK-8226
>
> Could you check if that helps with your problem too?
>
> > On 1 Feb 2018, at 23:34, Vishal Santoshi <[hidden email]> wrote:
> >
> > I have flink master CEP library code imported to  a 1.4 build.
> >
> > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <[hidden email]> wrote:
> > A new one
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(Arrays.java:3332)
> >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > .
> > .
> > .
> > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
> >
> >
> > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <[hidden email]> wrote:
> > It happens when it looks to throw an exception and calls sharedBuffer.toString(), because of the check:
> >
> >
> > int id = sharedBuffer.entryId;
> > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
> >
> >
> > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <[hidden email]> wrote:
> > The watermark has not moved for this pattern to succeed ( or otherwise ); the issue, though, is that it happens pretty early in the pipe ( like within a minute ). I am replaying from a kafka topic, but the keyed operator has emitted no more than 1500 plus elements to SelectCEPOperator ( very visible on the UI ), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.
> >
> > The nature of the input stream is that events are pushed out with a specific timestamp ( it is a time series and the timestamp is the beginning of the time slot ), as in one will have a bunch of elements that share a constant timestamp until the next batch appears.
> >
> > A batch, though, does not have more elements than the number of keys ( 600 ).
> >
> > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <[hidden email]> wrote:
> > This is a pretty simple pattern, as in I hardly have 1500 elements ( across 600 keys at the max ) put in
> > and though I have a pretty wide range , as in I am looking at a relaxed pattern ( like 40 true conditions in 6 hours ),
> > I get this. I have the EventTime turned on.
> >
> >
> > java.lang.OutOfMemoryError: Java heap space
> >       at java.util.Arrays.copyOf(Arrays.java:3332)
> >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> >       at java.lang.StringBuilder.append(StringBuilder.java:136)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> >       at java.lang.String.valueOf(String.java:2994)
> >       at java.lang.StringBuilder.append(StringBuilder.java:131)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
> >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> > .
> > .
> > .
> >
> > Any one has seen this issue ?

Re: CEP issue

Kostas Kloudas
Why not open a JIRA and work on adding some debug
statements that you consider useful?

This could help the next user that faces the same issues ;)

Kostas

On Mar 7, 2018, at 3:29 PM, Vishal Santoshi <[hidden email]> wrote:

Aah, yes, we never had a sink state, so we never came across a case where it was exercised. When the range expires, it is a prune rather than a stop state ( we were expecting it to be a stop state ), which is somewhat misleading if we take a stop state to be one "that invalidates a partial match", whatever the reason may be.

Again, I would also advise ( though it is not a big deal ) that strategic debug statements in the CEP core would help folks see what actually happens. We instrumented the code to follow the construction of the NFA, which was very helpful.

On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

A stopState is a state that invalidates a partial match, e.g. a.notFollowedBy(b).followedBy(c).
If you have an “a” and then you see a “b”, then you invalidate the pattern.

A finalState is the one where a match has been found.

Kostas


On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <[hidden email]> wrote:

Absolutely. For one, a simple "m out of n true conditions" pattern, where n is defined by the range, is a little under-optimized: just using times(m) will not short-circuit the partial patterns until the time range is reached, even if there is no way m true conditions can be achieved ( we have already had more than n-m false conditions ). That makes sense, as we have defined a within() condition predicated on n.

I think the way one would do it is to use an iterative condition, look at all events ( including the false ones, but that can be expensive ), and stop a pattern. One question I had: an NFA can be in a FinalState or a StopState.

What would constitute a StopState?

On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider when 
specifying your pattern.

I agree that there is room for improvement when it comes to the state checkpointed in Flink.
We already have some ideas but still, as you also said, the bulk of the space consumption
comes from the pattern definition, so it could be nice if more people did the same, i.e. sharing 
their experience, and why not, compiling a guide of things to avoid and put it along the rest
of FlinkCEP documentation.

What do you think?

Kostas



On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <[hidden email]> wrote:

Hello all, There were recent changes to the flink master that I pulled in, and that seems to have solved our issue.

A few points

* CEP is heavy: the NFA transition matrix kept as state, which can possibly be n^2 ( worst case ), can easily blow up space requirements. The after match skip strategy is likely to play a crucial role in keeping the state lean ( https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy ). In our case we do not require partial matches within a match to contribute to another potential match ( noise for us ), and thus SKIP_PAST_LAST_EVENT was used, which on a match will prune the SharedBuffer ( almost reset it ).

* The argument that the pattern events should be lean holds even more in CEP, due to the potentially exponential increase in space requirements.

* The nature of the pattern will require consideration if state does blow up for you.

Apart from that, I am still not sure why toString() on SharedBuffer was called to get an OOM to begin with.
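
One possible explanation, given the checkState line quoted earlier in the thread: Java evaluates method arguments before the call, so a message built with "..." + sharedBuffer runs toString() even when the check passes. The sketch below is a standalone illustration, not Flink code. ToStringBlowupDemo, Entry, chain and the local checkState are hypothetical stand-ins, and the two-edges-per-entry shape is an assumption chosen only to show how a toString() that recurses through edges can grow exponentially.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy demo: eager message concatenation plus a recursive toString(). */
final class ToStringBlowupDemo {

    static int toStringCalls = 0;

    /** Hypothetical stand-in for a buffer entry whose toString() prints every edge target. */
    static final class Entry {
        final String name;
        final List<Entry> edges = new ArrayList<>();

        Entry(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            toStringCalls++;
            StringBuilder sb = new StringBuilder(name).append('[');
            for (Entry target : edges) {
                sb.append(target); // recurses into the target's toString()
            }
            return sb.append(']').toString();
        }
    }

    /** Simplified local check with the usual ( condition, message ) shape. */
    static void checkState(boolean condition, Object message) {
        if (!condition) {
            throw new IllegalStateException(String.valueOf(message));
        }
    }

    /** Builds a chain of depth entries where each entry has two edges to its predecessor. */
    static Entry chain(int depth) {
        Entry prev = new Entry("e0");
        for (int i = 1; i < depth; i++) {
            Entry next = new Entry("e" + i);
            next.edges.add(prev);
            next.edges.add(prev);
            prev = next;
        }
        return prev;
    }

    public static void main(String[] args) {
        Entry head = chain(16);
        toStringCalls = 0;
        // The check passes, yet the message string is fully built before the call.
        checkState(true, "Could not find id for entry: " + head);
        System.out.println("toString() calls for a passing check: " + toStringCalls);
    }
}
```

In this toy the number of toString() calls roughly doubles with every extra entry in the chain, so only a few dozen entries are enough to exhaust a heap. Passing the object as a separate argument to a message template, or building the message only on failure, would avoid the eager call.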




Re: CEP issue

Vishal Santoshi
Will do. 

On Wed, Mar 7, 2018 at 9:33 AM, Kostas Kloudas <[hidden email]> wrote:
Why not open a JIRA and work on adding some debug
statements that you consider useful?

This could help the next user that faces the same issues ;)

Kostas
