|
Hi,
I'm trying to count the number of events in a window (every 5 seconds). The code below works fine if there are events in the window, if there are no events in the window no output is emitted.
What I want to achieve is a count of 0 when there are no events in the time window of 5 seconds.
Can you help me?
Thanks
DataStreamSource<ObjectNode> stream = env.addSource(myConsumer); DataStream<InputTuple_usb_monitor> tupledStream = stream.map(new Json2Tuple()); SingleOutputStreamOperator<AvgCountSum> out = tupledStream .filter(new FilterFunction<InputTuple_usb_monitor>() { private static final long serialVersionUID = 1L; @Override public boolean filter(InputTuple_usb_monitor arg0) throws Exception { return (arg0.usb_code.equals("UMDFHostDeviceArrivalBegin")); } }) .timeWindowAll(Time.seconds(5)) .aggregate(new AvgCountAggregate()); out.print();
|