Re: Cannot see all events in window apply() for big input
Posted by Hung on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945p10028.html
Hi,
We let the watermark advance only as far as the earliest timestamp among all event types. Our test results look correct.
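import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime;

/*
 * Watermark proceeds at the earliest timestamp among all the event types
 */
public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> {

    // Allowed out-of-orderness: 3 minutes, in milliseconds
    private final long maxTimeLag = 180000;

    // Earliest of the per-type timestamps. Starts low enough that subtracting
    // maxTimeLag cannot underflow, so the watermark does not move forward
    // before every event type has been seen.
    private long currentMinTimestamp = Long.MIN_VALUE + maxTimeLag;

    // Highest timestamp seen so far for each event type
    private final Map<String, Long> eventTimestampMap = new HashMap<>();

    // Number of distinct event types expected in the stream
    private final int eventSize;

    public EventsWatermark(int eventSize) {
        this.eventSize = eventSize;
    }

    @Override
    public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) {
        // Config.timeFormatter is our own formatter for the "occurred_at" field
        long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(), Config.timeFormatter).getMillis();
        String eventType = element.get("event_type").toString();
        // Update the timestamp of this event type; keeping the maximum means an
        // out-of-order event of the same type cannot move it backwards
        eventTimestampMap.merge(eventType, occurredAtLong, Long::max);
        // Until timestamps of all event types are collected, the watermark stays put
        if (eventTimestampMap.size() == eventSize) {
            // The earliest timestamp across all event types is as far as the watermark can go
            currentMinTimestamp = Collections.min(eventTimestampMap.values());
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMinTimestamp - maxTimeLag);
    }
}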
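To illustrate why the watermark follows the earliest event type instead of the overall maximum, here is a small Flink-free sketch. The class `WatermarkComparison`, its sample data, and the event types `click` and `order` are hypothetical. It also simplifies by recomputing the watermark after every element, whereas Flink polls `getCurrentWatermark()` periodically. It counts events whose timestamp is already behind the watermark when they arrive, which is roughly the situation in which an event-time window drops them as late.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo, not Flink code: compares a watermark driven by the overall
// maximum timestamp with one driven by the minimum across event types.
class WatermarkComparison {

    // Allowed lag, matching maxTimeLag above: 3 minutes in milliseconds
    static final long LAG = 180000L;

    // One input record: an event type and its event-time timestamp in ms
    static final class Event {
        final String type;
        final long timestamp;

        Event(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical arrival order: "click" events run ahead of "order" events
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("click", 0L),
                new Event("click", 600000L),
                new Event("order", 0L),
                new Event("click", 1200000L),
                new Event("order", 300000L),
                new Event("order", 600000L),
                new Event("order", 900000L));
    }

    // Counts events whose timestamp is behind the watermark when they arrive.
    // The watermark is recomputed after every element (a simplification).
    static int countLate(List<Event> events, int eventTypes, long lag, boolean useMin) {
        Map<String, Long> latestPerType = new HashMap<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            if (e.timestamp < watermark) {
                late++;
            }
            latestPerType.merge(e.type, e.timestamp, Long::max);
            maxSeen = Math.max(maxSeen, e.timestamp);
            long candidate;
            if (useMin) {
                // Advance only once every event type has been seen
                candidate = latestPerType.size() == eventTypes
                        ? Collections.min(latestPerType.values()) - lag
                        : Long.MIN_VALUE;
            } else {
                candidate = maxSeen - lag;
            }
            // A watermark never moves backwards
            watermark = Math.max(watermark, candidate);
        }
        return late;
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        // prints "late with max-based watermark: 4"
        System.out.println("late with max-based watermark: " + countLate(events, 2, LAG, false));
        // prints "late with min-based watermark: 0"
        System.out.println("late with min-based watermark: " + countLate(events, 2, LAG, true));
    }
}
```

Because the `click` events run ahead, the max-based watermark overtakes the slower `order` stream and all four `order` events arrive behind it. The min-based watermark waits for the slowest type, so none of these sample events are late.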
Cheers,
Sendoh