Hi everyone,

I wanted to ask whether FlinkCEP is working as it should in the following scenario, or whether I have misunderstood how it works.

I've got a keyed stream associated with the following pattern:

    Pattern[Event].begin("start").where(_.value >=100).oneOrMore
      .notNext("end").where(_.value >=100).within(Time.minutes(30))

Considering a single key in the stream, for simplicity, I've got the following sequence of events (using event time on the "time" field of the JSON event):

    {value: 100, time: "2017-11-05 03:50:02.000"}
    {value: 100, time: "2017-11-05 03:52:02.000"}
    {value: 100, time: "2017-11-05 03:54:02.000"}
    {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30 minutes from the first event
    {value: 100, time: "2017-11-05 06:00:02.000"}

Now, when it comes to the select/flatSelect function, I tried printing the content of the pattern map, and what I noticed is that, for example, the first 2 events weren't considered part of the same pattern, as the map looked like the following:

    {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
    {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}

Now, shouldn't they be in the same List, as they belong to the same iterative pattern, defined with the oneOrMore clause?

Thank you for your insight,
Federico D'Ambrosio

Hey Federico,

let me pull in Dawid (cc'd), who works on CEP. He can probably clarify the expected behaviour here.

Best,
Ufuk
Hi Federico,
For your given input and pattern there should be (and there are) only two timed-out patterns:

    5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
    5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))

That is because your pattern says that the event right after the events with value >= 100 must not have value >= 100, and within your timeout there is no sequence of events matching (>=100)+ (<100).

But let me try to explain how it works, using the same input, for this pattern:

    Pattern[Event].begin("start").where(_.value >=100).oneOrMore
      .notNext("end").where(_.value <100).within(Time.minutes(30))

Then we have matches:

    5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02))))
    5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))))
    5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02))))
    5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
    5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
    5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02))))

and timed-out partial matches:

    5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
    5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
    5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
    5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
    5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))

Right now (in Flink 1.3.2), a match can start at every event (in 1.4 you will be able to specify an AFTER_MATCH_SKIP strategy, see https://issues.apache.org/jira/browse/FLINK-7169); therefore, you see matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and 2017-11-05T03:54:02.
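To make the walkthrough above easier to replay, here is a toy model in plain Scala (no Flink dependency) that reproduces the outputs listed for both patterns. It is only a sketch under stated assumptions, not Flink's NFA: every event may start a match, oneOrMore reports every extension separately (non-greedy), the notNext condition is checked against the event right after the loop (and the partial match is dropped if that event satisfies it), and within() is measured from the first event of the match. The names run, Matched and TimedOut are hypothetical.

```scala
// Toy model in plain Scala (NOT Flink's NFA) of the behaviour described in this thread.
// ASSUMPTIONS, chosen to reproduce the outputs listed above:
//  * every event satisfying "start" may begin a new match (no after-match skip),
//  * oneOrMore is non-greedy: each extension of the loop is reported on its own,
//  * notNext is checked against the event right after the loop; if that event
//    satisfies the notNext condition, the partial match and its continuation are dropped,
//  * within(...) is measured from the first event of the match.
import java.time.{Duration, LocalDateTime}

final case class Event(value: Int, time: LocalDateTime)

sealed trait Outcome
final case class Matched(events: List[Event]) extends Outcome  // plays the role of Right(...)
final case class TimedOut(events: List[Event]) extends Outcome // plays the role of Left(...)

def run(
    events: Vector[Event],
    start: Event => Boolean,
    notNextCond: Event => Boolean,
    within: Duration
): List[Outcome] = {
  val out = List.newBuilder[Outcome]
  for (i <- events.indices if start(events(i))) {
    var j = i
    var alive = true
    while (alive) {
      val partial = events.slice(i, j + 1).toList
      // The following event only counts if it arrives inside the window of the first event.
      val next = events.lift(j + 1)
        .filter(n => Duration.between(events(i).time, n.time).compareTo(within) <= 0)
      next match {
        case None =>
          out += TimedOut(partial) // nothing left to check before the window closes
          alive = false
        case Some(n) if notNextCond(n) =>
          alive = false // notNext violated: drop this partial match
        case Some(n) =>
          out += Matched(partial) // notNext satisfied: complete match
          if (start(n)) j += 1 else alive = false // non-greedy: keep extending the loop
      }
    }
  }
  out.result()
}

val input = Vector(
  "2017-11-05T03:50:02", "2017-11-05T03:52:02", "2017-11-05T03:54:02",
  "2017-11-05T03:56:02", "2017-11-05T06:00:02"
).map(t => Event(100, LocalDateTime.parse(t)))
val window = Duration.ofMinutes(30)

// Federico's pattern: notNext("end").where(_.value >= 100)
val original = run(input, _.value >= 100, _.value >= 100, window)
// The variant used in the explanation: notNext("end").where(_.value < 100)
val variant = run(input, _.value >= 100, _.value < 100, window)

original.foreach(println)
variant.foreach(println)
```

For Federico's pattern this yields only the two timed-out partial matches (03:56:02 and 06:00:02); for the variant it yields six complete and five timed-out partial matches, in line with the lists above.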
Also, right now oneOrMore is not greedy (in 1.4 you will be able to alter this, see https://issues.apache.org/jira/browse/FLINK-7147); therefore, you see matches like List(Event(100,2017-11-05T03:50:02)) and List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02)) rather than only one of those.

The timed-out partial matches are returned because within the timeout there was no event with value < 100 (in fact, there was no event at all to be checked).

Hope this "study" helps you understand the behaviour. If you feel I missed something, please provide an example I could reproduce.

Regards,
Dawid
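Putting the two 1.3.2 behaviours together gives exactly the number of Right(...) and Left(...) lines listed for the variant pattern. A minimal counting sketch, assuming every event may start a match and oneOrMore is non-greedy: each contiguous slice of the three events that are followed, within the window, by another event with value >= 100 is reported once, which for k such events is k(k + 1)/2 = 6. The indices and names below are illustrative only.

```scala
// Counting sketch in plain Scala (illustrative indices, not Flink code).
// Assumptions: every event may start a match and oneOrMore is non-greedy, as in 1.3.2.
// Indices 0, 1, 2 stand for 03:50:02, 03:52:02 and 03:54:02: each is followed,
// inside the 30-minute window, by another event with value >= 100.
val completing = 0 to 2

// Every contiguous slice [i..j] of those events is reported as a separate match.
val slices = for {
  i <- completing
  j <- i to completing.last
} yield (i, j)

// Closed form for k such events.
def sliceCount(k: Int): Int = k * (k + 1) / 2

// Each start 0..3 also leaves one partial match that reaches 03:56:02 and times out,
// plus the lone partial match that starts at 06:00:02.
val timedOutPartials = (0 to 3).size + 1

println(s"complete matches: ${slices.size} (closed form: ${sliceCount(completing.size)})")
println(s"timed-out partial matches: $timedOutPartials")
```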
Thank you very much, Dawid, for your thorough explanation, really useful. I totally missed the distinction between timed-out events and complete matches.
-- 
Federico D'Ambrosio
Unfortunately, there is a mistake in the docs: the return type should be DataStream rather than SingleOutputStreamOperator.
The correct version should be:

    import org.apache.flink.cep.scala.{CEP, PatternStream}
    import org.apache.flink.streaming.api.scala._

    val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

    // the tag's type must match the type returned by the timeout function
    val outputTag = OutputTag[TimeoutEvent]("side-output")

    // select returns a DataStream of the selected results; timed-out matches go to the side output
    val result: DataStream[ComplexEvent] = patternStream.select(outputTag) {
      (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
    } {
      pattern: Map[String, Iterable[Event]] => ComplexEvent()
    }

This syntax is only available in 1.4, though; in previous versions, timed-out events were not returned via a side output.

> On 8 Nov 2017, at 12:18, Federico D'Ambrosio <[hidden email]> wrote:
>
> I'd like to ask you one more thing, about the FlinkCEP Scala API: in the documentation, there is the following code:
>
>     val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
>
>     val outputTag = OutputTag[String]("side-output")
>
>     val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
>         (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
>     } {
>         pattern: Map[String, Iterable[Event]] => ComplexEvent()
>     }
>
> where result would then be used to get the outputTag side output.
> If I paste this code, I get that the select function is missing its parameters ("Unspecified value parameters: patternSelectFunction: PatternSelectFunction[ComplexEvent, NotInferredR]"), while, if I add the type parameters explicitly, such as
>
>     patternStream.select[TimeoutEvent, ComplexEvent]
>
> I get "Too many arguments for select". Am I missing something?
>
> Thank you very much,
> Federico
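Dawid's corrected snippet passes the output tag, the timeout function and the select function in three separate parameter lists, gets back a DataStream of the select results, and routes timed-out matches to the side output. To illustrate that curried shape without a Flink dependency, here is a sketch using a hypothetical stand-in: Tag, MiniPatternStream and Ev are NOT Flink classes, and TimeoutEvent/ComplexEvent are reduced to toy case classes. It also shows why the tag's type parameter has to match what the timeout function returns.

```scala
// Plain-Scala stand-in (NOT Flink's PatternStream) with the same curried shape as the
// 1.4 Scala CEP select: (outputTag)(timeoutFunction)(selectFunction).
// Tag, MiniPatternStream and Ev are hypothetical names used only for illustration.
final case class Tag[L](id: String)

final class MiniPatternStream[T](
    complete: Seq[Map[String, Iterable[T]]],
    timedOut: Seq[(Map[String, Iterable[T]], Long)]
) {
  // L is fixed by the (invariant) tag in the FIRST parameter list, so the timeout
  // function in the second list must return exactly that type.
  def select[L, R](tag: Tag[L])(
      timeoutFn: (Map[String, Iterable[T]], Long) => L)(
      selectFn: Map[String, Iterable[T]] => R): (Seq[R], Seq[L]) =
    (complete.map(selectFn), timedOut.map { case (m, ts) => timeoutFn(m, ts) })
}

final case class Ev(value: Int)
final case class TimeoutEvent(size: Int)
final case class ComplexEvent(size: Int)

val stream = new MiniPatternStream[Ev](
  complete = Seq(Map("start" -> List(Ev(100), Ev(100)))),
  timedOut = Seq((Map("start" -> List(Ev(100))), 0L)) // illustrative timestamp
)

val tag = Tag[TimeoutEvent]("side-output")

// Each argument goes in its own parameter list.
val (results, sideOutput) = stream.select(tag) {
  (pattern: Map[String, Iterable[Ev]], timestamp: Long) => TimeoutEvent(pattern("start").size)
} {
  (pattern: Map[String, Iterable[Ev]]) => ComplexEvent(pattern("start").size)
}
// With Tag[String] instead, the timeout function above would not compile:
// L would already be fixed to String by the first parameter list.
println(results)
println(sideOutput)
```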
Thank you very much, that was really helpful.

Cheers,

-- 
Federico D'Ambrosio