Hi Flink users,
We have a unit test for event-time window aggregation, but when the job finishes, the result for the last event is never emitted: the Flink job finishes before the watermark advances far enough, because no later event arrives. Has anyone had a similar issue and found a solution?

The code is like this:

    DataStream<JsonNode> source = env.fromElements(
            TestData.events("2017-05-20T19:34:17.097Z", "997"),
            TestData.events("2017-05-20T20:34:17.097Z", "998"),
            TestData.events("2017-05-20T20:38:17.097Z", "999"));

    DataStream<JsonNode> testResult = source
            .assignTimestampsAndWatermarks(new EventWatermark())
            .keyBy(new KeyByID())
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
            .allowedLateness(Time.minutes(Long.MAX_VALUE))
            .fold(null, new AggFoldFunction());

    Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);
    int count = 0;
    while (javaObj.hasNext()) {
        JsonNode current = javaObj.next();
        System.out.println(current);
        count++;
    }
    Assert.assertEquals(3, count); // one result per event/window expected

The watermark assigner is simply:

    public class EventWatermark implements AssignerWithPeriodicWatermarks<JsonNode> {

        private final long maxTimeLag = 5000; // watermark trails the max timestamp by 5 s
        private long currentMaxTimestamp;

        public transient static DateTimeFormatter parseFromTimeFormatter = ISODateTimeFormat.dateTimeParser();

        @Override
        public long extractTimestamp(JsonNode element, long previousElementTimestamp) {
            long occurredAtLong;
            try {
                occurredAtLong = DateTime.parse(
                        element.get("metadata").get("occurred_at").asText(),
                        parseFromTimeFormatter).getMillis();
            } catch (IllegalArgumentException ie) {
                // toString() shows the whole JSON; asText() is empty for object nodes
                throw new IllegalArgumentException(element.toString(), ie);
            }
            if (occurredAtLong > currentMaxTimestamp) {
                currentMaxTimestamp = occurredAtLong;
            }
            return occurredAtLong;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxTimeLag);
        }
    }

Best,
Sendoh
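A rough check of why the last window stays open under EventWatermark alone. The sketch below is a hypothetical stand-alone Java model, not Flink code, and it optimistically assumes that the periodic watermark actually reached its final value (highest timestamp minus the 5 s maxTimeLag) before the job ended. For each test event it computes the 1-minute tumbling window and whether that final watermark reaches the window's last millisecond, which is when an event-time trigger fires. The class and method names are made up for illustration.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical stand-alone model (not Flink code): checks which 1-minute
 * event-time windows can fire if the only watermark ever produced is
 * "highest timestamp seen - maxTimeLag", as in EventWatermark.
 */
public class LaggedWatermarkCheck {
    static final long WINDOW_SIZE_MS = 60_000L;  // Time.minutes(1)
    static final long MAX_TIME_LAG_MS = 5_000L;  // maxTimeLag in EventWatermark

    // Start of the tumbling window containing ts (zero offset, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Last millisecond that belongs to the window; an event-time trigger
    // fires once the watermark reaches this value.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    static int countFiredWindows(List<String> isoTimestamps) {
        long maxTs = Long.MIN_VALUE;
        for (String iso : isoTimestamps) {
            maxTs = Math.max(maxTs, Instant.parse(iso).toEpochMilli());
        }
        // Best case: the lagged watermark reached its final value before shutdown.
        long finalWatermark = maxTs - MAX_TIME_LAG_MS;

        int fired = 0;
        for (String iso : isoTimestamps) {
            long ts = Instant.parse(iso).toEpochMilli();
            boolean fires = finalWatermark >= windowMaxTimestamp(ts);
            System.out.println(iso + " -> window ends at "
                    + Instant.ofEpochMilli(windowStart(ts) + WINDOW_SIZE_MS)
                    + ", fires: " + fires);
            if (fires) {
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
                "2017-05-20T19:34:17.097Z",
                "2017-05-20T20:34:17.097Z",
                "2017-05-20T20:38:17.097Z");
        System.out.println("windows fired: " + countFiredWindows(events));
    }
}
```

Running main reports "fires: true" for the 19:34 and 20:34 windows and "fires: false" for the 20:38 window: its last millisecond (20:38:59.999) is later than the final lagged watermark (20:38:12.097), so only two windows fire unless some larger watermark arrives at end of input.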
Hi,

So your operators never compute the last window. The problem might be that your source does not send a watermark with timestamp MAX_LONG (Long.MAX_VALUE) after the last record has been sent.

In reply to Sendoh, 2017-05-24 19:00 GMT+02:00.
Hi,
All sources emit a Long.MAX_VALUE watermark when they shut down. What is the expected output, and what is the output that you actually get?

Best,
Aljoscha
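To make the effect of that end-of-input watermark concrete, here is a simplified, hypothetical model of an event-time tumbling-window operator in plain Java. It is not Flink's WindowOperator, and it ignores keys, which is harmless here because the three test events fall into different minutes. Each window fires once the watermark reaches its last millisecond. The events are fed with the same 5 s bounded-lag watermark as EventWatermark (idealized as emitted after every element), optionally followed by a Long.MAX_VALUE watermark like the one a finite source emits when it shuts down. All names are illustrative.

```java
import java.time.Instant;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of an event-time tumbling-window operator
 * (not Flink's WindowOperator). Keys are ignored: every window is global.
 */
public class EndOfInputWatermarkDemo {
    static final long WINDOW_SIZE_MS = 60_000L;  // Time.minutes(1)
    static final long MAX_TIME_LAG_MS = 5_000L;  // lag used by EventWatermark

    // Pending windows, keyed by their last millisecond (end - 1) -> element count.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private int fired = 0;

    void processElement(long ts) {
        long start = ts - (ts % WINDOW_SIZE_MS);
        pending.merge(start + WINDOW_SIZE_MS - 1, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        // Fire and purge every window whose last millisecond the watermark has reached.
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            pending.pollFirstEntry();
            fired++;
        }
    }

    static int run(boolean emitEndOfInputWatermark, String... isoTimestamps) {
        EndOfInputWatermarkDemo op = new EndOfInputWatermarkDemo();
        long maxSeen = Long.MIN_VALUE;
        for (String iso : isoTimestamps) {
            long ts = Instant.parse(iso).toEpochMilli();
            op.processElement(ts);
            maxSeen = Math.max(maxSeen, ts);
            // Bounded-lag watermark after every element (idealized periodic emission).
            op.processWatermark(maxSeen - MAX_TIME_LAG_MS);
        }
        if (emitEndOfInputWatermark) {
            // The watermark a finite source emits when it shuts down.
            op.processWatermark(Long.MAX_VALUE);
        }
        return op.fired;
    }

    public static void main(String[] args) {
        String[] events = {
            "2017-05-20T19:34:17.097Z",
            "2017-05-20T20:34:17.097Z",
            "2017-05-20T20:38:17.097Z"
        };
        System.out.println("without end-of-input watermark: " + run(false, events));
        System.out.println("with Long.MAX_VALUE watermark: " + run(true, events));
    }
}
```

In this model run(false, ...) returns 2 and run(true, ...) returns 3: only the final Long.MAX_VALUE watermark closes the 20:38 window, which matches the three results the test expects.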