timewindowall and aggregate(count): count 0 when no event in the window

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

timewindowall and aggregate(count): count 0 when no event in the window

Luigi Sgaglione
Hi, 

I'm trying to count the number of events in a window (every 5 seconds).
The code below works fine if there are events in the window, if there are no events in the window no output is emitted.

What I want to achieve is a count of 0 when there are no events in the time window of 5 seconds.

Can you help me?


Thanks   

DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
DataStream<InputTuple_usb_monitor> tupledStream = stream.map(new Json2Tuple());
SingleOutputStreamOperator<AvgCountSum> out = tupledStream
.filter(new FilterFunction<InputTuple_usb_monitor>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(InputTuple_usb_monitor arg0) throws Exception {
return (arg0.usb_code.equals("UMDFHostDeviceArrivalBegin"));
}
})
.timeWindowAll(Time.seconds(5))
.aggregate(new AvgCountAggregate());
out.print();