Hi all,
I'm trying to impose a time ordering on a stream using a window function. My goal is to order the elements inside a tumbling window. This is my code (written following the docs):

    DataStream<Harness.KafkaRecord> LCxAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
        .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {
            @Override
            public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
                return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
            }
        }).setParallelism(4)
        .keyBy("key")
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
            public void apply(String key, TimeWindow window, Iterable<Harness.KafkaRecord> input, Collector<Harness.KafkaRecord> out) throws Exception {
                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
                for (Harness.KafkaRecord in : input)
                    list.add(in);
                Collections.sort(list);
                for (Harness.KafkaRecord output : list)
                    out.collect(output);
            }
        });

I have defined a comparator for the Harness.KafkaRecord object. Unfortunately, the .apply(...) call shows the following error:

    The method apply(WindowFunction<Harness.KafkaRecord,R,Tuple,TimeWindow>) in the type WindowedStream<Harness.KafkaRecord,Tuple,TimeWindow> is not applicable for the arguments (new WindowFunction<Harness.KafkaRecord,Harness.KafkaRecord,String,TimeWindow>(){})

Honestly, I don't understand why I can't use String instead of Tuple. My key type is a String, and I can't understand what the type Tuple means in this case.
Furthermore, I noticed that the example here: WindowFunction - The Generic Case <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#keyed-vs-non-keyed-windows> uses a String type as the key of the KeyedStream.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Andrea,

AFAIK, the `keyBy` overload you used (the one taking field names) wraps all the keys you selected into a `Tuple`. You can read your key from that tuple, e.g. with `getField(0)`, and the value you get back will be a `String`. If you want the KeyedStream to have String as its key type, you can pass a `KeySelector` to keyBy. [1]

Hope this helps.

Best Regards,
Tony Wei

In reply to AndreaKinn <[hidden email]>, 2017-10-15 7:00 GMT+08:00.
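To illustrate why the key type differs between the two forms of keyBy, here is a plain-Java analogy. It is only a sketch and does not use Flink: `KeyTypeSketch`, `Record`, `groupByFields` and `groupBySelector` are hypothetical names invented for the illustration. The idea it shows is that a field named by a string can only be resolved at runtime, so the compiler sees nothing more specific than a generic container, whereas a typed selector function fixes the key type at compile time.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyTypeSketch {

    /** Hypothetical stand-in for Harness.KafkaRecord; only the key field matters here. */
    public static class Record {
        public String key;
        public long timestamp;

        public Record(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Analogue of keyBy("key"): fields are named by strings, so their types
     * are only known at runtime and the key is a generic container of Objects.
     */
    public static <T> Map<List<Object>, List<T>> groupByFields(List<T> items, String... fieldNames) {
        Map<List<Object>, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            List<Object> key = new ArrayList<>();
            for (String name : fieldNames) {
                try {
                    Field field = item.getClass().getField(name); // public fields only
                    key.add(field.get(item));
                } catch (ReflectiveOperationException e) {
                    throw new IllegalArgumentException("No accessible field: " + name, e);
                }
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    /** Analogue of keyBy(KeySelector): the selector's return type fixes K at compile time. */
    public static <T, K> Map<K, List<T>> groupBySelector(List<T> items, Function<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(selector.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Record> records = Arrays.asList(
                new Record("a", 3L), new Record("b", 1L), new Record("a", 2L));

        // Key is a container of Objects: the compiler cannot know it holds a String.
        Map<List<Object>, List<Record>> byField = groupByFields(records, "key");
        // Key is a String: inferred from the selector's return type.
        Map<String, List<Record>> bySelector = groupBySelector(records, r -> r.key);

        System.out.println(byField.keySet());
        System.out.println(bySelector.keySet());
    }
}
```

In the same spirit, a window function applied after a field-name keyBy has to declare the generic container as its key type, while one applied after a typed selector can declare String directly.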
KeySelector was exactly what I needed. Thanks a lot.
I modified my code as follows and now it works:

    DataStream<Harness.KafkaRecord> LCxAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
        .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {
            @Override
            public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
                return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
            }
        }).setParallelism(4)
        // A KeySelector makes the key type of the KeyedStream String instead of Tuple
        .keyBy(new KeySelector<Harness.KafkaRecord, String>() {
            public String getKey(Harness.KafkaRecord record) {
                return record.key;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {
            public void apply(String key, TimeWindow window, Iterable<Harness.KafkaRecord> input, Collector<Harness.KafkaRecord> out) throws Exception {
                // Buffer the window's elements, sort them, then emit them in order
                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();
                for (Harness.KafkaRecord in : input)
                    list.add(in);
                Collections.sort(list);
                for (Harness.KafkaRecord output : list)
                    out.collect(output);
            }
        });
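The thread does not show how Harness.KafkaRecord is ordered, so here is a minimal sketch of the piece that `Collections.sort(list)` relies on. It assumes the record implements `Comparable` and compares by an event timestamp stored as a `long`. `WindowSortSketch`, `Record` and `sortWindow` are hypothetical names, and the real class may well order on its `Date` field instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WindowSortSketch {

    /** Hypothetical stand-in for Harness.KafkaRecord, ordered by event timestamp. */
    public static class Record implements Comparable<Record> {
        public final String key;
        public final long timestamp; // assumed: event time in milliseconds

        public Record(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Record other) {
            // Long.compare avoids the overflow that a subtraction could cause
            return Long.compare(this.timestamp, other.timestamp);
        }
    }

    /** Same buffer-then-sort steps as the body of apply(...), without the Flink Collector. */
    public static <T extends Comparable<T>> List<T> sortWindow(Iterable<T> input) {
        List<T> list = new ArrayList<>();
        for (T element : input) {
            list.add(element);
        }
        Collections.sort(list); // uses compareTo, so T must be Comparable
        return list;
    }

    public static void main(String[] args) {
        List<Record> window = Arrays.asList(
                new Record("a", 30L), new Record("a", 10L), new Record("a", 20L));
        for (Record r : sortWindow(window)) {
            System.out.println(r.key + " @ " + r.timestamp);
        }
    }
}
```

The single-argument `Collections.sort` only compiles when the element type is `Comparable`. If the ordering is defined as a separate `Comparator` instead, the two-argument form `Collections.sort(list, comparator)` would be the one to call.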