Re: Cannot see all events in window apply() for big input

Posted by Hung on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945p10028.html

Hi,

We let the watermark advance only as far as the earliest of the latest timestamps seen per event type (minus a fixed lag). Our test results look correct.

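import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime; // assumed Joda-Time, based on DateTime.parse(...).getMillis()

/**
 * The watermark advances to the earliest of the latest timestamps seen
 * for each event type, minus maxTimeLag.
 * Config.timeFormatter is our own formatter for the "occurred_at" field.
 */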
public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> {

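    // Fixed lag subtracted from the watermark: 3 minutes, in milliseconds
    private final long maxTimeLag = 180000;

    // Despite the name, this holds the earliest timestamp across event types (starts at 0)
    private long currentMaxTimestamp;
    // Latest timestamp seen for each event type
    private final Map<String, Long> eventTimestampMap;
    // Number of distinct event types expected in the stream
    private final int eventSize;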

    public EventsWatermark(int eventSize){
        this.eventSize = eventSize;
        eventTimestampMap = new HashMap<>();
    }

    @Override
    public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) {
        long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(), Config.timeFormatter).getMillis();
        String eventType = element.get("event_type").toString();

        // Record the latest timestamp seen for this event type
        eventTimestampMap.put(eventType, occurredAtLong);

        if (eventSize != eventTimestampMap.size()) {
            // Not every event type has been seen yet, so hold the watermark back.
            // currentMaxTimestamp starts at 0, so for positive timestamps it stays at 0 here.
            currentMaxTimestamp = Math.min(occurredAtLong, currentMaxTimestamp);
        } else {
            // Every event type has been seen: the watermark can advance to the
            // earliest of the per-type latest timestamps
            currentMaxTimestamp = Collections.min(eventTimestampMap.values());
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxTimeLag);
    }
}
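
To try the idea without a Flink job, here is a minimal standalone sketch of the same "earliest of the per-type latest timestamps" logic. The class name MinTimestampTracker and its methods are hypothetical and not part of our job or of Flink. It also makes one assumption that differs from the assigner above: it reports Long.MIN_VALUE until every expected event type has been seen, instead of relying on the field starting at 0.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Flink API.
class MinTimestampTracker {

    private final int expectedTypes;   // number of distinct event types expected
    private final long maxTimeLag;     // lag subtracted from the watermark, in ms
    private final Map<String, Long> latestByType = new HashMap<>();

    MinTimestampTracker(int expectedTypes, long maxTimeLag) {
        this.expectedTypes = expectedTypes;
        this.maxTimeLag = maxTimeLag;
    }

    // Record the latest timestamp seen for one event type.
    void observe(String eventType, long timestampMillis) {
        latestByType.put(eventType, timestampMillis);
    }

    // Assumption: hold the watermark at Long.MIN_VALUE until all types were seen.
    long currentWatermark() {
        if (latestByType.size() < expectedTypes) {
            return Long.MIN_VALUE;
        }
        return Collections.min(latestByType.values()) - maxTimeLag;
    }

    public static void main(String[] args) {
        MinTimestampTracker tracker = new MinTimestampTracker(2, 180_000L);

        tracker.observe("click", 1_000_000L);
        // "view" has not been seen yet, so the watermark is held back
        System.out.println(tracker.currentWatermark() == Long.MIN_VALUE); // true

        tracker.observe("view", 400_000L);
        System.out.println(tracker.currentWatermark()); // 220000

        tracker.observe("view", 2_000_000L);
        System.out.println(tracker.currentWatermark()); // 820000
    }
}
```

Note that the slowest event type decides the watermark: in the last step "view" jumps ahead, so "click" at 1,000,000 becomes the limiting timestamp.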

Cheers,

Sendoh