Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
Hi, I am trying to get a count of the records in the stream for a time window of 5s, but I always get a count of 1. I sent in 10 records and expect the count to be 10 at the end of the window. I tried to follow the advice given here by Fabian Hueske - https://stackoverflow.com/questions/45606999/how-to-count-the-number-of-records-processed-by-apache-flink-in-a-given-time-win DataStream<Map<String, Object>> kinesisStream; ...//get data from Kinesis source into kinesisStream - works fine final SingleOutputStreamOperator<Map<String, Object>> filterDroppedEvents = kinesisStream .filter(resultMap -> { long timestamp = Utils.getEventTimestampFromMap(resultMap); long currTimestamp = System.currentTimeMillis(); long driftFromCurrTS = currTimestamp - timestamp; if (driftFromCurrTS < 0) { Object eventNameObj = resultMap.get(EVENT_NAME); String eventName = eventNameObj != null ? (String) eventNameObj : ""; logger.debug("PMS - event_timestamp is > current timestamp by driftFromCurrTS:{} for event_name:{} and event_timestamp:{}", driftFromCurrTS, eventName, timestamp); return true; } else { return false; } });//called 10 times here - GOOD final SingleOutputStreamOperator<CountRows> droppedEventsMapToCountRows = filterDroppedEvents .map(mapValue -> new CountRows(mapValue, 1L, mapValue.get(EVENT_NAME) != null ? (String) mapValue.get(EVENT_NAME) : ""));//this is called 10 times - GOOD final KeyedStream<CountRows, String> countRowsKeyedStream = droppedEventsMapToCountRows.keyBy(new KeySelector<CountRows, String>() { @Override public String getKey(CountRows countRows) throws Exception { logger.info("Inside getKey"); return countRows.getEventName(); } });//doesn't get in here to this logger statement ?? 
final AllWindowedStream<CountRows, TimeWindow> countRowsTimeWindowAllWindowedStream = countRowsKeyedStream .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(5)); //.sum("count") final SingleOutputStreamOperator<CountRows> countRowsReduceStream = countRowsTimeWindowAllWindowedStream.reduce((accum, input) -> { logger.info("Inside reduce"); return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());// sum 1s to count });//don't see this logger statement "Inside reduce" DataStream<InfluxDBPoint> droppedEventsStream = countRowsReduceStream.flatMap(new FlatMapFunction<CountRows, InfluxDBPoint>() { @Override public void flatMap(CountRows countRows, Collector<InfluxDBPoint> out) throws Exception { logger.info("Inside final map"); // only called once and countRows.getCount() is 1 - BAD - want it to be 10 ?? Map<String, Object> mapValue = countRows.getRow(); //long currTimestamp = System.currentTimeMillis(); Object eventTSObj = mapValue.get(EVENT_TIMESTAMP); String eventTimestamp = eventTSObj != null ? (String)eventTSObj : ""; long eventTS = Utils.getLongFromDateStr(eventTimestamp); Map<String, String> tags = new HashMap<>(); Object eventNameObj = mapValue.get(Utils.EVENT_NAME); String eventName = eventNameObj != null ? (String)eventNameObj : ""; tags.put(Utils.EVENT_NAME, eventName); Map<String, Object> fields = new HashMap<>(); fields.put("count", countRows.getCount()); out.collect(new InfluxDBPoint("dropped_events_count", eventTS, tags, fields));//TODO: measurement name } }); /* Tried map but doesn't work reduceStream.map(countRows -> { logger.info("Inside final map"); Map<String, Object> mapValue = countRows.getRow(); //long currTimestamp = System.currentTimeMillis(); Object eventTSObj = mapValue.get(EVENT_TIMESTAMP); String eventTimestamp = eventTSObj != null ? 
(String)eventTSObj : ""; long eventTS = Utils.getLongFromDateStr(eventTimestamp); Map<String, String> tags = new HashMap<>(); Object eventNameObj = mapValue.get(Utils.EVENT_NAME); String eventName = eventNameObj != null ? (String)eventNameObj : ""; tags.put(Utils.EVENT_NAME, eventName); Map<String, Object> fields = new HashMap<>(); fields.put("count", countRows.getCount()); return new InfluxDBPoint("dropped_events_count", eventTS, tags, fields);//TODO: measurement name });*/ droppedEventsStream.addSink(influxSink); @@@@@@@@@@@@@@@@@@@@@@@@@@@@@ CountRows is a POJO wrapper around the Map<String, Object> to add the count: public static class CountRows implements Serializable, Comparable<CountRows> { Map<String, Object> row; Long count; String eventName; ......... TIA, |
Free forum by Nabble | Edit this page |