Tumbling window with timestamp out-of-range events

Yu Yang

Hi, 


We implemented a Flink application that uses a tumbling window with event time as the time characteristic. In the window's process function, we have the implementation below, which checks whether each event's timestamp falls within the tumbling window's timestamp range. We expected all events to be in range. However, the application reports events with out-of-range timestamps. Any insights on how this can happen?


@Override
public void process(EventStreamPartitionKey key,
                    Context context,
                    Iterable<Event> elements,
                    Collector<EventResult> out) {
    for (Event event : elements) {
        // Window bounds are [start, end): start is inclusive, end is exclusive.
        if (event.getTimestamp() >= context.window().getEnd()
                || event.getTimestamp() < context.window().getStart()) {
            System.out.println("NOT in RANGE: " + context.window().getStart()
                    + ", " + event.getTimestamp() + ", " + context.window().getEnd());
        }
        // ... (rest of the processing elided in the original post)
    }
    out.collect(res);
}
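Why the check can fail at all: a tumbling event-time window assigner derives each window's bounds from the timestamp attached to the record (the one produced by the source's timestamp extractor), not from a field that the process function reads later. The sketch below mirrors the window-start formula in Flink's `TimeWindow.getWindowStartWithOffset` as it appeared in the 1.x releases of this period. `TumblingWindowMath` and `inRange` are hypothetical names, and the code assumes non-negative millisecond timestamps. A timestamp always lands inside the window computed from it, so an out-of-range hit suggests the assigner saw a different timestamp than `event.getTimestamp()`.

```java
// Hypothetical helper (not part of Flink); windowStart mirrors the formula in
// Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
final class TumblingWindowMath {

    /** Start of the tumbling window containing `timestamp`; assumes timestamp >= 0 (ms). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Same range test as the process() function above: start <= ts < end. */
    static boolean inRange(long ts, long start, long end) {
        return ts >= start && ts < end;
    }

    public static void main(String[] args) {
        long size = 60_000L;                  // 1-minute tumbling windows, no offset
        long assignedTs = 1_591_739_880_123L; // timestamp the window assigner saw
        long start = windowStart(assignedTs, 0L, size);
        long end = start + size;              // exclusive end, like TimeWindow.getEnd()

        // Always true: the window was derived from this very timestamp.
        System.out.println(inRange(assignedTs, start, end));
        // A timestamp taken from somewhere else can fall outside that window.
        System.out.println(inRange(assignedTs + size, start, end));
    }
}
```

Running `main` prints `true` and then `false`: the second timestamp belongs to the next window, which is exactly the kind of value the "NOT in RANGE" check would report.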


Thanks!


Regards,

-Yu

Re: Tumbling window with timestamp out-of-range events

Yu Yang
Please ignore this message. The issue turned out to be that a different timestamp extractor was used when setting up the Kafka source.
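The failure mode described in this reply can be reproduced without Flink. The sketch below is a hypothetical model: `Event`, its two timestamp fields, `TimestampExtractor`, and `countOutOfRange` are illustrative names, not Flink APIs. Treating the Kafka record timestamp as the "different extractor" is also an assumption, since the reply does not say which extractor was actually configured. Window assignment follows whatever timestamp the extractor returns, while `process()` checks `getTimestamp()`, so any disagreement between the two shows up as out-of-range events.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Flink code): each record carries two candidate timestamps.
final class ExtractorMismatchDemo {

    /** Hypothetical event with an event-time field and a Kafka record timestamp. */
    static final class Event {
        final long payloadTs; // event time written into the message body
        final long kafkaTs;   // timestamp attached to the Kafka record (assumed to differ)

        Event(long payloadTs, long kafkaTs) {
            this.payloadTs = payloadTs;
            this.kafkaTs = kafkaTs;
        }

        long getTimestamp() { return payloadTs; } // what process() checks
        long getKafkaTimestamp() { return kafkaTs; }
    }

    /** Hypothetical stand-in for the extractor configured on the source. */
    interface TimestampExtractor {
        long extract(Event e);
    }

    /** Tumbling window start with offset 0; assumes non-negative timestamps. */
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    /** Counts events whose getTimestamp() falls outside the window they were assigned to. */
    static int countOutOfRange(List<Event> events, TimestampExtractor extractor, long size) {
        int outOfRange = 0;
        for (Event e : events) {
            long start = windowStart(extractor.extract(e), size); // assignment follows the extractor
            long end = start + size;                               // exclusive end
            if (e.getTimestamp() >= end || e.getTimestamp() < start) { // same test as process()
                outOfRange++;
            }
        }
        return outOfRange;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute windows
        List<Event> events = Arrays.asList(
                new Event(10_000L, 12_000L),    // both timestamps in [0, 60000)
                new Event(59_000L, 61_000L),    // Kafka timestamp crosses the boundary
                new Event(130_000L, 200_000L)); // ingested much later, different window

        System.out.println(countOutOfRange(events, Event::getKafkaTimestamp, size)); // mismatched
        System.out.println(countOutOfRange(events, Event::getTimestamp, size));      // consistent
    }
}
```

With the mismatched extractor the demo counts two out-of-range events; with an extractor that reads `getTimestamp()` it counts none. In a real job the corresponding fix is the one described in the reply: make the extractor configured on the Kafka source read the same timestamp that the window logic checks.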

On Tue, Jun 9, 2020 at 2:58 PM Yu Yang <[hidden email]> wrote:

Hi, 


We implement a flink application that uses TumblingWindow, and uses even time as time characteristics. In the TumblingWindow's process function, we has the implementation below that checks whether the event's timestamp is in the tumbling window's timestamp range.  We expected that all events shall be in the range. However, the application reports events with out-of-range timestamps.  Any insights on how this happens? 


@Override
public void process(EventStreamPartitionKey key,
                  Context context, Iterable<Event> elements, Collector<EventResult> out) {

for(Event event : elements) {
    if ( event.getTimestamp() >= context.window().getEnd() ||
   event.getTimestamp() < context.window().getStart() )

    System.out.println("NOT in RANGE: " + context.window().getStart()

        + ", " + event.getTimestamp() + ", " + context.window().getEnd());
...

}
out.collect(res);
}


Thanks!


Regards,

-Yu