sanity checking in ProcessWindowFunction.process shows that event timestamps are out of tumbling window time range


Yu Yang
Hi all, 

We are writing an application that sets TimeCharacteristic.EventTime as its time characteristic. When implementing a ProcessWindowFunction for a tumbling window, we added the code below to check whether each event's timestamp falls within the window's time range. To our surprise, the process function reports events whose timestamps are outside the tumbling window's time range. Any insights into how this can happen? We are using Flink 1.9.1.

Below are the timestamp assigner, the stream DAG snippet, and the process function implementation:

Timestamp assigner: 

FlinkKafkaConsumerBase<Event> source = consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
        @Override
        public long extractTimestamp(Event element) {
            return element.getTimestamp();
        }
    });
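
For reference, this is how we understand the watermark produced by this extractor: it trails the largest timestamp seen so far by the configured bound (60 seconds here). The following is a minimal standalone sketch of that arithmetic, not Flink's actual class; the name BoundedLatenessSketch and its methods are hypothetical, and timestamps are assumed to be in milliseconds.

```java
public class BoundedLatenessSketch {
    // Hypothetical illustration class, not part of Flink.
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    public BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark does not underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Records an event timestamp and returns it unchanged,
    // tracking the largest timestamp seen so far.
    public long observe(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
        return timestampMs;
    }

    // The watermark lags the largest observed timestamp by the bound.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch sketch = new BoundedLatenessSketch(60_000L);
        sketch.observe(200_000L);
        sketch.observe(100_000L); // out of order, does not move the watermark
        System.out.println(sketch.currentWatermark());
    }
}
```

Under this reading, an out-of-order event only affects how far the watermark has advanced; it should not change which window the event's own timestamp maps to.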



The stream DAG of our application:

env.addSource(source)
    .filter(new EventFilter(events))
    .keyBy(new KeySelector<Event, EventStreamPartitionKey>() {
        @Override
        public EventStreamPartitionKey getKey(Event value) throws Exception {
            return new EventStreamPartitionKey(value.getHost());
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .process(new EventValidator())
    .addSink(kafkaProducer);


The implementation of the process window function, EventValidator.process, which checks whether each event timestamp is within the tumbling window time range:

@Override
public void process(EventStreamPartitionKey key, Context context,
                    Iterable<Event> elements, Collector<EventResult> out) {
    for (Event event : elements) {
        // Flag events whose timestamp falls outside [window start, window end).
        if (event.getTimestamp() >= context.window().getEnd()
                || event.getTimestamp() < context.window().getStart()) {
            System.out.println("NOT in RANGE: " + context.window().getStart()
                + ", " + event.getTimestamp() + ", " + context.window().getEnd());
        }
        ...
    }
    out.collect(res);
}
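
To make the expectation behind this check concrete, here is a minimal standalone sketch of how we understand a 60-second tumbling window to be derived from a record timestamp. It mirrors the arithmetic of Flink's TimeWindow.getWindowStartWithOffset with a zero offset, and assumes millisecond timestamps. The class WindowRangeCheck and its methods are hypothetical names for illustration, not part of Flink or of our job.

```java
public class WindowRangeCheck {
    // Start of the tumbling window containing timestampMs, assuming zero offset.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs + windowSizeMs) % windowSizeMs;
    }

    // True when the timestamp lies in the half-open interval [start, end).
    public static boolean inRange(long timestampMs, long start, long end) {
        return timestampMs >= start && timestampMs < end;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L;
        long timestampMs = 125_000L;
        long start = windowStart(timestampMs, sizeMs);
        long end = start + sizeMs;
        // Same fields, in the same order, as the "NOT in RANGE" message above.
        System.out.println(start + ", " + timestampMs + ", " + end
            + " -> " + inRange(timestampMs, start, end));
    }
}
```

If the window is computed from the timestamp attached to the record by the assigner, and that timestamp equals event.getTimestamp(), we would expect the check in process never to fire, which is why the output surprises us.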



Thanks!


Regards,

-Yu