TimeWindowAll doesn't assign properly with EventTime

Hung
This post was updated.
Hi Flink users,

We have an issue where timeWindowAll() does not assign elements to windows properly when using event time. Counts that should be summed in one window are emitted as several separate results for the same window.

For example, in the output below, the window with hashCode -832348384 has start time 2016-07-20T05:57:00.000 and count 36, and there is a second result with the same hashCode and the same start time but count 1. They should be aggregated into a single result for that window with count 37.

... // hashCode of the window, number of events in the window, window start time
{"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
...
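
For reference, a one-minute tumbling window is identified by its start, which is the event timestamp rounded down to a multiple of the window size. The sketch below is plain Java, not Flink code, and the class and method names are made up for illustration. It only shows the arithmetic, assuming non-negative epoch milliseconds and no window offset: two events in the same minute map to the same window start, so rows with the same startDate refer to the same window interval.

```java
import java.time.Instant;

public class WindowStartSketch {

    // Start of the tumbling window that contains the given timestamp.
    // Assumes non-negative epoch milliseconds and no window offset.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute
        long a = Instant.parse("2016-07-20T05:57:03.000Z").toEpochMilli();
        long b = Instant.parse("2016-07-20T05:57:48.500Z").toEpochMilli();

        // Both events fall into the window starting at 05:57:00.
        System.out.println(Instant.ofEpochMilli(windowStart(a, size)));
        System.out.println(Instant.ofEpochMilli(windowStart(b, size)));
    }
}
```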

Example code is as follows:
// Use event time for all time-based operations.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Config.bootstrapServers);
properties.setProperty("group.id", parameter.getRequired("groupId"));
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
        new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);

// Assign timestamps and watermarks right after the source, then rebalance.
DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
        .assignTimestampsAndWatermarks(new SampleWatermark())
        .rebalance();

// Count the elements of each one-minute window over the whole stream.
DataStream<JSONObject> afterWindow = streams.timeWindowAll(Time.minutes(1))
        .apply(new SumAllWindow());

// Emits one JSON record per window firing: window start, element count, and window hashCode.
public static class SumAllWindow implements AllWindowFunction<JSONObject, JSONObject, TimeWindow> {

    @Override
    public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                      Collector<JSONObject> collector) throws Exception {

        DateTime startTs = new DateTime(timeWindow.getStart());
        JSONObject jsonObject = new JSONObject();

        // Count the elements handed to this firing of the window.
        int sum = 0;
        for (JSONObject value : values) {
            sum += 1;
        }

        jsonObject.put("startDate", startTs.toString());
        jsonObject.put("count", sum);
        jsonObject.put("hashCode", timeWindow.hashCode());
        collector.collect(jsonObject);
    }
}


public class SampleWatermark implements AssignerWithPeriodicWatermarks<JSONObject> {
    // Maximum expected out-of-orderness: 10 seconds, in milliseconds.
    private final long maxOutOfOrderness = 10000 * 1;
    // Highest event timestamp seen so far.
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        // The event time is parsed from the "occurredAt" field.
        long timestamp = DateTime.parse(element.get("occurredAt").toString(), Config.timeFormatter).getMillis();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen by maxOutOfOrderness.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

We see no problem with a smaller Kafka topic with Flink 1.0.3, and there is no issue when using ProcessingTime.
Are we making a mistake somewhere?
Please let me know if any further information is needed to resolve this issue.

Best,

Sendoh

Re: TimeWindowAll doesn't assign properly

Aljoscha Krettek
Hi,
to me, the single-element windows indicate elements that arrived at the window operator after the watermark had already passed the end of their window. In the current version of Flink, each such late element is emitted as its own single-element window. You can avoid this by writing a custom EventTimeTrigger that does not fire on late elements. Flink 1.1 also introduces a setting that lets you specify an allowed lateness, after which late elements are dropped.
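
To make the effect concrete, here is a minimal plain-Java simulation of the behavior described above. It is a simplified model, not Flink's implementation: the class and method names are hypothetical, the watermark is recomputed after every element (Flink emits periodic watermarks on a timer), everything runs in one thread, and windows still open at the end of the input are not flushed. An element whose window the watermark has already passed is modeled as being emitted on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LateElementSketch {
    static final long WINDOW_SIZE = 60_000L; // one-minute tumbling windows

    // Returns one "windowStart:count" string per emitted result, in emission order.
    static List<String> run(long[] timestampsInArrivalOrder, long maxOutOfOrderness) {
        List<String> emitted = new ArrayList<>();
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> buffered count
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestampsInArrivalOrder) {
            long start = ts - (ts % WINDOW_SIZE);
            long lastInWindow = start + WINDOW_SIZE - 1;

            if (lastInWindow <= watermark) {
                // Late element: its window has already fired, so in this model
                // it is emitted as a separate single-element result.
                emitted.add(start + ":1");
            } else {
                open.merge(start, 1, Integer::sum);
            }

            // Watermark trails the highest timestamp seen so far.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - maxOutOfOrderness;

            // Fire every open window whose last timestamp the watermark has reached.
            while (!open.isEmpty() && open.firstKey() + WINDOW_SIZE - 1 <= watermark) {
                Map.Entry<Long, Integer> e = open.pollFirstEntry();
                emitted.add(e.getKey() + ":" + e.getValue());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The element at 30_000 arrives after one at 75_000, i.e. 45 seconds out of order.
        long[] arrivals = {1_000, 20_000, 59_000, 75_000, 30_000, 130_000};

        // 10 s of allowed out-of-orderness: window 0 fires with 3 elements,
        // then the late element shows up alone.
        System.out.println(run(arrivals, 10_000)); // prints [0:3, 0:1, 60000:1]

        // 60 s of allowed out-of-orderness: the same element is still on time.
        System.out.println(run(arrivals, 60_000)); // prints [0:4]
    }
}
```

With the larger bound the out-of-order element is counted in its window, at the price of the window firing later.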

Cheers,
Aljoscha

In reply to the post by Sendoh of Fri, 29 Jul 2016 at 17:30.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-tp8201.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: TimeWindowAll doesn't assign properly

Hung
Thank you for helping with this issue.

Those single-element windows arrive within seconds, and the delay configured in the watermark is 60000 seconds.

Here are some samples from our investigation.
...
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.846","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":42,"processAt":"2016-08-01T11:08:05.873","startDate":"2016-07-19T21:36:00.000"}
{"hashCode":-1796184288,"count":9,"processAt":"2016-08-01T11:08:05.874","startDate":"2016-07-19T21:35:00.000"}
{"hashCode":-1800043744,"count":1,"processAt":"2016-08-01T11:08:05.889","startDate":"2016-07-19T21:33:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":1,"processAt":"2016-08-01T11:08:05.891","startDate":"2016-07-19T21:36:00.000"}
...

"processAt" was generated as follows:

@Override
public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                  Collector<JSONObject> collector) throws Exception {

    DateTime startTs = new DateTime(timeWindow.getStart());

    JSONObject jsonObject = new JSONObject();

    int sum = 0;
    for (JSONObject value : values) {
        sum += 1;
    }
    // Wall-clock time at which this window result is produced (Joda-Time).
    DateTime current = new DateTime();
    jsonObject.put("startDate", startTs.toString());
    jsonObject.put("count", sum);
    jsonObject.put("hashCode", timeWindow.hashCode());
    jsonObject.put("processAt", current.toString());

    collector.collect(jsonObject);
}

Is there any other mistake we could look into?

Best,

Hung Chang

Re: TimeWindowAll doesn't assign properly

Hung
In reply to this post by Aljoscha Krettek
`processAt` was probably not an adequate indicator, because after increasing maxDelay in the watermark to 10 minutes it works as expected.

Is there an upper limit for this maxDelay? I ask because too many windows might be kept waiting for their last element.

Best,

Sendoh

Re: TimeWindowAll doesn't assign properly

Aljoscha Krettek
Hi,
yes, if you set the delay too high you will have to wait a long time until your windows are emitted.
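
As a rough illustration of that trade-off, the plain-Java sketch below computes how far event time has to advance before a window can be emitted, assuming the watermark is the highest timestamp seen minus the delay, as in the assigner posted earlier in this thread. The class and method names are hypothetical, and the periodic watermark interval is ignored, so real emission happens slightly later.

```java
public class WindowDelaySketch {

    // Smallest event timestamp that must have been seen before the window
    // [windowStart, windowStart + windowSize) can be emitted, assuming
    // watermark = highest timestamp seen - maxDelay.
    static long timestampNeededToFire(long windowStart, long windowSize, long maxDelay) {
        long lastInWindow = windowStart + windowSize - 1;
        // The watermark has to reach the last timestamp that belongs to the window.
        return lastInWindow + maxDelay;
    }

    public static void main(String[] args) {
        long size = 60_000L; // one-minute window starting at 0

        // 10 s delay: emitted once an event with timestamp >= 69_999 has been seen.
        System.out.println(timestampNeededToFire(0L, size, 10_000L));

        // 10 min delay: emitted once an event with timestamp >= 659_999 has been seen.
        System.out.println(timestampNeededToFire(0L, size, 600_000L));
    }
}
```

Every window between the current watermark and the newest event stays buffered during that time, so a larger delay also means more windows held in state at once.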

Cheers,
Aljoscha

In reply to the post by Sendoh of Mon, 1 Aug 2016 at 04:52.