I am learning the tumbling and sliding window API, and I was wondering what API calls people use to check that their events are being assigned to windows as they expect. For example, is there a way to print the start and end times of each window as it is processed, along with the events the window contains, before an operation is performed?

I have a simple socket-reading stream application that uses event time. I have written a test "producer" app that generates events with timestamps, and sometimes I deliberately force an event to be out of order by using sleep(), etc. However, I want to convince myself that events are being included in sliding windows as I expect.

The main part of the app is this snippet:

    DataStream<Tuple2<String,Integer>> sensorStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
        .map(new RawSensorMapper());

    // Compute:
    // Sliding 10-second window every 5 seconds
    DataStream<Tuple2<String,Integer>> counts = sensorStream
        .keyBy(0)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .sum(1);

I just map each event to Tuple2<id,1>.

How can I code this to "see" the start and end times of each sliding window as it is generated, so I can see why certain events are or are not included in a given window?

The events come in looking like this (simulating data coming from a sensor):

    1527516714364 sensor1 28.33
    1527516714365 sensor1 311.42
    1527516717365 sensor1 12.33

I use an AssignerWithPeriodicWatermarks subclass to parse the event-time timestamp from each record. The "sensor1" field is used as the key. The watermark is just System.currentTimeMillis() at the moment.

Any tips appreciated. Thanks

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
I'm not sure if this is a "best practice" for debugging, but I found that if I use apply(), one of the parameters passed into the WindowFunction that I must implement is a TimeWindow object, which has start and end times:

    private static class MyApplyWindowFunction
            implements WindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow window,
                          Iterable<Tuple2<String, Integer>> input,
                          Collector<Tuple2<String, Integer>> out) throws Exception {
            // Debug output only: nothing is emitted to 'out' here.
            System.out.println("ApplyWindowFunction BEGIN: " + key.getField(0));
            System.out.println("WINDOW START=" + window.getStart());
            System.out.println("WINDOW END=" + window.getEnd());
        }
    }
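To cross-check the start and end values printed by a window function like the one above, it can help to compute by hand which sliding windows a given timestamp should fall into. The following is a minimal plain-Java sketch that does not use Flink at all. It assumes windows are aligned to the epoch with zero offset, that the window end is exclusive, and that timestamps are non-negative, which is my understanding of the default behaviour for a 10-second window sliding every 5 seconds. The class name `SlidingWindowDebug` and the method `windowsFor` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDebug {

    /**
     * Returns {start, end} pairs (end exclusive) for every sliding window
     * that contains the given timestamp. Assumes epoch-aligned windows with
     * zero offset and a non-negative timestamp.
     */
    public static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Sample event timestamps taken from the question.
        long[] eventTimes = {1527516714364L, 1527516714365L, 1527516717365L};
        for (long ts : eventTimes) {
            for (long[] w : windowsFor(ts, 10_000L, 5_000L)) {
                System.out.println("event " + ts + " -> window [" + w[0] + ", " + w[1] + ")");
            }
        }
    }
}
```

Under these assumptions each sample event belongs to exactly two windows, because the window size is twice the slide. If the WINDOW START and WINDOW END values printed by the job differ from what this sketch predicts, the timestamp extraction or the watermark logic would be the first place to look.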