Hi,

Here is my issue with event processing: the add() method of MGroupingWindowAggregate is not being called even though a new watermark is emitted.

1. Ingest data from Kinesis (works fine).
2. Deserialize in MonitoringMapKinesisSchema (works fine; I get JSON back).
3. Assign MonitoringTSWAssigner (code below) to the source with a bound of 10 (I have also tried 3000 and 30000). It emits a new watermark with each incoming record, but the windowStream.aggregate method doesn't seem to fire and I never see the add() method of MGroupingWindowAggregate being called. I can see the new watermark being emitted in TimestampsAndPunctuatedWatermarksOperator.processElement.
4. I have tried a timeWindow of 1m and of 15s.

Main code:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Set up the Kinesis consumer
    Properties kinesisConsumerConfig = new Properties();
    ..
    kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
    FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
            "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);

    DataStream<Map<String, Object>> kinesisStream;
    RichSinkFunction<InfluxDBPoint> influxSink;

    DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
    kinesisStream = monitoringDataStreamSource
            .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
    influxSink = pms.createInfluxMonitoringSink(....);
    ......
    ...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL = 15s, 1m
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            monitoringTupleKeyedStream.timeWindow(timeWindow);
    DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( // <===== never reaches here
            new MGroupingWindowAggregate(interval),
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
        .map(new MonitoringGroupingToInfluxDBPoint(rule));
    enrichedMGStream.addSink(influxSink);
    env.execute("Aggregation of Map data");

MonitoringTSWAssigner code:

    public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
        private long bound = 5 * (long) 1000; // 5 secs out-of-order bound in millisecs
        private long maxTimestamp = Long.MIN_VALUE;

        public MonitoringTSWAssigner() {
        }

        public MonitoringTSWAssigner(long bound) {
            this.bound = bound;
        }

        public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
            long extractedTS = getExtractedTS(monitoring);
            if (extractedTS > maxTimestamp) {
                maxTimestamp = extractedTS;
            }
            return extractedTS; // return System.currentTimeMillis();
        }

        public long getExtractedTS(Map<String, Object> monitoring) {
            final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
                    ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
            return Utils.getLongFromDateStr(eventTimestamp);
        }

        @Override
        public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
            long extractedTS = getExtractedTS(monitoring);
            long nextWatermark = maxTimestamp - bound;
            return new Watermark(nextWatermark);
        }
    }

MGroupingWindowAggregate:

    public class MGroupingWindowAggregate
            implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
        private final String interval;

        public MGroupingWindowAggregate(String interval) {
            this.interval = interval;
        }

        public Map<String, Object> createAccumulator() {
            return new ConcurrentHashMap<>();
        }

        public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
            .....
        }
        .....
    }

TIA,
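The assigner's watermark rule (highest timestamp seen so far minus bound) can be isolated in plain Java to see what a given bound means. A minimal sketch, assuming only the logic of MonitoringTSWAssigner above; the class name BoundedWatermarkSketch is hypothetical and the sketch has no Flink dependency. Note that bound is in milliseconds, so a bound of 10 is 10 ms, not 10 s.

```java
/**
 * Hypothetical, Flink-free model of the punctuated assigner above:
 * watermark = highest event timestamp seen so far minus the out-of-orderness bound (ms).
 */
class BoundedWatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Records an event timestamp and returns the watermark that would be emitted after it. */
    long onEvent(long eventTimestampMillis) {
        // Update the running maximum first, then derive the watermark from it.
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
        return maxTimestamp - boundMillis;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch assigner = new BoundedWatermarkSketch(10); // 10 ms, as in the post
        System.out.println(assigner.onEvent(30_000)); // 29990
        System.out.println(assigner.onEvent(25_000)); // 29990 (late event does not move it back)
        System.out.println(assigner.onEvent(35_000)); // 34990
    }
}
```

A late record never lowers the watermark, which matches the "increasing monotonically" check discussed in the replies.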
Hi,

Do you mean that `windowStream.aggregate` does not work for all records, or just for some? If only for some records, can you confirm that the assigned watermark increases monotonically? If for all records, can you confirm that the watermark has reached the end of the window? In other words, could you share how you tell that the `windowStream.aggregate method doesn't seem to fire`?

Best,
Congxian

On Sat, Oct 12, 2019 at 3:37 AM Vijay Balakrishnan <[hidden email]> wrote:
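To check whether "the watermark has reached the end of the window", it helps to compute which tumbling window a record falls into and which watermark fires it. A minimal sketch, assuming a zero window offset and non-negative timestamps: the start formula mirrors Flink's TimeWindow.getWindowStartWithOffset, and EventTimeTrigger fires once the watermark reaches the window's maxTimestamp(), which is end - 1. The class name TumblingWindowMath is hypothetical.

```java
/** Hypothetical helper: tumbling event-time window bounds and the watermark that fires them. */
class TumblingWindowMath {
    /** Window start for a timestamp; same formula as Flink's TimeWindow.getWindowStartWithOffset. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Exclusive window end. */
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    /** An event-time trigger fires once the watermark reaches the window's last timestamp (end - 1). */
    static boolean fires(long watermark, long timestamp, long offset, long size) {
        return watermark >= windowEnd(timestamp, offset, size) - 1;
    }

    public static void main(String[] args) {
        long size = 15_000; // 15 s tumbling window
        long ts = 32_000;   // an event at t = 32 s falls into [30 s, 45 s)
        System.out.println(windowStart(ts, 0, size));   // 30000
        System.out.println(windowEnd(ts, 0, size));     // 45000
        System.out.println(fires(44_998, ts, 0, size)); // false
        System.out.println(fires(44_999, ts, 0, size)); // true
    }
}
```

With the assigner above, the watermark is max timestamp minus bound, so this window only fires after a record with a timestamp of at least 44 999 + bound ms has arrived.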
Hi Vijay,

Could you check whether the watermark of the aggregate operator advances? You should be able to check that in the Flink Web UI. Could it be that the watermark does not advance for all of the upstream operators? The watermark of a particular operator is the minimum of the watermarks received from all of its upstream operators. Therefore, if some of them do not produce any, the resulting watermark will not advance.

Best,
Dawid

On 11/10/2019 21:37, Vijay Balakrishnan wrote:
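The "minimum over all inputs" rule can be modeled in a few lines. A simplified sketch, assuming one watermark slot per input channel that starts at Long.MIN_VALUE until that channel emits anything; the class name MinWatermarkSketch is hypothetical, and Flink's real implementation also tracks idle channels, which this sketch ignores.

```java
import java.util.Arrays;

/**
 * Simplified model: an operator's input watermark is the minimum over all input channels.
 * A channel that has never emitted a watermark stays at Long.MIN_VALUE and holds everything back.
 */
class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark only moves forward. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's current combined input watermark. */
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        op.onWatermark(0, 60_000);
        System.out.println(op.currentWatermark() == Long.MIN_VALUE); // true: channel 1 is silent
        op.onWatermark(1, 45_000);
        System.out.println(op.currentWatermark());                   // 45000
    }
}
```

So a single silent upstream channel is enough to keep every event-time window downstream from firing, even while other channels emit fresh watermarks.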
Hi,

Thanks for the replies, Congxian and Dawid.

Watermarks are advancing. I'm not sure how to check whether each newly generated watermark is reaching the end of the window. I did check the Flink UI for the currentInputWatermark, and it is increasing monotonically. I have narrowed the problem down to windowStream.aggregate not being called. I also enabled checkpointing to see whether that was related to the issue; it didn't seem to help.

Most of the code is reached only during the creation of the ExecutionGraph at the start of the program. I generate an incrementing sequence of timestamps (a delay of 5000 ms between records) from a producer to Kinesis, and a new watermark is emitted as soon as the job starts receiving input records. My window size is 15 s. I see that a WindowedStream is created with windowAssigner TumblingEventTimeWindows(15000) and trigger EventTimeTrigger, but the code never gets into EventTimeTrigger.onElement() or onEventTime() to fire the trigger. It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark(). I even tried processing time, but that didn't help either.

    // code to create the Kinesis consumer successfully ......
    for (Rule rule : rules.getRules()) {
        // gets in here fine
        final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
            boolean result;
            String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
            InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
            String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
            result = eventName.equals(measurement);
            if (result) {
                Map<String, String> inputTags = mon.get(TAGS) != null
                        ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                Map<String, String> ruleTags = inputMetricSelector != null
                        ? inputMetricSelector.getTags() : new HashMap<>();
                result = matchTags(inputTags, ruleTags);
            }
            return result; // <== this is true
        }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
            out.collect(input); // <==== runs up till here fine
        }).returns(new TypeHint<Map<String, Object>>() {
        });

        // doesn't do anything beyond this point at runtime
        DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1(
                filteredKinesisStream, ruleFactory, rule, parallelProcess);
        enrichedMGStream.addSink(influxSink)
                .setParallelism(nbrSinks);
    }

    private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
            DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
        DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
        RuleConfig ruleConfig = rule.getRuleConfig();
        String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
        RuleIF ruleImpl = ruleFactory.getRule(ruleType);
        Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
        Object intervalObj = ruleProps.get("rule_eval_window");
        String timeInterval = intervalObj != null ? (String) intervalObj : "";
        org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);

        Object windowTypeObj = ruleProps.get("window_type");
        String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

        InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
        Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
        String groupByObj = tags.get(GROUP_BY);
        String groupBy = groupByObj != null ? groupByObj : "";
        kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
            Object groupByValueObj = inputMap.get(groupBy);
            return groupByValueObj != null;
        });
        Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
        String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
        // till here, it went through fine during creation of the ExecutionGraph
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
                kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
        // <=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project
        enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream,
                timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
        return enrichedComponentInstanceStream1;
    }

    private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
            KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
            org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
            String interval, RuleIF ruleImpl, Rule rule, int parallelProcess) {
        long slide = 100;
        final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
                windowType.equalsIgnoreCase(SLIDING)
                        ? monitoringTupleKeyedStream.timeWindow(timeWindow,
                                org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
                        : monitoringTupleKeyedStream.timeWindow(timeWindow);
        return windowStream.aggregate(
                new MGroupingWindowAggregate(interval), // <=== never gets into add() here
                new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));
    }

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <[hidden email]> wrote:
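A side note on getMonitoringGroupDataStream1, not raised in the thread: in the SLIDING branch the slide is hard-coded to 100 ms, so each element is assigned to size/slide overlapping windows (15 000 / 100 = 150 for a 15 s window), each with its own accumulator. A minimal sketch that lists those window starts, assuming a zero offset and non-negative timestamps; the loop mirrors the one in Flink's SlidingEventTimeWindows.assignWindows, but the class name SlidingWindowMath is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper listing the starts of all sliding windows [start, start + size) containing a timestamp. */
class SlidingWindowMath {
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp (offset 0).
        long lastStart = timestamp - (timestamp + slide) % slide;
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(32_000, 15_000, 100).size()); // 150
        System.out.println(windowStarts(32_000, 15_000, 5_000));      // [30000, 25000, 20000]
    }
}
```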
Hi Vijay,

Maybe a stupid question, but according to your comments, the code works fine up to a "flatMap" operation. It seems that this flatMap is directly followed by a filter function in the method createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out all events? Or is the filter function itself not even called (as your comments suggest)?

Best regards
Theo

From: "Vijay Balakrishnan" <[hidden email]>
To: "Dawid Wysakowicz" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Tuesday, 15 October 2019 02:01:05
Subject: Re: add() method of AggregateFunction not called even though new watermark is emitted

Hi Theo,

It gets to the FilterFunction during the initial creation of the ExecutionGraph, but not at runtime when records are streaming in. So it is not getting that far; it seems to be stuck in the final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter code. It doesn't seem to get past it: the watermarks keep incrementing, but the watermark never seems to reach the end of the window. Maybe I am doing something really simple and stupid.

TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <[hidden email]> wrote:
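Theo's two questions (is the filter dropping every event, or is it never called at all?) can be told apart by counting calls and passes. A minimal plain-Java sketch, assuming java.util.function.Predicate as a stand-in for the FilterFunction body; the class CountingFilter is hypothetical. In a running job, a RichFilterFunction could expose similar counts through Flink's metric group instead of local fields.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

/** Hypothetical debugging wrapper: counts how often a predicate is called and how often it passes. */
class CountingFilter<T> implements Predicate<T> {
    private final Predicate<T> delegate;
    private final AtomicLong calls = new AtomicLong();
    private final AtomicLong passed = new AtomicLong();

    CountingFilter(Predicate<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean test(T value) {
        calls.incrementAndGet();
        boolean keep = delegate.test(value);
        if (keep) {
            passed.incrementAndGet();
        }
        return keep;
    }

    long calls() { return calls.get(); }

    long passed() { return passed.get(); }

    public static void main(String[] args) {
        CountingFilter<String> filter = new CountingFilter<>(s -> s.startsWith("cpu"));
        for (String name : new String[] {"cpu.load", "mem.used", "disk.io"}) {
            filter.test(name);
        }
        // calls == 0 -> never invoked; calls > 0 && passed == 0 -> everything filtered out
        System.out.println(filter.calls() + " called, " + filter.passed() + " passed"); // 3 called, 1 passed
    }
}
```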
Hi Theo,

You were right. For some reason (I still haven't figured out why), the FilterFunction was causing the issue. I commented it out and execution started getting into the add() method of the aggregate function:

    /*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {

TIA,
Vijay
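The thread ends without a confirmed root cause. One possibility, an assumption not confirmed by the posters: the removed filter calls inputMap.get(groupBy) with the whole group-by string, which elsewhere is split on KEY_DELIMITER, so a composite value such as "host,region" is looked up as a single top-level key. If the group-by fields actually live inside the record's TAGS map (as the earlier tag matching suggests), that lookup is null for every record and the filter drops everything. A minimal sketch of a per-field check against the tags map; the "tags" key, the "," delimiter, and the class name GroupByFilterSketch are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical group-by check: every field named in groupBy must be present in the record's tags. */
class GroupByFilterSketch {
    static final String TAGS = "tags";       // assumed key of the nested tag map
    static final String KEY_DELIMITER = ","; // assumed delimiter of the group-by spec

    static boolean hasAllGroupByFields(Map<String, Object> record, String groupBy) {
        Object tagsObj = record.get(TAGS);
        if (!(tagsObj instanceof Map) || groupBy.isEmpty()) {
            return false;
        }
        Map<?, ?> tags = (Map<?, ?>) tagsObj;
        // Check each group-by field individually instead of the whole delimited string.
        return Arrays.stream(groupBy.split(KEY_DELIMITER))
                .map(String::trim)
                .allMatch(field -> tags.get(field) != null);
    }

    public static void main(String[] args) {
        Map<String, String> tags = new HashMap<>();
        tags.put("host", "h1");
        tags.put("region", "us-west-2");
        Map<String, Object> record = new HashMap<>();
        record.put(TAGS, tags);

        // Looking up the whole spec as one top-level key, like the removed filter, finds nothing:
        System.out.println(record.get("host,region") != null);          // false
        // Checking each field inside the tags map keeps the record:
        System.out.println(hasAllGroupByFields(record, "host,region")); // true
    }
}
```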
