Posted by
AndreaKinn on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Problems-with-window-function-tp16226p16229.html
KeySelector was exactly what I need. Thank you a lot.
I modified my code in this way and now it works:
DataStream<Harness.KafkaRecord> LCxAccStream = env
.addSource(new FlinkKafkaConsumer010<>("LCacc", new
CustomDeserializer(), properties)).setParallelism(4)
.assignTimestampsAndWatermarks(new
CustomTimestampExtractor()).setParallelism(4)
.map(new MapFunction<Tuple8<String, String, Date, String, String,
Double, Double, Double>, Harness.KafkaRecord>(){
@Override
public Harness.KafkaRecord map(Tuple8<String, String, Date, String,
String, Double, Double, Double> value) throws Exception {
return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3,
value.f4, value.f5);
}
}).setParallelism(4)
.keyBy(new KeySelector<Harness.KafkaRecord, String>() {
public String getKey(Harness.KafkaRecord record) { return
record.key; }
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
.apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord,
String, TimeWindow>() {
public void apply(String key,
TimeWindow window,
Iterable<Harness.KafkaRecord> input,
Collector<Harness.KafkaRecord> out)
throws Exception {
ArrayList<Harness.KafkaRecord> list = new
ArrayList<Harness.KafkaRecord>();
for (Harness.KafkaRecord in: input)
list.add(in);
Collections.sort(list);
for(Harness.KafkaRecord output: list)
out.collect(output);
}
});
--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/