Global Window, Trigger and Watermarks, Parallelism

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Global Window, Trigger and Watermarks, Parallelism

dim5b
Could someone clarify how exactly event time/watermarks and allow lateness
work. I have created the program below and I have an input file such as...

  device_id,trigger_id,event_time,messageId
    1,START,1520433909396,1
    1,TRACKING,1520433914398,2
    1,TRACKING,1520433919398,3
    1,STOP,1520433924398,4
    1,START,1520433929398,5
    1,TRACKING,1520433934399,6
    1,TRACKING,1520433939399,7
    1,TRACKING,1520433944399,8
    1,STOP,1520433949399,9

Where trigger_id can be an indicator such as: start,tracking,stop. What I
would like to do is based on device_id group all incoming events and define
a window based on the trigger_id. I.e group all events from start until stop
and then do some calculations such as: average,max etc.

I call  env.readTextFile("events.csv"); and set StreamTimeCharacteristic to
EventTime. Parellism is set to 4. This means my messages from the source
file are read but are not in order... (I have added messageId as an counter
only for dev purposes)

I have defined a custom trigger to find stop events and fire in order to
evict all collected events. My main problem is that if parellism is
increased from 1 the input source reads these out of order.

Shouldn't event time and watermarks resolve this issue? How do i handle
possible out of order events?

public class SimpleEventJob {

        public static void main(String[] args) throws Exception {
                // set up the streaming execution environment
                final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setParallelism(4);
        //env.setParallelism(1);
                DataStream<String> input = env.readTextFile("events.csv");
                // create event stream
                DataStream<Event> events = input.map(new LineToEvent());
                DataStream<Event> waterMarkedStreams =
events.assignTimestampsAndWatermarks(new EventWAssigner());
                DataStream<TripEv> tripStream = waterMarkedStreams.keyBy("deviceId")
                                .window(GlobalWindows.create())
                                .trigger(new TripTrigger())
                                .evictor(new TripEvictor())
                                .apply(new CreateTrip());
                tripStream.print();
        // execute program
                env.execute("Flink Streaming Java API Skeleton");
        }
       
    public static class TripTrigger extends Trigger<Event, GlobalWindow> {
        @Override
        public TriggerResult onElement(Event event, long timestamp,
GlobalWindow window, TriggerContext context) throws Exception {
            // if this is a stop event, set a timer
            if (event.getTrigger() == 53) {
                                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

                @Override
                public TriggerResult onEventTime(long time, GlobalWindow window,
TriggerContext ctx) {
            return TriggerResult.FIRE;
                }

                @Override
                public TriggerResult onProcessingTime(long time, GlobalWindow window,
TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                }

                @Override
                public void clear(GlobalWindow window, TriggerContext ctx) {
                }
    }

    private static class TripEvictor implements Evictor<Event, GlobalWindow>
{
                @Override
                public void evictBefore(Iterable<TimestampedValue&lt;Event>> events,
                int size, GlobalWindow window, EvictorContext ctx) {
                }

                @Override
                public void evictAfter(Iterable<TimestampedValue&lt;Event>> elements, int
size, GlobalWindow window, EvictorContext
                ctx) {
                        System.out.println(elements);
                        long firstStop = Event.earliestStopElement(elements);
                        // remove all events up to (and including) the first stop event (which is
the event that triggered the window)
                        for (Iterator<TimestampedValue&lt;Event>> iterator = elements.iterator();
iterator.hasNext(); ) {
                                TimestampedValue<Event> element = iterator.next();
                                if (element.getTimestamp() >= firstStop ) {
                                        iterator.remove();
                                }
                        }
                }
        }

        public static class CreateTrip implements WindowFunction<Event, TripEv,
Tuple, GlobalWindow> {
                @Override
                public void apply(Tuple key, GlobalWindow window, Iterable<Event> events,
Collector<TripEv> out) {
                        TripEv trp = new TripEv(events);
                        if (trp.length > 0) {
                                out.collect(trp);
                        }
                }
        }

        private static class LineToEvent implements MapFunction<String, Event> {
                @Override
                public Event map(String line) throws Exception {
                        return Event.fromString(line);
                }
        }


        private static class EventWAssigner implements
AssignerWithPunctuatedWatermarks<Event> {
                @Override
                public long extractTimestamp(Event event, long previousElementTimestamp) {
                        return event.getTimestamp();
                }

                @Override
                public Watermark checkAndGetNextWatermark(Event event, long
extractedTimestamp) {
                        // simply emit a watermark with every event
                        return new Watermark(extractedTimestamp - 1000L);
                }
        }
}







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Global Window, Trigger and Watermarks, Parallelism

Chesnay Schepler
Hello,

Event-time and watermarks can be used to deal with out-of-order events, but since you're using global windows (opposed to time-based windows) you have to implement the logic for doing this yourself.

Conceptually, what you would have to do is to not create your TripEv when receiving a STOP event, but when a given amount of time has passed after you received it. Your CreateTrip function would have to scan the window contents for START -> STOP sequences and check the timestamp of STOP with the current event time. (I think for this you would have to extend ProcessWindowFunction instead)
The evictor would have to use the same logic to detect sequences that were already processed.

I suggest to look into our CEP library as this looks like a perfect use-case for it.

On 12.03.2018 14:37, dim5b wrote:
Could someone clarify how exactly event time/watermarks and allow lateness
work. I have created the program below and I have an input file such as...

