Last event in event time window is not output when using env.fromElements

Hung
Hi Flink users,

We have a unit test for event-time window aggregation. When the job finishes, the last event is not output: the Flink job ends before the watermark advances, because no further event arrives to move it forward.

Has anyone run into a similar issue and found a solution?

The code looks like this:
DataStream<JsonNode> source = env.fromElements(
        TestData.events("2017-05-20T19:34:17.097Z", "997"),
        TestData.events("2017-05-20T20:34:17.097Z", "998"),
        TestData.events("2017-05-20T20:38:17.097Z", "999"));

DataStream<JsonNode> testResult = source
        .assignTimestampsAndWatermarks(new EventWatermark())
        .keyBy(new KeyByID())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
        .allowedLateness(Time.minutes(Long.MAX_VALUE))
        .fold(null, new AggFoldFunction());

// Collect the job output and count the emitted window results.
Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);

int count = 0;
while (javaObj.hasNext()) {
    JsonNode current = javaObj.next();
    System.out.println(current);
    count++;
}
// Each event falls into its own one-minute window, so three results are expected.
Assert.assertEquals(3, count);

The watermark assigner is simply:
public class EventWatermark implements AssignerWithPeriodicWatermarks<JsonNode> {

    // Watermarks trail the highest timestamp seen so far by this many milliseconds.
    private final long maxTimeLag = 5000;

    private long currentMaxTimestamp;
    public static final DateTimeFormatter parseFromTimeFormatter = ISODateTimeFormat.dateTimeParser();

    @Override
    public long extractTimestamp(JsonNode element, long previousElementTimestamp) {
        long occurredAtLong;
        try {
            occurredAtLong = DateTime.parse(element.get("metadata").get("occurred_at").asText(), parseFromTimeFormatter).getMillis();
        } catch (IllegalArgumentException ie) {
            // toString() prints the whole JSON; asText() is empty for object nodes.
            throw new IllegalArgumentException(element.toString(), ie);
        }

        if (occurredAtLong > currentMaxTimestamp) {
            currentMaxTimestamp = occurredAtLong;
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxTimeLag);
    }
}
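
To make the symptom concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic behind the assigner above. It assumes the trigger behaves like Flink's built-in event-time trigger, which fires a window once the watermark reaches the window's last timestamp (end minus 1 ms); EventTimeTrigger103 is a custom class not shown in the thread, so that is an assumption. The class and method names (WatermarkLagSketch, canFire, and so on) are hypothetical and only for illustration.

```java
import java.time.Instant;

class WatermarkLagSketch {
    static final long WINDOW_SIZE_MS = 60_000L;  // mirrors Time.minutes(1)
    static final long MAX_TIME_LAG_MS = 5_000L;  // mirrors maxTimeLag in EventWatermark

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS);
    }

    // Last timestamp that still belongs to the window containing ts.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    // Watermark the assigner would report for a given maximum timestamp.
    static long lagWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG_MS;
    }

    // Assumed firing rule: the window is complete once the watermark reaches its last timestamp.
    static boolean canFire(long eventTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp(eventTimestamp);
    }

    public static void main(String[] args) {
        String[] occurredAt = {
            "2017-05-20T19:34:17.097Z",
            "2017-05-20T20:34:17.097Z",
            "2017-05-20T20:38:17.097Z"
        };

        long currentMax = Long.MIN_VALUE;
        for (String s : occurredAt) {
            currentMax = Math.max(currentMax, Instant.parse(s).toEpochMilli());
        }
        long watermark = lagWatermark(currentMax);

        for (String s : occurredAt) {
            long ts = Instant.parse(s).toEpochMilli();
            System.out.println(s
                + " window=[" + Instant.ofEpochMilli(windowStart(ts))
                + ", " + Instant.ofEpochMilli(windowStart(ts) + WINDOW_SIZE_MS) + ")"
                + " fires at lag watermark: " + canFire(ts, watermark)
                + ", fires at Long.MAX_VALUE: " + canFire(ts, Long.MAX_VALUE));
        }
    }
}
```

Under these assumptions, the watermark derived from the last event (20:38:12.097) is still below the end of that event's window (20:39:00.000), so only a later, larger watermark can close it.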

Best,

Sendoh

Re: Last event in event time window is not output

Fabian Hueske-2
Hi,

The problem might be that your source does not send a watermark with timestamp Long.MAX_VALUE after the last record has been sent.
In that case, your operators never compute the last window.
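
A rough way to picture this is a toy window operator in plain Java (not Flink code). It buffers a count per one-minute window and releases a window only when a watermark reaches the window's last timestamp. The names MiniWindowOperator and run are hypothetical, and the timestamps are millisecond offsets from an arbitrary origin, spaced like the three events in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FinalWatermarkSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Toy stand-in for a window operator: buffers counts per window start.
    static class MiniWindowOperator {
        private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
        private final List<String> output = new ArrayList<>();

        void processElement(long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
            openWindows.merge(start, 1, Integer::sum);
        }

        void processWatermark(long watermark) {
            // Release windows in start order while the watermark has reached their last timestamp.
            while (!openWindows.isEmpty()) {
                Map.Entry<Long, Integer> first = openWindows.firstEntry();
                long maxTimestamp = first.getKey() + WINDOW_SIZE_MS - 1;
                if (watermark < maxTimestamp) {
                    break;
                }
                output.add("window@" + first.getKey() + " count=" + first.getValue());
                openWindows.pollFirstEntry();
            }
        }

        List<String> output() {
            return output;
        }

        int openWindowCount() {
            return openWindows.size();
        }
    }

    // Feeds the elements, sending a lagging watermark after each one,
    // and optionally a final Long.MAX_VALUE watermark at the end of the input.
    static MiniWindowOperator run(long[] timestamps, long lagMs, boolean emitFinalWatermark) {
        MiniWindowOperator op = new MiniWindowOperator();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            op.processElement(ts);
            maxSeen = Math.max(maxSeen, ts);
            op.processWatermark(maxSeen - lagMs);
        }
        if (emitFinalWatermark) {
            op.processWatermark(Long.MAX_VALUE);
        }
        return op;
    }

    public static void main(String[] args) {
        long[] timestamps = {17_097L, 3_617_097L, 3_857_097L};

        MiniWindowOperator withoutFinal = run(timestamps, 5_000L, false);
        System.out.println("without final watermark: " + withoutFinal.output()
            + ", still open: " + withoutFinal.openWindowCount());

        MiniWindowOperator withFinal = run(timestamps, 5_000L, true);
        System.out.println("with final watermark: " + withFinal.output()
            + ", still open: " + withFinal.openWindowCount());
    }
}
```

In this toy model the run without a final watermark emits two windows and leaves the last one buffered, while the run that ends with Long.MAX_VALUE emits all three. Whether the real job behaves the same way depends on the custom trigger and on when the periodic watermarks are actually generated.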

Best, Fabian

2017-05-24 19:00 GMT+02:00 Sendoh <[hidden email]>:

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Last-event-in-event-time-window-is-not-output-tp13305.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Last event in event time window is not output

Aljoscha Krettek
Hi,

All sources emit a Long.MAX_VALUE watermark when they shut down.

What is the expected output and what is the output that you actually get?

Best,
Aljoscha

On 27. May 2017, at 00:01, Fabian Hueske <[hidden email]> wrote:

Hi,

The problem might be that your source does not send a watermark with timestamp Long.MAX_VALUE after the last record has been sent.
In that case, your operators never compute the last window.

Best, Fabian
