Flink CEP Pattern Matching

Jerry Lam
Hi Flink users and developers,

I'm trying to learn the CEP library. How can I express A -followedBy-> B -next-> C, where B occurs 5 days after A? What I'm trying to get hold of is the event that matched A while I'm processing B.

Is this supported?

Best Regards,

Jerry
Re: Flink CEP Pattern Matching

Fabian Hueske-2
Hi Jerry,

I haven't used the CEP features yet, so I cannot comment on your requirements.
In case you are looking for the CEP documentation, here it is:

--> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html

The CEP features will be included in the upcoming 1.0.0 release (which we are currently voting on).
I think you would be one of the first people to use it. Please let us know if you find any problems.

Thanks, Fabian


In reply to Jerry Lam's message of 2016-03-02 23:12 GMT+01:00.

Re: Flink CEP Pattern Matching

Vitor Vieira
Hi Jerry,

I'm currently evaluating the CEP library too, probably doing something similar.

Something like comparing the 'offset' of the last event across different time windows: each window, chosen by event type, runs in near real time and is compared with the same day/hour/minute a week, 15 days, 1 month, etc. ago.

I plan to share some CEP examples once I finish this engine.

-@notvitor


In reply to Fabian Hueske's message of 2016-03-02 19:28 GMT-03:00.


Re: Flink CEP Pattern Matching

Till Rohrmann-2

Hi Jerry,

At the moment it is not yet possible to access previous elements in the filter function of an individual element. Therefore, you have to check the condition “B is 5 days after A” in the final select statement. Giving this context to the where clause would indeed be a nice addition to the CEP library. If you want, you could file a JIRA ticket for it.

Here is a simple example of how you could solve your problem with the current means:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Tuple3(Key, Timestamp, Payload)
DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
    Tuple3.of(1, 1000L, "first event"),
    Tuple3.of(1, 2000L, "second event"),
    Tuple3.of(1, 20000L, "third event"));

Pattern<Tuple3<Integer, Long, String>, ?> pattern =
    Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");

DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
    @Override
    public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map, Collector<String> collector) throws Exception {
        Tuple3<Integer, Long, String> a = map.get("A");
        Tuple3<Integer, Long, String> b = map.get("B");

        // check that a and b have at least 1000 ms in between
        if (b.f1 - a.f1 >= 1000) {
            collector.collect(a.f2);
        }
    }
});

result.print();

env.execute("CEP example");
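For the original five-day requirement, the same select-time check can be written against a five-day threshold instead of 1000 ms. The following is only a minimal, Flink-free sketch: the class and method names (GapCheck, isAtLeastAfter) are hypothetical, and timestamps are assumed to be milliseconds, as in the Tuple3 example above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Flink: the gap check one could call inside flatSelect. */
final class GapCheck {
    // Five days expressed in milliseconds (5 * 24 * 60 * 60 * 1000).
    static final long FIVE_DAYS_MS = TimeUnit.DAYS.toMillis(5);

    /** Returns true if event B's timestamp is at least minGapMs after event A's. */
    static boolean isAtLeastAfter(long timestampA, long timestampB, long minGapMs) {
        return timestampB - timestampA >= minGapMs;
    }

    public static void main(String[] args) {
        long a = 1000L;
        long bTooEarly = a + TimeUnit.DAYS.toMillis(4);
        long bOnTime = a + FIVE_DAYS_MS;
        System.out.println(isAtLeastAfter(a, bTooEarly, FIVE_DAYS_MS)); // false
        System.out.println(isAtLeastAfter(a, bOnTime, FIVE_DAYS_MS));   // true
    }
}
```

In the flatSelect body above, the condition would then read something like GapCheck.isAtLeastAfter(a.f1, b.f1, GapCheck.FIVE_DAYS_MS).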

Cheers,
Till


In reply to Vitor Vieira's message of Thu, Mar 3, 2016 at 1:46 AM.


Re: Flink CEP Pattern Matching

Vitor Vieira
Hi Till,

I don't know whether the windowing package should provide functions to operate on the internal elements.

Is it possible, and if so what is the easiest way, to get, for example, the last event of a window, let's say a 5-second window?

Rgds,

Vitor Vieira
@notvitor

In reply to Till Rohrmann's message of 2016-03-03 7:29 GMT-03:00.


Re: Flink CEP Pattern Matching

Till Rohrmann
Hi Vitor,

The CEP operators do not work on real windows. Instead, they use an NFA to track the state of multiple ongoing sequences. To store the elements efficiently, a kind of shared buffer with versioning is used. Once a sequence has reached a final state, the sequence of elements is backtracked in the shared buffer to produce the final result.
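To make the idea of tracking several ongoing sequences concrete, here is a toy matcher for A followedBy B next C. It is only a sketch under simplifying assumptions and not Flink's implementation: the class TinyNfa is hypothetical, events are reduced to type names, each run copies its event indices instead of sharing them in a versioned buffer, and followedBy is assumed to take the first B it sees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy matcher for A followedBy B next C; hypothetical, not Flink's implementation. */
final class TinyNfa {

    /** Returns, for each complete match, the indices of its A, B and C events. */
    static List<List<Integer>> match(List<String> events) {
        List<List<Integer>> partial = new ArrayList<>();
        List<List<Integer>> complete = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            String type = events.get(i);
            List<List<Integer>> survivors = new ArrayList<>();
            for (List<Integer> run : partial) {
                if (run.size() == 1) {
                    // State "A seen": followedBy tolerates unrelated events in between.
                    survivors.add(type.equals("B") ? extend(run, i) : run);
                } else if (type.equals("C")) {
                    // State "A and B seen": next demands that C arrives immediately.
                    complete.add(extend(run, i));
                }
                // Any other event after B drops the run.
            }
            if (type.equals("A")) {
                survivors.add(extend(new ArrayList<>(), i)); // start a new run
            }
            partial = survivors;
        }
        return complete;
    }

    /** Copies the run and appends one more event index. */
    private static List<Integer> extend(List<Integer> run, int index) {
        List<Integer> copy = new ArrayList<>(run);
        copy.add(index);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(match(Arrays.asList("A", "x", "B", "C"))); // [[0, 2, 3]]
        System.out.println(match(Arrays.asList("A", "B", "x", "C"))); // []
    }
}
```

Two A events before a single B produce two runs that share the same B and C, which is the kind of overlap a shared buffer avoids storing twice.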

So in order to get access to the previous elements of an unfinished sequence, we could simply apply the same mechanism, just without removing the sequence from the shared buffer. This would of course be a bit more costly, since for every state you would retrieve the sequence of elements that led to this state.

But we could offer two filter conditions: a more lightweight one that only offers access to the current element, and another one that also gives access to the previous elements. The second variant might make sense if it lets you prune many false sequences early.
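The two variants could look roughly like the following. This is a sketch of the proposal only, not an existing Flink API: the interface names CurrentOnlyFilter and HistoryAwareFilter, the Event class and the assumption that the first previous element is the A event are all hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the two proposed filter variants; not a Flink API. */
final class FilterSketch {

    /** Light-weight variant: sees only the current element. */
    interface CurrentOnlyFilter<T> {
        boolean accept(T current);
    }

    /** Costlier variant: also sees the elements already accepted for this sequence. */
    interface HistoryAwareFilter<T> {
        boolean accept(T current, List<T> previous);
    }

    static final class Event {
        final String type;
        final long timestamp; // milliseconds

        Event(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    // Can only test properties of the current element.
    static final CurrentOnlyFilter<Event> IS_B = e -> e.type.equals("B");

    // Can prune early: B is accepted only if it is at least 5 days after A,
    // where A is assumed to be the first previously accepted element.
    static final HistoryAwareFilter<Event> B_FIVE_DAYS_AFTER_A = (e, previous) ->
        e.type.equals("B")
            && !previous.isEmpty()
            && e.timestamp - previous.get(0).timestamp >= TimeUnit.DAYS.toMillis(5);

    public static void main(String[] args) {
        Event a = new Event("A", 0L);
        Event early = new Event("B", TimeUnit.DAYS.toMillis(1));
        Event late = new Event("B", TimeUnit.DAYS.toMillis(6));
        List<Event> history = Arrays.asList(a);
        System.out.println(IS_B.accept(early));                         // true
        System.out.println(B_FIVE_DAYS_AFTER_A.accept(early, history)); // false
        System.out.println(B_FIVE_DAYS_AFTER_A.accept(late, history));  // true
    }
}
```

With the history-aware variant, the early B is rejected as soon as it arrives, so the sequence never has to reach the select step to be discarded.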

Cheers,
Till

In reply to Vitor Vieira's message of Thu, Mar 3, 2016 at 2:03 PM.

Re: Flink CEP Pattern Matching

Jerry Lam
Hi Till,

The idea of having CEP functionality in Flink is very exciting. I really appreciate your work on this.
Will you consider adding, in the future, functionality similar to that described in this standard (http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)? This document describes a lot of use cases that are very interesting for CEP applications. I have experience with Esper and WSO2 Siddhi. They provide a subset of the functionality described in the standard.

Having this pattern-matching CEP functionality in Flink is a killer feature IMHO.

Best Regards,

Jerry


In reply to Till Rohrmann's message of Thu, Mar 3, 2016 at 8:47 AM.

Re: Flink CEP Pattern Matching

Vitor Vieira
I believe that most of the functionality for transforming and projecting windowed events will only be implemented in upcoming releases.

I'm looking forward to contributing!


In reply to Jerry Lam's message of 2016-03-03 15:29 GMT-03:00.


Re: Flink CEP Pattern Matching

Till Rohrmann
Hi Jerry and Vitor,

Sorry for my late reply, but I was on vacation last week.

I think that Flink's CEP library should indeed head, function-wise, in the direction of the standard you've linked. The next step would be to enrich the expressiveness of the pattern language to support regular-expression-like patterns. Luckily, the infrastructure for that is already in place, and most of it only needs to be exposed to the user. Furthermore, we should allow more complex filter expressions/conditions that can reference previous elements as well.

Concerning the DSL, I'm not so sure yet how we should expose it to the user. At the moment we only have the Java API. But our vision is to tightly integrate it with Flink's streaming SQL. This means that you will be able to query your streaming data in a SQL-like fashion and to specify additional temporal patterns that the query has to fulfil. But since the streaming SQL implementation is still an ongoing effort, I cannot really tell what the query language for event patterns will look like. The standard is a good starting point, though.

There are definitely more than enough opportunities to start contributing if you would like to get involved in Flink's CEP library. Here is a list [1] of open issues, but the list is by no means exhaustive. So if you have more ideas for new features, feel free to add them.


Cheers,
Till


In reply to Vitor Vieira's message of Thu, Mar 3, 2016 at 8:22 PM.