  device_id,trigger_id,event_time,messageId
    1,START,1520433909396,1
    1,TRACKING,1520433914398,2
    1,TRACKING,1520433919398,3
    1,STOP,1520433924398,4
    1,START,1520433929398,5
    1,TRACKING,1520433934399,6
    1,TRACKING,1520433939399,7
    1,TRACKING,1520433944399,8
    1,STOP,1520433949399,9

Where trigger_id can be an indicator such as: start,tracking,stop. What I
would like to do is based on device_id group all incoming events and define
a window based on the trigger_id. I.e group all events from start until stop
and then do some calculations such as: average,max etc.

I call  env.readTextFile("events.csv"); and set StreamTimeCharacteristic to
EventTime. Parellism is set to 4. This means my messages from the source
file are read but are not in order... (I have added messageId as an counter
only for dev purposes)

I have defined a custom trigger to find stop events and fire in order to
evict all collected events. My main problem is that if parellism is
increased from 1 the input source reads these out of order.

Shouldn't event time and watermarks resolve this issue? How do i handle
possible out of order events?

public class SimpleEventJob {

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(4);
        //env.setParallelism(1);
		DataStream<String> input = env.readTextFile("events.csv");
		// create event stream
		DataStream<Event> events = input.map(new LineToEvent());
		DataStream<Event> waterMarkedStreams =
events.assignTimestampsAndWatermarks(new EventWAssigner());
		DataStream<TripEv> tripStream = waterMarkedStreams.keyBy("deviceId")
				.window(GlobalWindows.create())
				.trigger(new TripTrigger())
				.evictor(new TripEvictor())
				.apply(new CreateTrip());
		tripStream.print();
        // execute program
		env.execute("Flink Streaming Java API Skeleton");
	}
	
    public static class TripTrigger extends Trigger<Event, GlobalWindow> {
        @Override
        public TriggerResult onElement(Event event, long timestamp,
GlobalWindow window, TriggerContext context) throws Exception {
            // if this is a stop event, set a timer
            if (event.getTrigger() == 53) {
				return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window,
TriggerContext ctx) {
            return TriggerResult.FIRE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window,
TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) {
		}
    }

    private static class TripEvictor implements Evictor<Event, GlobalWindow>
{
		@Override
		public void evictBefore(Iterable<TimestampedValue&lt;Event>> events,
		int size, GlobalWindow window, EvictorContext ctx) {
		}

		@Override
		public void evictAfter(Iterable<TimestampedValue&lt;Event>> elements, int
size, GlobalWindow window, EvictorContext
		ctx) {
			System.out.println(elements);
			long firstStop = Event.earliestStopElement(elements);
			// remove all events up to (and including) the first stop event (which is
the event that triggered the window)
			for (Iterator<TimestampedValue&lt;Event>> iterator = elements.iterator();
iterator.hasNext(); ) {
				TimestampedValue<Event> element = iterator.next();
				if (element.getTimestamp() >= firstStop ) {
					iterator.remove();
				}
			}
		}
	}

	public static class CreateTrip implements WindowFunction<Event, TripEv,
Tuple, GlobalWindow> {
		@Override
		public void apply(Tuple key, GlobalWindow window, Iterable<Event> events,
Collector<TripEv> out) {
			TripEv trp = new TripEv(events);
			if (trp.length > 0) {
				out.collect(trp);
			}
		}
	}

	private static class LineToEvent implements MapFunction<String, Event> {
		@Override
		public Event map(String line) throws Exception {
			return Event.fromString(line);
		}
	}


	private static class EventWAssigner implements
AssignerWithPunctuatedWatermarks<Event> {
		@Override
		public long extractTimestamp(Event event, long previousElementTimestamp) {
			return event.getTimestamp();
		}

		@Override
		public Watermark checkAndGetNextWatermark(Event event, long
extractedTimestamp) {
			// simply emit a watermark with every event
			return new Watermark(extractedTimestamp - 1000L);
		}
	}
}







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: Global Window, Trigger and Watermarks, Parallelism

dim5b
I have looked into the CEP library. I have posted  an issued on
stackoverflow.

https://stackoverflow.com/questions/49047879/global-windows-in-flink-using-custom-triggers-vs-flink-cep-pattern-api

However the pattern matches all possible solution on the stream of
events.Does pattern have a notion of Evictor? How can you keep only the
specific set of events.

This question has not been answered and there seems to be a similar question

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Global Window, Trigger and Watermarks, Parallelism

dim5b
I see you replied on

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

with a known bug issue on the

https://issues.apache.org/jira/browse/FLINK-8914

In my case my pattern looks like

Pattern<Event, Event> tripPattern =
               
Pattern.<Event>begin("start").times(1).where(START_CONDITION)
                       
.followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
                        .next("end").where(END_CONDITION);

the end result being..

1> 1,2,3,4,
4> 1,2,3,6,7,8,11,12,
1> 5,6,7,8,11,12,
3> 5,6,7,8,9,
2> 1,2,3,6,7,8,9,
2> 10,11,12,

By using next operator  I limit the end-terminating side but begin takes all
possible solutions.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Global Window, Trigger and Watermarks, Parallelism

dim5b
By adding , AfterMatchSkipStrategy.skipPastLastEvent() it returns what i
want.

Is there a way to track/emit "ongoing" events i.e before the pattern matchs
the end event type?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/