Hey,
I have some questions to aggregate functions such as max or min. Take the following example:
//create Stream with event time where Data contains an ID, a timestamp and a temperature value
DataStream<Data> oneStream = env.fromElements(
new Data(123, new Date(116, 8,8,11,11,11), 5),
new Data(124, new Date(116, 8,8,12,10,11), 8),
new Data(125, new Date(116, 8,8,12,12,11), 10),
new Data(126, new Date(116, 8,8,12,15,11), 2))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Data>() {
@Override
public long extractAscendingTimestamp(final Data data) {
return data.getTimestamp().getTime();
}
});
//calluclate max value of temperature per hour
DataStream<Data> maxStream = WatermarkStream
.keyBy("id")
.timeWindow(Time.hours(1))
.max("temp");
maxStream.print();
Here are the questions I ran into:
1.)
Why does the resulting stream “maxStream” have to be of type Data? From the documentation and the difference to maxBy I would expect the resulting stream to be of type Integer?
2.)
Executing the code as it is above, I would expect the printed result be the data with ID 124 and ID 125. However, the execution prints all 4 data sets. Did I totally get this example wrong? How would the code
need to look, to get the expected result?
3.)
Can you point me to a good example for using time windows and aggregates? I couldn’t find one that explains the above questions.
Thanks for your help and best wishes,
Claudia
Free forum by Nabble | Edit this page |