FlinkCEP behaviour with time constraints not as expected

Federico D'Ambrosio
Hi everyone,

I wanted to ask whether FlinkCEP is working as it should in the following scenario, or whether I have misunderstood how it works.

I've got a keyed stream associated with the following pattern:

Pattern[Event].begin("start").where(_.value >=100).oneOrMore
.notNext("end").where(_.value >=100).within(Time.minutes(30))

Considering a single key in the stream for simplicity, I've got the following sequence of events (using event time on the "time" field of the JSON event):

{value: 100, time: "2017-11-05 03:50:02.000"}
{value: 100, time: "2017-11-05 03:52:02.000"}
{value: 100, time: "2017-11-05 03:54:02.000"}
{value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30 minutes from the first event
{value: 100, time: "2017-11-05 06:00:02.000"}
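Here is a quick arithmetic check of the comment on the fourth event. The sketch below uses plain Scala with java.time and no Flink. It measures each event's offset from the first one. The format string is an assumption about how the "time" field is encoded.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Assumed encoding of the "time" field, matching the strings above
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

val times: List[LocalDateTime] = List(
  "2017-11-05 03:50:02.000",
  "2017-11-05 03:52:02.000",
  "2017-11-05 03:54:02.000",
  "2017-11-05 03:56:02.000",
  "2017-11-05 06:00:02.000"
).map(s => LocalDateTime.parse(s, fmt))

val window = Duration.ofMinutes(30)

// Events that occur at most 30 minutes after the first one
val inFirstWindow: List[LocalDateTime] =
  times.filter(t => Duration.between(times.head, t).compareTo(window) <= 0)

// Gap between the fourth and the fifth event, in minutes
val gapMinutes: Long = Duration.between(times(3), times(4)).toMinutes

println(s"${inFirstWindow.size} events in the first window, then a gap of $gapMinutes minutes")
```

It reports four events within the first 30 minutes, followed by a 124-minute gap. So under within(Time.minutes(30)), the 06:00:02 event can never join a run started by any of the earlier events.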

Now, in the select/flatSelect function I tried printing the content of the pattern map, and I noticed that, for example, the first two events weren't considered part of the same pattern; the maps looked like the following:

{start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
{start=[{value: 100, time: 2017-11-05 03:52:02.000}]}

Now, shouldn't they be in the same list, since they belong to the same iterative pattern defined with the oneOrMore clause?

Thank you for your insight,
Federico D'Ambrosio

Re: FlinkCEP behaviour with time constraints not as expected

Ufuk Celebi
Hey Federico,

let me pull in Dawid (cc'd), who works on CEP. He can probably clarify
the expected behaviour here.

Best,

Ufuk


On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio wrote:
> Now, shouldn't they be in the same list, since they belong to the same
> iterative pattern defined with the oneOrMore clause?

Re: FlinkCEP behaviour with time constraints not as expected

Dawid Wysakowicz
Hi Federico,

For your given input and pattern there should be (and there are) only two timed-out patterns:

5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))

This is because your pattern says that the event following the events with value >= 100 must not have value >= 100, and within your timeout there is no sequence of events of the form (>=100)+ (<100).

But let me explain how it works, with the same input, for this pattern:

Pattern[Event].begin("start").where(_.value >=100).oneOrMore
.notNext("end").where(_.value <100).within(Time.minutes(30))

Then we get the following matches:

5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02))))
5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))))
5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02))))
5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02))))

and the following timed-out partial matches:

5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))

Right now (in Flink 1.3.2) a pattern can start at every event (in 1.4 you will be able to specify an AFTER_MATCH_SKIP strategy, see: https://issues.apache.org/jira/browse/FLINK-7169), which is why you see matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and 2017-11-05T03:54:02.
Also, oneOrMore is currently not greedy (in 1.4 you will be able to change that, see: https://issues.apache.org/jira/browse/FLINK-7147), which is why you see both List(Event(100,2017-11-05T03:50:02)) and List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02)) rather than only one of them.

The timed-out partial matches are returned because within the timeout there was no event with value < 100 (in fact, there was no further event at all to check).
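The run semantics described above can be reproduced with a small plain-Scala model. This is a hypothetical sketch and not Flink's NFA. It rests on these assumptions:

- Every event may start a new run.
- oneOrMore is non-greedy.
- All events satisfy the start condition value >= 100, as in this example.
- A pending run times out as soon as an event arrives more than 30 minutes after the run's first event. This stands in for the watermark.

The names Ev and simulate are illustrative. Times are minutes since midnight; the shared ":02" seconds are dropped. When the arriving event matches the notNext condition, the model discards the pending runs. That choice is consistent with the two timeouts listed for the original pattern.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical event model: value, minutes since midnight, printable label
final case class Ev(value: Int, minute: Int, label: String)

// Left = timed-out partial match, Right = complete match
def simulate(events: Seq[Ev], notNext: Int => Boolean, window: Int): List[Either[List[String], List[String]]] = {
  require(events.forall(_.value >= 100), "model only covers events that satisfy the start condition")
  def labels(run: Vector[Ev]): List[String] = run.map(_.label).toList
  val out = ListBuffer.empty[Either[List[String], List[String]]]
  var pending = Vector.empty[Vector[Ev]] // runs waiting for the notNext check
  for (e <- events) {
    // Stand-in for the watermark: runs whose window elapsed before e time out first
    val (expired, alive) = pending.partition(run => e.minute - run.head.minute > window)
    expired.foreach(run => out += Left(labels(run)))
    pending =
      if (notNext(e.value)) Vector.empty // forbidden event follows: discard the runs
      else {
        alive.foreach(run => out += Right(labels(run))) // notNext satisfied: emit the run
        alive.map(_ :+ e)                              // non-greedy loop also keeps going
      }
    pending = pending :+ Vector(e) // no after-match skip: e starts a new run as well
  }
  // End of input: the final watermark times out everything still pending
  pending.foreach(run => out += Left(labels(run)))
  out.toList
}

val events = Seq(
  Ev(100, 230, "03:50"), Ev(100, 232, "03:52"), Ev(100, 234, "03:54"),
  Ev(100, 236, "03:56"), Ev(100, 360, "06:00"))

simulate(events, _ >= 100, 30).foreach(println) // original: notNext("end").where(_.value >= 100)
simulate(events, _ < 100, 30).foreach(println)  // modified: notNext("end").where(_.value < 100)
```

With the original condition (_ >= 100), the model yields only the two timeouts, for 03:56:02 and 06:00:02. With _ < 100, it yields the six matches and five timed-out partial matches listed above, in the same order.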

I hope this "study" helps you understand the behaviour. If you feel I missed something, please provide an example I can reproduce.

Regards,
Dawid

2017-11-07 11:29 GMT+01:00 Ufuk Celebi:
> let me pull in Dawid (cc'd), who works on CEP. He can probably clarify
> the expected behaviour here.


Re: FlinkCEP behaviour with time constraints not as expected

Federico D'Ambrosio
Thank you very much, Dawid, for your thorough explanation; it was really useful. I had totally missed the distinction between timed-out partial matches and complete matches.

I'd like to ask you one more thing, about the FlinkCEP Scala API: the documentation contains the following code:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
    pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

where result would then be used to get the side output for outputTag.
If I paste this code, I get an error saying that the select function is missing its parameters ("Unspecified value parameters: patternSelectFunction: PatternSelectFunction[ComplexEvent, NotInferredR]"),
while if I add the type parameters explicitly, as in

patternStream.select[TimeoutEvent, ComplexEvent]

I get "Too many arguments for select". Am I missing something?

Thank you very much,
Federico

2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz:
> I hope this "study" helps you understand the behaviour. If you feel I missed
> something, please provide an example I can reproduce.

--
Federico D'Ambrosio

Re: FlinkCEP behaviour with time constraints not as expected

Dawid Wysakowicz
Unfortunately there is a mistake in the docs: the return type should be DataStream rather than SingleOutputStreamOperator.

The correct version should be:

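import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala._

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

// The tag's type must match what the timeout function returns
val outputTag = OutputTag[TimeoutEvent]("side-output")

// Complete matches go to result; timed-out partial matches go to the side output
val result: DataStream[ComplexEvent] = patternStream.select(outputTag) {
  (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
  pattern: Map[String, Iterable[Event]] => ComplexEvent()
}

val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)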

This syntax is only available in 1.4, though; in previous versions timed-out partial matches were not returned via a side output.
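Before 1.4, a timeout-aware select in the Scala API returned a single DataStream[Either[TimeoutEvent, ComplexEvent]]. The Left/Right values printed earlier in this thread are of that shape. The two kinds of result therefore had to be separated by hand.

The sketch below shows that split on a plain Scala List standing in for the stream. The fields of TimeoutEvent and ComplexEvent and the sample values are hypothetical; the documentation snippet leaves them unspecified.

```scala
// Hypothetical field layouts for the types used in the snippet above
final case class TimeoutEvent(firstEvent: String)
final case class ComplexEvent(events: List[String])

// A List stands in for the DataStream[Either[TimeoutEvent, ComplexEvent]]
// returned by the pre-1.4 timeout-aware select (illustrative values)
val selected: List[Either[TimeoutEvent, ComplexEvent]] = List(
  Right(ComplexEvent(List("03:50"))),
  Right(ComplexEvent(List("03:50", "03:52"))),
  Left(TimeoutEvent("03:56")),
  Left(TimeoutEvent("06:00")))

// Separate complete matches (Right) from timed-out partial matches (Left)
val complete: List[ComplexEvent] = selected.collect { case Right(c) => c }
val timedOut: List[TimeoutEvent] = selected.collect { case Left(t) => t }

println(s"${complete.size} complete, ${timedOut.size} timed out")
```

On a real DataStream the same separation would be applied per element, for example with a flatMap that keeps only one side. With the 1.4 side-output form, the split happens inside select instead.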



On 8 Nov 2017, at 12:18, Federico D'Ambrosio wrote:
> I get "Too many arguments for select". Am I missing something?

Re: FlinkCEP behaviour with time constraints not as expected

Federico D'Ambrosio
Thank you very much, that was really helpful.

Cheers,
Federico

2017-11-08 13:51 GMT+01:00 Dawid Wysakowicz:
> This syntax is only available in 1.4, though; in previous versions timed-out
> partial matches were not returned via a side output.