Flink : CEP processing


M Singh
Hey Folks:

I have a question about CEP processing in Flink: how does processing work when the events used in a pattern sequence might be scattered across multiple partitions on multiple nodes?

Thanks for your insight.

Mans

Re: Flink : CEP processing

Sameer Wadkar
Hi,

You will need to use a keyBy operation first to get all the events you want monitored in a pattern onto the same node. Only then can you apply the Pattern, because it depends on the order of the events (first, next, followed by). I also had to make sure that the events were correctly sorted by timestamp so that first, next and followed by work correctly.
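A minimal plain-Java sketch of why keyBy matters, written without the Flink API. It assumes a simplified routing rule (`Math.floorMod(key.hashCode(), parallelism)`); Flink really hashes keys into key groups first, but the property shown is the same: every event with a given key lands on the same parallel instance. Sorting each partition by timestamp mirrors the point that first/next/followed by depend on event order. `KeyedEvent`, `route` and `sortedPerPartition` are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type: a key (e.g. a device id), an event type and an event-time timestamp.
final class KeyedEvent {
    final String key;
    final String type;
    final long timestamp;

    KeyedEvent(String key, String type, long timestamp) {
        this.key = key;
        this.type = type;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return key + "/" + type + "@" + timestamp;
    }
}

final class KeyByRouting {
    // Simplified stand-in for keyBy: the same key always maps to the same partition.
    // Assumption: real Flink uses key groups; plain hashCode modulo is used here.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes events to partitions, then sorts each partition by timestamp so that
    // order-sensitive steps (first, next, followed by) see events in event-time order.
    static Map<Integer, List<KeyedEvent>> sortedPerPartition(List<KeyedEvent> events, int parallelism) {
        Map<Integer, List<KeyedEvent>> partitions = new TreeMap<>();
        for (KeyedEvent e : events) {
            partitions.computeIfAbsent(route(e.key, parallelism), p -> new ArrayList<>()).add(e);
        }
        for (List<KeyedEvent> partition : partitions.values()) {
            partition.sort(Comparator.comparingLong(e -> e.timestamp));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<KeyedEvent> events = List.of(
                new KeyedEvent("sensor-1", "B", 20),
                new KeyedEvent("sensor-2", "A", 5),
                new KeyedEvent("sensor-1", "A", 10));
        // Both sensor-1 events end up in one partition, with A@10 before B@20.
        sortedPerPartition(events, 4).forEach((p, list) -> System.out.println("partition " + p + ": " + list));
    }
}
```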

Sameer

(In reply to M Singh, Tue, Aug 9, 2016 at 12:17 PM)

Re: Flink : CEP processing

M Singh
Thanks Sameer.

So does that mean that if the events' keys are not the same, we cannot use CEP pattern matching? What if events are coming from different sources and need to be correlated?

Mans


(In reply to Sameer W, Tuesday, August 9, 2016 9:40 AM)

Re: Flink : CEP processing

Sameer Wadkar
In that case you need to get them into one stream somehow (for example, keyBy a dummy value). There is always some logical key to keyBy on when data is arriving from multiple sources (e.g. some portion of the timestamp).

You are looking for patterns within something (events happening around the same time but arriving from multiple devices). That something should be the key. That's how I am using it. 
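To make the "logical key" idea concrete, here is a small plain-Java sketch (not Flink API) of two key choices: a key derived from a portion of the timestamp, so events from different sources in the same time bucket share a key, and a constant "dummy" key, which sends every event to one key and therefore to a single parallel instance. The bucket size and the `TimeBucketKey` names are hypothetical. One caveat of the bucket approach: events that straddle a bucket boundary get different keys, so under hourly buckets a pattern spanning 10:59:59 and 11:00:01 would not be matched.

```java
final class TimeBucketKey {
    // Constant key: every event maps to the same key, so CEP sees one global,
    // totally ordered stream, at the cost of running on a single parallel instance.
    static final String DUMMY_KEY = "all";

    static String dummyKey(Object event) {
        return DUMMY_KEY;
    }

    // Key derived from "some portion of the timestamp": events whose timestamps fall
    // into the same bucketMillis-wide interval share a key, regardless of source.
    static long bucketKey(long timestampMillis, long bucketMillis) {
        return Math.floorDiv(timestampMillis, bucketMillis);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Two sources reporting within the same minute get the same key.
        System.out.println(bucketKey(120_500L, minute) == bucketKey(179_999L, minute)); // true
        // Events 2 ms apart but on either side of a bucket boundary get different keys.
        System.out.println(bucketKey(179_999L, minute) == bucketKey(180_001L, minute)); // false
    }
}
```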

Sameer

Sent from my iPhone

(In reply to M Singh, Aug 9, 2016, at 1:40 PM)

Re: Flink : CEP processing

M Singh
Hi Sameer:

If we use a within window for an event sequence:

1. Does it interfere with the default time windows?
2. How does it affect snapshotting?
3. If the window is too large, are the events stored in a "processor" until the window expires?
4. Are there any other known limitations and best practices for using CEP with Flink?

Thanks again for your help.



(In reply to Sameer Wadkar, Tuesday, August 9, 2016 11:29 AM)

Re: Flink : CEP processing

Sameer Wadkar
In an earlier thread, Till explained this to me (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CEP-and-Within-Clause-td8159.html).

1. Within does not use time windows. It sort of uses session windows, where the session begins when the first event of the pattern is identified: the timer starts when the "first" event in the pattern fires. If the pattern completes within the specified time (meaning the "next" and "followed by" steps also fire within that time), you have a match; otherwise the window is removed. I don't know how it is implemented, but I doubt it stores all the events in memory for the "within" window (there is no need to). It should only store the relevant events (first, next, followed by, etc.), so memory would not be an issue here. If two "first" type events are identified, I think two "within" sessions are created.

2. Snapshotting: I don't know much in this area, so I cannot answer definitively. Why should it be different, though? You are using operators and state, so it should work the same way. But I am not too familiar with that.

3. The "within" window is not an issue. Even a window preceding it should not be, unless you are using a WindowFunction by itself over a really large window (a more memory-friendly alternative is described at https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions).

4. The way I am using it, it is working fine. Some of the limitations I have seen are related to this paper not being fully implemented (https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf). I don't know how to support negation in an event stream, but I don't need it for now.
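The description of "within" in point 1 (a timer per partial match started by the "first" event, only the relevant events retained, one session per "first" event) can be illustrated with a deliberately tiny plain-Java matcher. This is a simplified model of that description, not Flink's NFA implementation; `WithinMatcher` and its methods are hypothetical, and it only handles "A followed by B within `withinMillis`" over events already in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;

final class WithinMatcher {
    private final long withinMillis;
    // Open partial matches, one per "first" (A) event. Only the start timestamp is
    // kept, not every event seen since the partial match started.
    private final List<Long> partialStarts = new ArrayList<>();

    WithinMatcher(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    // Feeds one event in timestamp order and returns completed "A followed by B"
    // matches as "startTs->endTs" strings.
    List<String> onEvent(String type, long ts) {
        // Discard partial matches whose "within" interval has expired.
        partialStarts.removeIf(start -> ts - start > withinMillis);
        List<String> matches = new ArrayList<>();
        if (type.equals("A")) {
            // Each "first" event opens its own session with its own timer.
            partialStarts.add(ts);
        } else if (type.equals("B")) {
            // A B completes every still-open partial match (events in between are skipped).
            for (long start : partialStarts) {
                matches.add(start + "->" + ts);
            }
            partialStarts.clear();
        }
        return matches;
    }

    int openPartialMatches() {
        return partialStarts.size();
    }

    public static void main(String[] args) {
        WithinMatcher m = new WithinMatcher(10);
        m.onEvent("A", 0);
        m.onEvent("A", 5);
        System.out.println(m.openPartialMatches()); // 2: one session per "first" event
        System.out.println(m.onEvent("B", 8));      // [0->8, 5->8]
        m.onEvent("A", 20);
        System.out.println(m.onEvent("B", 35));     // []: 35 - 20 > 10, timed out
    }
}
```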

Thanks,
Sameer
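The memory difference behind point 3 can be sketched without Flink: a full-window function (like Flink's `WindowFunction`) needs every element of the window buffered until the window fires, whereas incremental aggregation (the approach of Flink's `ReduceFunction`, and in later versions `AggregateFunction`, covered on the window-functions page linked in point 3) keeps only a running accumulator. The two hypothetical classes below only model the per-window state each approach holds; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowStateSketch {
    // Full-window style: every element is retained until the window fires.
    static final class BufferingWindow {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) { buffer.add(value); }

        long fire() {
            long sum = 0;
            for (long v : buffer) sum += v;
            return sum;
        }

        int storedElements() { return buffer.size(); }
    }

    // Incremental style: each element is folded into one accumulator on arrival.
    static final class IncrementalWindow {
        private long sum = 0;

        void add(long value) { sum += value; }

        long fire() { return sum; }

        int storedElements() { return 1; } // one accumulator, independent of window size
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        IncrementalWindow incremental = new IncrementalWindow();
        for (long i = 1; i <= 100_000; i++) {
            buffering.add(i);
            incremental.add(i);
        }
        // Same result, very different amounts of retained state.
        System.out.println(buffering.fire() + " vs " + incremental.fire());
        System.out.println(buffering.storedElements() + " vs " + incremental.storedElements());
    }
}
```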


(In reply to M Singh, Tue, Aug 9, 2016 at 3:45 PM)

Re: Flink : CEP processing

M Singh
Thanks for the pointers Sameer.


The reason I wanted to find out about snapshotting with CEP is that I thought CEP state might also be snapshotted for recovery. If that is the case, then some events held in the CEP operator might end up in two snapshots.

Mans


(In reply to Sameer W, Tuesday, August 9, 2016 1:15 PM)

Re: Flink : CEP processing

Sameer Wadkar
Mans,

I think at this point we need someone who knows the internal implementation to answer definitively.

My understanding is:

1. Internally, CEP is like a map operator with session-like semantics operating in a pipeline. You could do what it does yourself, but you would have to implement all of that. If you need support for negation today, that is probably how you would do it.
2. Ultimately CEP produces a stream, which you need to write to some sink. If your sink supports exactly-once semantics, then your pipeline will support it, so I think snapshotting with CEP would be no different. If you send out events (alerts) from within your select(PatternSelectFunction), then yes, you could "send" your alerts multiple times. If instead you write to a sink (with exactly-once semantics) that then sends the alerts out to the real world, you should not get those duplicate alerts. I am sending alerts from within my PatternSelectFunction, so I am taking the chance of sending alerts twice, which is OK for my use case.
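The duplicate-alert point in item 2 can be simulated in plain Java. After a failure, processing resumes from the last snapshot and replays the input that followed it, so a side effect performed directly in the select step (modeled by the hypothetical `sentDirectly` list) happens again for the replayed matches. For contrast, the hypothetical `idempotentSink` deduplicates by alert id; idempotent writes are a swapped-in illustration of one way to avoid visible duplicates, not necessarily how a particular exactly-once sink works.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ReplayDuplicates {
    // Side effect performed directly during processing (like sending an alert in select).
    final List<String> sentDirectly = new ArrayList<>();
    // Idempotent sink: writing the same alert id twice leaves a single entry.
    final Set<String> idempotentSink = new LinkedHashSet<>();

    void process(String matchId) {
        String alert = "alert-" + matchId;
        sentDirectly.add(alert);   // repeated on replay, nothing undoes it
        idempotentSink.add(alert); // duplicate writes collapse into one
    }

    // Processes the first failAfter matches, "crashes", then restores from the
    // snapshot taken at snapshotPos and replays everything after it.
    void runWithFailure(List<String> matches, int snapshotPos, int failAfter) {
        for (int i = 0; i < failAfter; i++) process(matches.get(i));
        for (int i = snapshotPos; i < matches.size(); i++) process(matches.get(i));
    }

    public static void main(String[] args) {
        ReplayDuplicates run = new ReplayDuplicates();
        run.runWithFailure(List.of("m1", "m2", "m3", "m4"), 1, 3);
        System.out.println(run.sentDirectly);   // m2 and m3 appear twice
        System.out.println(run.idempotentSink); // each alert appears once
    }
}
```

With a snapshot after the first match and a failure after the third, `sentDirectly` ends up with six entries while `idempotentSink` holds four.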

I am operating under the belief (which seems logical to me) that CEP is like a stateful map operator at the end of my processing pipeline, so snapshotting with CEP should work exactly as it would for such an operator.
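Item 1 above suggests implementing unsupported features such as negation yourself as a stateful operator. Below is a hypothetical plain-Java sketch of "A not followed by B within `withinMillis`": it keeps the timestamp of the pending A per key and emits an alert once time passes the deadline without a B. It assumes events arrive in global timestamp order, so each event's timestamp stands in for event time (a rough substitute for watermarks and timers); a real Flink job would more likely keep this in keyed state and use timers, for example in a process function. An A that is still pending when no further events arrive is never reported here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NotFollowedByDetector {
    private final long withinMillis;
    // Per-key state: timestamp of the earliest A that is still waiting for a B.
    private final Map<String, Long> pendingA = new HashMap<>();

    NotFollowedByDetector(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    // Processes one event. Assumes global timestamp order, so ts acts as the current
    // event time for every key. Returns alerts for A events whose deadline passed.
    List<String> onEvent(String key, String type, long ts) {
        List<String> alerts = new ArrayList<>();
        // Expire overdue A events (a crude stand-in for event-time timers).
        pendingA.entrySet().removeIf(entry -> {
            boolean expired = ts - entry.getValue() > withinMillis;
            if (expired) {
                alerts.add(entry.getKey() + ": A@" + entry.getValue() + " not followed by B");
            }
            return expired;
        });
        if (type.equals("A")) {
            pendingA.putIfAbsent(key, ts);
        } else if (type.equals("B")) {
            pendingA.remove(key); // B arrived in time, so no alert for this key
        }
        return alerts;
    }

    public static void main(String[] args) {
        NotFollowedByDetector detector = new NotFollowedByDetector(10);
        detector.onEvent("k1", "A", 0);
        detector.onEvent("k2", "A", 2);
        detector.onEvent("k2", "B", 5);
        // k1's A expired without a B; k2 was satisfied in time.
        System.out.println(detector.onEvent("k3", "X", 15)); // [k1: A@0 not followed by B]
    }
}
```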

Thanks,
Sameer


(In reply to M Singh, Wed, Aug 10, 2016 at 2:42 PM)

Re: Flink : CEP processing

Aljoscha Krettek
Hi,
Sameer is right about the snapshotting. The CEP operator behaves more or less like a FlatMap operator that keeps some more complex state internally. Snapshotting works the same as with any other operator.
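This also speaks to the concern about events ending up in two snapshots: each snapshot is a complete, consistent copy of the operator state as of one input position, and recovery restores exactly one snapshot and replays the input from that position. The plain-Java sketch below models this with a hypothetical counting operator standing in for the CEP operator's more complex state; `SnapshotReplaySketch` and its methods are illustrative, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SnapshotReplaySketch {
    // Copy of the operator state plus the input position it corresponds to.
    static final class Snapshot {
        final Map<String, Integer> counts;
        final int position;

        Snapshot(Map<String, Integer> counts, int position) {
            this.counts = new HashMap<>(counts);
            this.position = position;
        }
    }

    // Operator state: per-key counts (a stand-in for the CEP operator's partial matches).
    private Map<String, Integer> counts = new HashMap<>();
    private int position = 0; // index of the next input element to process

    void processNext(List<String> input) {
        counts.merge(input.get(position), 1, Integer::sum);
        position++;
    }

    Snapshot snapshot() {
        return new Snapshot(counts, position);
    }

    // Recovery: restore one snapshot; the caller then replays input from its position.
    void restore(Snapshot s) {
        counts = new HashMap<>(s.counts);
        position = s.position;
    }

    Map<String, Integer> counts() { return counts; }

    int position() { return position; }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a", "c", "a");
        SnapshotReplaySketch op = new SnapshotReplaySketch();
        op.processNext(input);
        op.processNext(input);
        Snapshot first = op.snapshot();  // covers input[0..1]
        op.processNext(input);
        Snapshot second = op.snapshot(); // covers input[0..2], overlapping the first
        op.processNext(input);           // ...then the job fails
        op.restore(second);              // restore only the latest snapshot
        while (op.position() < input.size()) {
            op.processNext(input);       // replay from the snapshot's position
        }
        // Each input element is counted exactly once: a=3, b=1, c=1.
        System.out.println(first.position + " " + second.position + " " + op.counts());
    }
}
```

Input elements 0 and 1 are covered by both snapshots, which is harmless: only one snapshot is restored, and the replay starts from that snapshot's own position.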

Cheers,
Aljoscha

(In reply to Sameer W, Thu, 11 Aug 2016 at 00:54)