Hi Flink users,
We have an issue where timeWindowAll() doesn't assign elements properly: sums that should land in the same window are emitted as separate windows. For example, in the output below, window -832348384 with start time 2016-07-20T05:57:00.000 is emitted once with count 36 and again with count 1; they should have been aggregated into the same window -832348384 with count 37.

...
// hashCode of the window, sum of events in the window, window start time
{"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
...

Example code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Config.bootstrapServers);
properties.setProperty("group.id", parameter.getRequired("groupId"));
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
        new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);

DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
        .assignTimestampsAndWatermarks(new SampleWatermark())
        .rebalance();

DataStream<JSONObject> afterWindow = streams
        .timeWindowAll(Time.minutes(1))
        .apply(new SumAllWindow());

public static class SumAllWindow implements
        AllWindowFunction<JSONObject, JSONObject, TimeWindow> {

    @Override
    public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                      Collector<JSONObject> collector) throws Exception {
        DateTime startTs = new DateTime(timeWindow.getStart());
        JSONObject jsonObject = new JSONObject();

        // Count the elements in the window.
        int sum = 0;
        for (JSONObject value : values) {
            sum += 1;
        }

        jsonObject.put("startDate", startTs.toString());
        jsonObject.put("count", sum);
        jsonObject.put("hashCode", timeWindow.hashCode());
        collector.collect(jsonObject);
    }
}

public class SampleWatermark implements AssignerWithPeriodicWatermarks<JSONObject> {

    private final long maxOutOfOrderness = 10000 * 1; // 10 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        long timestamp = DateTime.parse(element.get("occurredAt").toString(),
                Config.timeFormatter).getMillis();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

We had no problem with a smaller Kafka topic on Flink 1.0.3, and using ProcessingTime shows no issue. Did we make a mistake somewhere? Please let me know if any further information is required to resolve this issue.

Best,

Sendoh
Hi,

to me, the single-element windows indicate that they originate from elements that arrived at the window operator after the watermark. In the current version of Flink these elements are emitted as a single-element window. You can avoid this by writing a custom EventTimeTrigger that does not fire on late elements. In Flink 1.1 we also introduce a setting that lets you specify an allowed lateness after which elements are dropped.

Cheers,

Aljoscha
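[Editor's note] For readers looking for a concrete starting point, below is a minimal sketch of such a trigger, assuming the Flink 1.1 Trigger API. The class name DropLateEventTimeTrigger is ours; it mirrors the built-in EventTimeTrigger, but purges late elements instead of firing a new single-element window.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch only: like EventTimeTrigger, but elements arriving after the
// watermark has passed the end of their window are purged, so no extra
// single-element window is emitted.
public class DropLateEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // The window has already fired; drop the late element.
            return TriggerResult.PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be wired into the pipeline from the original post like this:

streams.timeWindowAll(Time.minutes(1))
       .trigger(new DropLateEventTimeTrigger())
       .apply(new SumAllWindow());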
Thank you for helping with this issue.
Those single-element windows arrive within seconds of each other, and the watermark delay (maxOutOfOrderness) is configured as 60000 ms. Below are some samples from our investigation.

...
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.846","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":42,"processAt":"2016-08-01T11:08:05.873","startDate":"2016-07-19T21:36:00.000"}
{"hashCode":-1796184288,"count":9,"processAt":"2016-08-01T11:08:05.874","startDate":"2016-07-19T21:35:00.000"}
{"hashCode":-1800043744,"count":1,"processAt":"2016-08-01T11:08:05.889","startDate":"2016-07-19T21:33:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":1,"processAt":"2016-08-01T11:08:05.891","startDate":"2016-07-19T21:36:00.000"}
...

"processAt" was generated as follows:

@Override
public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                  Collector<JSONObject> collector) throws Exception {
    DateTime startTs = new DateTime(timeWindow.getStart());
    JSONObject jsonObject = new JSONObject();

    int sum = 0;
    for (JSONObject value : values) {
        sum += 1;
    }

    DateTime current = new DateTime(); // Joda-Time: wall-clock time of processing
    jsonObject.put("startDate", startTs.toString());
    jsonObject.put("count", sum);
    jsonObject.put("hashCode", timeWindow.hashCode());
    jsonObject.put("processAt", current.toString());
    collector.collect(jsonObject);
}

Is there another mistake we could look into?

Best,

Hung Chang
Probably `processAt` was not an adequate way to diagnose this; after increasing maxDelay in the watermark to 10 minutes, it works as expected.
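[Editor's note] For reference, the adjusted assigner might look as follows. This is a sketch based on the SampleWatermark posted above, with only the delay changed to 10 minutes; initializing currentMaxTimestamp to guard against an initial watermark underflow is our addition, not part of the original code.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime;

public class SampleWatermark implements AssignerWithPeriodicWatermarks<JSONObject> {

    // 10 minutes of allowed out-of-orderness instead of the original 10 seconds.
    private final long maxOutOfOrderness = 10 * 60 * 1000L;

    // Start low so the first watermark does not underflow Long.MIN_VALUE.
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        long timestamp = DateTime.parse(element.get("occurredAt").toString(),
                Config.timeFormatter).getMillis();
        // Track the highest event timestamp seen so far.
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp by maxOutOfOrderness.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}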
Is there any upper limit when setting this maxDelay? With a large delay, might too many windows end up waiting for their last elements?

Best,

Sendoh
Hi,

yes, if you set the delay too high you will have to wait a long time until your windows are emitted.

Cheers,

Aljoscha
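[Editor's note] The Flink 1.1 alternative Aljoscha mentioned earlier is allowed lateness, which bounds how long window state is kept around for stragglers instead of inflating the watermark delay. A minimal sketch, assuming the Flink 1.1 AllWindowedStream API and the SumAllWindow function from the original post:

// Sketch only: keep a moderate watermark delay and let the window
// operator handle stragglers explicitly.
DataStream<JSONObject> afterWindow = streams
        .timeWindowAll(Time.minutes(1))
        .allowedLateness(Time.minutes(5)) // keep window state 5 min past the window end
        .apply(new SumAllWindow());

With a non-zero allowed lateness, a late element re-fires the window with its accumulated contents (an updated count rather than a single-element window), and elements arriving later than the allowed lateness are dropped.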