Please ignore this message. The problem was caused by a different timestamp extractor being used when the Kafka source was set up.

Hi,
We implemented a Flink application that uses a TumblingWindow with event time as its time characteristic. In the TumblingWindow's process function, we have the implementation below, which checks whether each event's timestamp falls within the tumbling window's timestamp range. We expected all events to be in the range. However, the application reports events with out-of-range timestamps. Any insights on how this happens?
@Override
public void process(EventStreamPartitionKey key, Context context,
                    Iterable<Event> elements, Collector<EventResult> out) {
    for (Event event : elements) {
        if (event.getTimestamp() >= context.window().getEnd()
                || event.getTimestamp() < context.window().getStart())
            System.out.println("NOT in RANGE: " + context.window().getStart()
                    + ", " + event.getTimestamp() + ", " + context.window().getEnd());
        ...
    }
    out.collect(res);
}
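For anyone hitting the same symptom, here is a minimal, Flink-free sketch of how such a mismatch can arise. It assumes the window is chosen from the timestamp assigned at the source, while the check reads a different timestamp from the event itself. The class name, the helper methods, the 60-second window size, and both timestamp values are hypothetical and only serve as illustration; the arithmetic is a plain aligned-window calculation with zero offset, not a copy of Flink's code.

```java
public class WindowRangeSketch {

    // Start of the aligned tumbling window (zero offset) that contains `timestamp`.
    // Both arguments are in milliseconds.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // True if `timestamp` lies in the half-open range [start, start + size).
    static boolean inWindow(long timestamp, long start, long size) {
        return timestamp >= start && timestamp < start + size;
    }

    public static void main(String[] args) {
        long size = 60_000L;         // hypothetical 1-minute tumbling window
        long assignedTs = 185_500L;  // hypothetical timestamp produced by the source's extractor
        long payloadTs = 125_000L;   // hypothetical value returned by event.getTimestamp()

        // The window is derived from the assigned timestamp only.
        long start = windowStart(assignedTs, size);

        System.out.println("window = [" + start + ", " + (start + size) + ")");
        System.out.println("assigned timestamp in range: " + inWindow(assignedTs, start, size));
        System.out.println("payload timestamp in range:  " + inWindow(payloadTs, start, size));
    }
}
```

If the two timestamps come from different extractors, the second check can fail even though the window assignment itself is consistent.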
Thanks!
Regards, -Yu