Hi,

Observations on Watermarks:

Read this great article:

* A watermark determines, for any event timestamp, when to stop waiting for the arrival of earlier events.
* Watermark t means all events with timestamp < t have already arrived.
* When to push data out: when a watermark with timestamp >= t arrives.

Using only an incrementing current time for the watermark seems to work correctly, but I am not sure it lines up correctly with EventTime processing. Using the incoming records' intervalStart as the watermark source for EventTime causes data not to be pushed at all when I have just 5 records in the source.

My source generation has intervalStart incrementing at a regular interval. I tried using intervalStart for my watermark with an out-of-order bound of 3 secs. The AggregateFunction I am using calls add() fine but never calls getResult().

My assumption was that the AggregateFunction would push the data to getResult() once the watermark based on intervalStart incremented beyond the previous watermark t. But it doesn't. Is it because I have a limited number of input records, and once intervalStart reaches the end of the input records too quickly, the watermark stops incrementing and hence doesn't push data? With System.currentTimeMillis(), it happily keeps increasing and hence pushes the data.
Created this class:

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        // simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /*LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);//using this stopped pushing recs after a certain time
        return extractedTS;*/
        return System.currentTimeMillis(); // incrementing current time
    }
}
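The assigner above emits one watermark per event, so the watermark can only advance while events keep arriving. A minimal plain-Java sketch of that arithmetic follows (no Flink dependency; the class name, method names, and sample timestamps are hypothetical and chosen only for illustration). It replays a short burst of event timestamps and reports the watermark that remains in effect once the burst ends.

```java
// Plain-Java sketch of per-event watermark arithmetic. This is NOT Flink code;
// all names and sample values here are assumptions made for illustration.
final class PunctuatedWatermarkSketch {

    /** Watermark candidate for one event: its timestamp minus the out-of-order bound. */
    static long watermarkFor(long eventTimestamp, long boundMillis) {
        return eventTimestamp - boundMillis;
    }

    /**
     * Replays the events in order and returns the watermark in effect after the
     * last one. A watermark never moves backwards, so the maximum is kept.
     * Returns Long.MIN_VALUE when there are no events at all.
     */
    static long watermarkAfter(long[] eventTimestamps, long boundMillis) {
        long current = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            current = Math.max(current, watermarkFor(ts, boundMillis));
        }
        return current;
    }

    public static void main(String[] args) {
        long base = 1_000_000L; // arbitrary epoch-millis stand-in for intervalStart
        long[] burst = {base, base + 5, base + 12, base + 18, base + 25};
        long watermark = watermarkAfter(burst, 3_000L);
        System.out.println("last event timestamp: " + burst[burst.length - 1]);
        System.out.println("final watermark:      " + watermark);
    }
}
```

With only a handful of closely spaced records, the final watermark stays roughly one bound behind the last event and does not move again until another record arrives.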
Hi Vijay,

Could you provide more information about your problem? For example:
- Which kind of window do you use?
- What's the window size?
- Relatively complete code would be better :-)

As for the problem, the event time has probably not reached the end of the window. You can monitor the watermark in the web dashboard[1]. Also, changing event time to processing time is another way to verify whether it is a watermark problem.

Best, Hequn

On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi,

Thx for your reply and pointers on the currentLowWatermark. Looks like the Flink UI has a tab for Watermarks itself for an Operator.

I dump 5 records into the Kinesis Data Stream and am trying to read the same records from the FlinkKinesisConsumer, and am not able to. I am using the same monitoring.getIntervalStart() in the watermark generation (intervalStart - bound) in the MonitoringAssigner class that I used to generate data on the producer side. I generate intervalStart on the producer side, which increments on each record by 3-10 millisecs. The watermark is generated as intervalStart - bound (3 secs), so every watermark generated is greater than the previous one. So why does it not push data out? It gets into the MGroupingWindowAggregate.add(..) method but never gets into the MGroupingWindowAggregate.getResult(..) method. It works when I produce 1000 records or so into the Kinesis data stream.

Here is a gist of my code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// FlinkConsumer
Properties kinesisConsumerConfig = new Properties();
......
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000"); // 2000
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());

FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(kinesisConsumer);

DataStream<Monitoring> kinesisStream =
        monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringAssigner(3000)); // code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow = Time.seconds(5);
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream = kinesisStream.timeWindow(timeWindow);

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = windowStream.aggregate(
        new MGroupingWindowAggregate(....), // AggregateFunction impl
        new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        // return System.currentTimeMillis(); // this works fine.
    }
}

TIA, Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <[hidden email]> wrote:
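The numbers in the message above (5-second tumbling window, 3-second bound, records a few milliseconds apart) can be worked through by hand. The sketch below is plain Java with no Flink dependency. It assumes windows aligned to multiples of the window size, non-negative timestamps, evenly spaced records, and the firing condition discussed in this thread (watermark >= last timestamp of the window). The class, method names, and sample figures are hypothetical.

```java
// Plain-Java sketch: how many tumbling windows could fire for a finite burst of
// evenly spaced records. NOT Flink code; names and figures are illustrative.
final class TumblingWindowSketch {

    /** Start of the aligned tumbling window containing ts (non-negative ts, zero offset). */
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    /** Last timestamp that still belongs to the window containing ts. */
    static long windowMaxTimestamp(long ts, long sizeMillis) {
        return windowStart(ts, sizeMillis) + sizeMillis - 1;
    }

    /**
     * Counts the distinct windows whose max timestamp is covered by the watermark
     * left behind by the last record (last timestamp minus bound).
     */
    static int firedWindows(int recordCount, long firstTs, long stepMillis,
                            long sizeMillis, long boundMillis) {
        if (recordCount <= 0) {
            return 0;
        }
        long lastTs = firstTs + (recordCount - 1) * stepMillis;
        long finalWatermark = lastTs - boundMillis;
        int fired = 0;
        long lastCounted = Long.MIN_VALUE;
        for (int i = 0; i < recordCount; i++) {
            long start = windowStart(firstTs + i * stepMillis, sizeMillis);
            if (start != lastCounted && finalWatermark >= start + sizeMillis - 1) {
                fired++;
                lastCounted = start;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long base = 1_000_000L; // arbitrary stand-in for the first intervalStart
        System.out.println("5 records, 7 ms apart:     "
                + firedWindows(5, base, 7, 5_000, 3_000) + " window(s) fire");
        System.out.println("1000 records, 10 ms apart: "
                + firedWindows(1000, base, 10, 5_000, 3_000) + " window(s) fire");
    }
}
```

Under these assumptions, a burst spanning a few tens of milliseconds can never push the watermark past the end of its own 5-second window, while a burst spanning several seconds can close the earlier windows. The last window of any finite burst stays open until newer records arrive.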
Hi,

After looking at the code in EventTimeTrigger, I changed the watermark to be System.currentTimeMillis() + boundSecs (5 secs) so that the window's maxTS was <= watermark. I was able to consume from Kinesis when I had only 50 records.

For a TumblingWindow of 5 secs, the window maxTS was usually around currTime + 5 secs. So I set the watermark to System.currentTimeMillis() + 5 secs. This way, the trigger fired and got into AggregateFunction.getResult().

@Override

On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan <[hidden email]> wrote:
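The workaround described above can be checked with the same kind of arithmetic. The sketch below is plain Java with no Flink dependency. It assumes that the event timestamp is also taken from the wall clock, that windows are aligned to multiples of the window size, and that the condition is the one quoted from EventTimeTrigger in the message (window maxTS <= watermark). All names are hypothetical.

```java
// Plain-Java sketch of the "wall clock + window size" watermark workaround.
// NOT Flink code; the names and the alignment assumption are illustrative.
final class WallClockWatermarkSketch {

    /** Last timestamp of the aligned tumbling window containing ts (non-negative ts). */
    static long windowMaxTimestamp(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis) + sizeMillis - 1;
    }

    /** Firing condition as described in the thread: window maxTS <= watermark. */
    static boolean fires(long watermark, long windowMaxTs) {
        return watermark >= windowMaxTs;
    }

    /** Watermark set to "now + window size", checked against the window containing "now". */
    static boolean firesWithWallClockWatermark(long nowMillis, long sizeMillis) {
        long watermark = nowMillis + sizeMillis;
        return fires(watermark, windowMaxTimestamp(nowMillis, sizeMillis));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("window max timestamp: " + windowMaxTimestamp(now, 5_000L));
        System.out.println("watermark (now + 5s): " + (now + 5_000L));
        System.out.println("fires: " + firesWithWallClockWatermark(now, 5_000L));
    }
}
```

Because the end of the window containing "now" is at most one window size away, a watermark of now + 5 secs always satisfies the condition, which is consistent with the trigger firing in the message above. By the definition given at the top of the thread (watermark t means all events with timestamp < t have arrived), such a watermark also declares the window complete as soon as one record is seen, so records that arrive afterwards for the same window may count as late.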