Could someone clarify how exactly event time, watermarks and allowed lateness work?

I have created the program below, and I have an input file such as:

device_id,trigger_id,event_time,messageId
1,START,1520433909396,1
1,TRACKING,1520433914398,2
1,TRACKING,1520433919398,3
1,STOP,1520433924398,4
1,START,1520433929398,5
1,TRACKING,1520433934399,6
1,TRACKING,1520433939399,7
1,TRACKING,1520433944399,8
1,STOP,1520433949399,9

where trigger_id is an indicator such as START, TRACKING or STOP.

What I would like to do is group all incoming events by device_id and define a window based on the trigger_id, i.e. group all events from START until STOP and then do some calculations such as average, max, etc.

I call env.readTextFile("events.csv") and set the StreamTimeCharacteristic to EventTime. Parallelism is set to 4, which means the messages from the source file are read, but not in order. (I have added messageId as a counter for dev purposes only.)

I have defined a custom trigger that finds STOP events and fires, and an evictor that then removes the collected events.

My main problem is that as soon as parallelism is increased above 1, the input source reads the events out of order. Shouldn't event time and watermarks resolve this issue? How do I handle possible out-of-order events?

    import java.util.Iterator;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
    import org.apache.flink.util.Collector;

    // Event and TripEv are my own classes (not shown here).
    public class SimpleEventJob {

        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(4);
            //env.setParallelism(1);

            DataStream<String> input = env.readTextFile("events.csv");

            // create event stream
            DataStream<Event> events = input.map(new LineToEvent());
            DataStream<Event> waterMarkedStreams =
                events.assignTimestampsAndWatermarks(new EventWAssigner());

            DataStream<TripEv> tripStream = waterMarkedStreams
                .keyBy("deviceId")
                .window(GlobalWindows.create())
                .trigger(new TripTrigger())
                .evictor(new TripEvictor())
                .apply(new CreateTrip());

            tripStream.print();

            // execute program
            env.execute("Flink Streaming Java API Skeleton");
        }

        public static class TripTrigger extends Trigger<Event, GlobalWindow> {

            @Override
            public TriggerResult onElement(Event event, long timestamp,
                    GlobalWindow window, TriggerContext context) throws Exception {
                // if this is a stop event, fire the window immediately
                if (event.getTrigger() == 53) {
                    return TriggerResult.FIRE;
                }
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window,
                    TriggerContext ctx) {
                return TriggerResult.FIRE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window,
                    TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }

            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) {
            }
        }

        private static class TripEvictor implements Evictor<Event, GlobalWindow> {

            @Override
            public void evictBefore(Iterable<TimestampedValue<Event>> events,
                    int size, GlobalWindow window, EvictorContext ctx) {
            }

            @Override
            public void evictAfter(Iterable<TimestampedValue<Event>> elements,
                    int size, GlobalWindow window, EvictorContext ctx) {
                System.out.println(elements);
                long firstStop = Event.earliestStopElement(elements);
                // remove all events up to (and including) the first stop event
                // (which is the event that triggered the window)
                for (Iterator<TimestampedValue<Event>> iterator = elements.iterator();
                        iterator.hasNext(); ) {
                    TimestampedValue<Event> element = iterator.next();
                    if (element.getTimestamp() <= firstStop) {
                        iterator.remove();
                    }
                }
            }
        }

        public static class CreateTrip
                implements WindowFunction<Event, TripEv, Tuple, GlobalWindow> {

            @Override
            public void apply(Tuple key, GlobalWindow window,
                    Iterable<Event> events, Collector<TripEv> out) {
                TripEv trp = new TripEv(events);
                if (trp.length > 0) {
                    out.collect(trp);
                }
            }
        }

        private static class LineToEvent implements MapFunction<String, Event> {

            @Override
            public Event map(String line) throws Exception {
                return Event.fromString(line);
            }
        }

        private static class EventWAssigner
                implements AssignerWithPunctuatedWatermarks<Event> {

            @Override
            public long extractTimestamp(Event event, long previousElementTimestamp) {
                return event.getTimestamp();
            }

            @Override
            public Watermark checkAndGetNextWatermark(Event event,
                    long extractedTimestamp) {
                // simply emit a watermark with every event
                return new Watermark(extractedTimestamp - 1000L);
            }
        }
    }

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Hello,

Event time and watermarks can be used to deal with out-of-order events, but since you're using global windows (as opposed to time-based windows) you have to implement the logic for doing this yourself.

Conceptually, you would have to create your TripEv not when you receive a STOP event, but once a given amount of time has passed after you received it. Your CreateTrip function would have to scan the window contents for START -> STOP sequences and compare the timestamp of the STOP event with the current event time. (I think you would have to extend ProcessWindowFunction instead for this.)

The evictor would have to use the same logic to detect sequences that were already processed.

I suggest looking into our CEP library, as this looks like a perfect use case for it.

On 12.03.2018 14:37, dim5b wrote:
> Could someone clarify how exactly event time/watermarks and allow lateness
> work. I have created the program below and I have an input file such as...
> [...]

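The idea of waiting until "a given amount of time has passed" before acting on an event can be illustrated without Flink at all. The following is only a plain-Java sketch under stated assumptions, not Flink's implementation: the class `WatermarkBuffer`, its nested `Event` type and the `reorder` helper are hypothetical names made up for this example. Events are buffered, a watermark trails the highest timestamp seen by a fixed bound, and an event is released only once the watermark has reached its timestamp, so events that arrive out of order (within the bound) come out sorted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch (no Flink): release buffered events in timestamp order once the watermark passes them. */
public class WatermarkBuffer {

    /** Hypothetical stand-in for the poster's Event class. */
    public static final class Event {
        public final String trigger;
        public final long timestamp;
        public final int messageId;

        public Event(String trigger, long timestamp, int messageId) {
            this.trigger = trigger;
            this.timestamp = timestamp;
            this.messageId = messageId;
        }
    }

    // Buffered events, ordered by event timestamp (oldest first).
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private final long maxOutOfOrdernessMillis;
    private long watermark = Long.MIN_VALUE;

    public WatermarkBuffer(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Buffers one event and returns all events the watermark has now reached, oldest first. */
    public List<Event> add(Event event) {
        buffer.add(event);
        // The watermark lags the highest timestamp seen by a fixed bound and never moves backwards.
        watermark = Math.max(watermark, event.timestamp - maxOutOfOrdernessMillis);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    /** Releases everything still buffered, oldest first (end of input). */
    public List<Event> flush() {
        List<Event> rest = new ArrayList<>();
        while (!buffer.isEmpty()) {
            rest.add(buffer.poll());
        }
        return rest;
    }

    /** Convenience helper: feeds events in arrival order and returns message ids in release order. */
    public static List<Integer> reorder(List<Event> arrivals, long maxOutOfOrdernessMillis) {
        WatermarkBuffer wb = new WatermarkBuffer(maxOutOfOrdernessMillis);
        List<Integer> ids = new ArrayList<>();
        for (Event e : arrivals) {
            for (Event released : wb.add(e)) {
                ids.add(released.messageId);
            }
        }
        for (Event released : wb.flush()) {
            ids.add(released.messageId);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Event> arrivals = new ArrayList<>();
        // First four rows of the sample file, arriving in the order 2, 1, 4, 3.
        arrivals.add(new Event("TRACKING", 1520433914398L, 2));
        arrivals.add(new Event("START", 1520433909396L, 1));
        arrivals.add(new Event("STOP", 1520433924398L, 4));
        arrivals.add(new Event("TRACKING", 1520433919398L, 3));
        System.out.println(reorder(arrivals, 10_000L)); // prints [1, 2, 3, 4]
    }
}
```

Note that the bound matters: an event that arrives after the watermark has already passed its timestamp is late, and this sketch simply releases it immediately, i.e. out of order. With the 1000 ms bound from the original assigner and events spaced 5 seconds apart, most reordered events in the sample file would fall into that category.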
I have looked into the CEP library and have posted a question on Stack Overflow:

https://stackoverflow.com/questions/49047879/global-windows-in-flink-using-custom-triggers-vs-flink-cep-pattern-api

However, the pattern returns all possible matches on the stream of events. Does a pattern have a notion of an evictor? How can you keep only one specific set of events?

That question has not been answered, and there seems to be a similar one:

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

Thanks

I see you replied on https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching, pointing to a known bug: https://issues.apache.org/jira/browse/FLINK-8914

In my case my pattern looks like this:

    Pattern<Event, Event> tripPattern =
        Pattern.<Event>begin("start").times(1).where(START_CONDITION)
            .followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
            .next("end").where(END_CONDITION);

with the end result being:

    1> 1,2,3,4,
    4> 1,2,3,6,7,8,11,12,
    1> 5,6,7,8,11,12,
    3> 5,6,7,8,9,
    2> 1,2,3,6,7,8,9,
    2> 10,11,12,

By using the next operator I limit the terminating (end) side, but begin still takes all possible solutions.

By adding AfterMatchSkipStrategy.skipPastLastEvent() it returns what I want.

Is there a way to track/emit "ongoing" events, i.e. before the pattern matches the end event type?

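For readers following along, the result described here (each event belongs to at most one trip, and matching resumes after the STOP that closed the previous trip) can be pictured with a small plain-Java sketch. This is not Flink CEP and does not reproduce its matching semantics; `TripSegmenter` and `segment` are hypothetical names, the input is assumed to be already in event-time order, and discarding an open trip when a second START arrives is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of non-overlapping START..STOP grouping (not Flink CEP). */
public class TripSegmenter {

    /**
     * Splits an in-order list of trigger ids into non-overlapping START..STOP runs.
     * Each run holds the 1-based positions of its events, mirroring the messageId counter.
     */
    public static List<List<Integer>> segment(List<String> triggers) {
        List<List<Integer>> trips = new ArrayList<>();
        List<Integer> current = null; // null while no trip is open
        for (int i = 0; i < triggers.size(); i++) {
            String trigger = triggers.get(i);
            if (trigger.equals("START")) {
                // Assumption: a new START discards any trip that was still open.
                current = new ArrayList<>();
                current.add(i + 1);
            } else if (current != null) {
                current.add(i + 1);
                if (trigger.equals("STOP")) {
                    trips.add(current);
                    current = null; // continue after the last matched event
                }
            }
            // Events seen while no trip is open are ignored.
        }
        return trips;
    }

    public static void main(String[] args) {
        // trigger_id column of the sample file from the first post
        List<String> triggers = Arrays.asList(
                "START", "TRACKING", "TRACKING", "STOP",
                "START", "TRACKING", "TRACKING", "TRACKING", "STOP");
        System.out.println(segment(triggers)); // prints [[1, 2, 3, 4], [5, 6, 7, 8, 9]]
    }
}
```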