Debugging window processing: can I output window start/end times, prove correctness?


chrisr123

I am learning the tumbling and sliding window API, and I was wondering what
API calls people use to determine whether their events are being assigned to
windows as they expect. For example, is there a way to print the window start
and end times as windows are being processed, along with the events contained
in each window, before an operation is performed?

I have a simple socket-reading stream application that uses Event Time. I
have written a test "producer" app that generates events with timestamps, and
sometimes I deliberately force an event to be out of order by using sleep(),
etc. However, I want to convince myself that events are being included in
sliding windows as I expect. The main part of the app is this snippet:

                DataStream<Tuple2<String,Integer>> sensorStream = env
                                .socketTextStream(parms.get("host"), parms.getInt("port"))
                                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                                .map(new RawSensorMapper());

                // Compute:
                // Sliding 10-second window every 5 seconds
                DataStream<Tuple2<String,Integer>> counts = sensorStream
                                .keyBy(0)
                                .timeWindow(Time.seconds(10), Time.seconds(5))
                                .sum(1);

I just map each event to a Tuple2 of (id, 1), so the sum is a count of events per key.

How can I code this to "see" the start and end times of each sliding window
as it's generated so I can see why certain events are or are not included
within a given window?
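
For a quick sanity check outside of Flink, the expected window bounds can be computed by hand. The sketch below is plain Java with hypothetical names (SlidingWindowSketch, assignWindows). It assumes a window offset of 0 and follows what appears to be the arithmetic behind Flink's sliding event-time windows, so treat it as an approximation to compare against, not as Flink's own code. It uses the 10-second size and 5-second slide from the snippet above and the timestamps of the sample records shown further down.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of how an event timestamp maps to
// sliding windows. Class and method names are hypothetical.
class SlidingWindowSketch {

    // Returns {start, end} pairs (end exclusive) for every sliding window
    // that contains the given timestamp. A window offset of 0 is assumed.
    static List<long[]> assignWindows(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {1527516714364L, 1527516714365L, 1527516717365L};
        for (long ts : timestamps) {
            for (long[] w : assignWindows(ts, 10_000L, 5_000L)) {
                System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
            }
        }
    }
}
```

Because the window size is twice the slide, each timestamp should land in exactly two overlapping windows.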

The events come in looking like the sample below (simulating data coming from
a sensor). I use an AssignerWithPeriodicWatermarks subclass to parse the
event-time timestamp from each record. The "sensor1" field is used as the key.
The watermark is just System.currentTimeMillis() at the moment.

1527516714364 sensor1 28.33
1527516714365 sensor1 311.42
1527516717365 sensor1 12.33
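
One thing that may be worth checking when events seem to be missing from a window: a watermark taken from the wall clock is not tied to the timestamps inside the records, so it may advance past events that are deliberately delayed. A common alternative is to derive the watermark from the largest event timestamp seen so far, minus an allowed lateness. The following is a minimal plain-Java sketch of that idea, not the actual BoundedOutOfOrdernessGenerator from the application; the class name, method names, and the 3-second bound are all assumptions made for illustration.

```java
// Plain-Java sketch (no Flink dependency) of timestamp parsing and a
// bounded-out-of-orderness watermark. All names here are hypothetical.
class WatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Parses the leading epoch-millisecond field of a record such as
    // "1527516714364 sensor1 28.33".
    static long extractTimestamp(String record) {
        return Long.parseLong(record.trim().split("\\s+")[0]);
    }

    // Records one event and returns the watermark after seeing it:
    // the largest timestamp observed so far minus the allowed lateness.
    long onEvent(String record) {
        maxTimestampSeen = Math.max(maxTimestampSeen, extractTimestamp(record));
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(3_000L); // assumed bound
        String[] records = {
            "1527516714364 sensor1 28.33",
            "1527516717365 sensor1 12.33",
            "1527516714365 sensor1 311.42" // arrives out of order
        };
        for (String record : records) {
            System.out.println(record + " -> watermark " + sketch.onEvent(record));
        }
    }
}
```

With this scheme the watermark never moves backwards when a late record arrives; it only advances when a larger timestamp is seen.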

Any tips appreciated.
Thanks






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Debugging window processing: can I output window start/end times, prove correctness?

chrisr123
I'm not sure if this is a "best practice" for debugging, but I found that if I
use apply(), one of the parameters passed into the WindowFunction that I must
implement is a TimeWindow object, which has start and end times:

private static class MyApplyWindowFunction implements
        WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input,
            Collector<Tuple2<String, Integer>> out) throws Exception {

        // Print the key and the window bounds (epoch milliseconds) for debugging.
        System.out.println("ApplyWindowFunction BEGIN: " + key.getField(0));
        System.out.println("WINDOW START=" + window.getStart());
        System.out.println("WINDOW END=" + window.getEnd());

        // Note: nothing is emitted to `out` here, so this function only prints.
    }
}
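
The values printed by getStart() and getEnd() are epoch milliseconds, which are hard to compare by eye against the event timestamps. A small formatting helper can make the debug output easier to read. This is only a sketch in plain Java: the class name WindowBoundsFormatter, the method describe, and the choice of UTC are assumptions, and the window end is treated as exclusive.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for turning window bounds (epoch milliseconds)
// into a readable half-open interval. Uses only the Java standard library.
class WindowBoundsFormatter {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // Formats the bounds as "[start, end)" in UTC; the end is treated as exclusive.
    static String describe(long startMs, long endMs) {
        return "[" + FMT.format(Instant.ofEpochMilli(startMs))
                + ", " + FMT.format(Instant.ofEpochMilli(endMs)) + ")";
    }

    public static void main(String[] args) {
        // Example bounds for a 10-second window.
        System.out.println("WINDOW " + describe(1527516710000L, 1527516720000L));
    }
}
```

Inside apply(), such a helper could be called as describe(window.getStart(), window.getEnd()), and iterating over the input parameter would show which events the window actually contains.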


