Problems with window function

AndreaKinn
Hi all,
I'm trying to implement time ordering inside a stream using a window
function. My goal is to order the elements inside each tumbling window.
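For reference, a tumbling event-time window of size `WINDOW_SIZE` milliseconds assigns each element to the half-open window `[start, start + size)` that contains its timestamp. Below is a minimal plain-Java sketch of that assignment. It is not Flink code: the class and method names are hypothetical, and it assumes non-negative timestamps and a window offset of 0.

```java
public class TumblingWindowSketch {

    // Start of the tumbling window containing `timestamp`.
    // Assumes timestamp >= 0, windowSize > 0 and a window offset of 0.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Windows are half-open: [start, start + windowSize).
    static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long size = 1000L; // hypothetical WINDOW_SIZE in milliseconds
        for (long ts : new long[] {0L, 999L, 1000L, 2500L}) {
            // prints 0 -> [0, 1000), 999 -> [0, 1000), 1000 -> [1000, 2000), 2500 -> [2000, 3000)
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

With a 1000 ms window, elements stamped 0 and 999 land in the same window and are sorted together. An element stamped 1000 starts the next window.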

This is my code (written following the docs):

DataStream<Harness.KafkaRecord> LCxAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
        .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {

            @Override
            public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
                return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
            }
        }).setParallelism(4)
        .keyBy("key")
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

            public void apply(String key,
                              TimeWindow window,
                              Iterable<Harness.KafkaRecord> input,
                              Collector<Harness.KafkaRecord> out)
                    throws Exception {

                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

                for (Harness.KafkaRecord in : input)
                    list.add(in);
                Collections.sort(list);
                for (Harness.KafkaRecord output : list)
                    out.collect(output);
            }
        });

Of course, I have defined a comparator for Harness.KafkaRecord objects.
Unfortunately, the .apply(...) call shows the following error:

The method apply(WindowFunction<Harness.KafkaRecord,R,Tuple,TimeWindow>) in the type WindowedStream<Harness.KafkaRecord,Tuple,TimeWindow> is not applicable for the arguments (new WindowFunction<Harness.KafkaRecord,Harness.KafkaRecord,String,TimeWindow>(){})

Honestly, I don't understand why I can't use String instead of Tuple. My key
is a String, and I also don't understand what the type Tuple means in this
case.

Furthermore, I noticed that the example in the docs (WindowFunction - The Generic
Case, <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#keyed-vs-non-keyed-windows>)
uses String as the key type of the KeyedStream.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Problems with window function

Tony Wei
Hi Andrea,

AFAIK, the `keyBy` variant you used (selecting fields by name) wraps all the keys you selected into a `Tuple`, even when you select only one field. You can get your key with `key.getField(0)` (or by casting it to `Tuple1<String>` and reading `f0`); its type will be `String`.
If you want the KeyedStream to have `String` as its key type, you can pass a `KeySelector` to the keyBy function. [1]
Hope this helps.
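The difference can be illustrated with a plain-Java analogy (not Flink's API). Keying by a field name behaves like building a composite key that wraps the selected fields, even when only one field is selected. A key selector returns the field's own type. In this sketch, a `List<Object>` stands in for Flink's `Tuple`, and `Event` is a hypothetical stand-in for `Harness.KafkaRecord`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeyTypeSketch {

    // Hypothetical stand-in for Harness.KafkaRecord.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Analogy for keyBy("key"): the selected field is wrapped in a composite
    // key, so the key type is the wrapper (List<Object> here, Tuple in Flink).
    static final Function<Event, List<Object>> FIELD_KEY = e -> List.<Object>of(e.key);

    // Analogy for keyBy(KeySelector<..., String>): the key is the String itself.
    static final Function<Event, String> SELECTOR_KEY = e -> e.key;

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("a", 1), new Event("b", 2), new Event("a", 3));

        Map<List<Object>, List<Event>> byField =
                events.stream().collect(Collectors.groupingBy(FIELD_KEY));
        Map<String, List<Event>> bySelector =
                events.stream().collect(Collectors.groupingBy(SELECTOR_KEY));

        for (Map.Entry<List<Object>, List<Event>> entry : byField.entrySet()) {
            // The wrapped key must be unpacked explicitly, much like
            // calling getField(0) on the Tuple key in Flink.
            String key = (String) entry.getKey().get(0);
            System.out.println(key + ": " + entry.getValue().size() + " event(s)");
        }
        // With the selector, the map keys are already Strings.
        System.out.println(bySelector.keySet());
    }
}
```

The compiler error in the original question has the same cause. A `WindowFunction` is generic in its key type, and Java generics are invariant, so a function declared for `String` keys cannot be passed where the stream's key type is `Tuple`.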

Best Regards,
Tony Wei


(In reply to AndreaKinn, 2017-10-15 7:00 GMT+08:00.)


Re: Problems with window function

AndreaKinn
KeySelector was exactly what I needed. Thanks a lot!
I modified my code as follows, and now it works:

DataStream<Harness.KafkaRecord> LCxAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("LCacc", new CustomDeserializer(), properties)).setParallelism(4)
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor()).setParallelism(4)
        .map(new MapFunction<Tuple8<String, String, Date, String, String, Double, Double, Double>, Harness.KafkaRecord>() {

            @Override
            public Harness.KafkaRecord map(Tuple8<String, String, Date, String, String, Double, Double, Double> value) throws Exception {
                return new Harness.KafkaRecord(value.f0, value.f1, value.f2, value.f3, value.f4, value.f5);
            }
        }).setParallelism(4)
        .keyBy(new KeySelector<Harness.KafkaRecord, String>() {
            public String getKey(Harness.KafkaRecord record) { return record.key; }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)))
        .apply(new WindowFunction<Harness.KafkaRecord, Harness.KafkaRecord, String, TimeWindow>() {

            public void apply(String key,
                              TimeWindow window,
                              Iterable<Harness.KafkaRecord> input,
                              Collector<Harness.KafkaRecord> out)
                    throws Exception {

                ArrayList<Harness.KafkaRecord> list = new ArrayList<Harness.KafkaRecord>();

                for (Harness.KafkaRecord in : input)
                    list.add(in);
                Collections.sort(list);
                for (Harness.KafkaRecord output : list)
                    out.collect(output);
            }
        });
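The per-key, per-window sorting that this job performs can be simulated in plain Java to check the logic offline. This is a sketch under stated assumptions, not Flink code:

* `Event` is a hypothetical stand-in for `Harness.KafkaRecord`.
* `Collections.sort(list)` without a Comparator requires the element type to implement `Comparable`, so `Event` is ordered by timestamp.
* Windows have offset 0 and timestamps are non-negative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class WindowSortSketch {

    // Hypothetical stand-in for Harness.KafkaRecord, ordered by event time.
    static final class Event implements Comparable<Event> {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Event other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    // Groups events by (key, tumbling window), sorts each group and emits the
    // groups one after another, mimicking one apply(...) call per group.
    static List<Event> sortWithinWindows(List<Event> input, Function<Event, String> keySelector, long windowSize) {
        // key -> window start -> events in that window (TreeMaps keep output deterministic)
        Map<String, Map<Long, List<Event>>> buckets = new TreeMap<>();
        for (Event e : input) {
            long start = e.timestamp - (e.timestamp % windowSize);
            buckets.computeIfAbsent(keySelector.apply(e), k -> new TreeMap<>())
                   .computeIfAbsent(start, s -> new ArrayList<>())
                   .add(e);
        }
        List<Event> out = new ArrayList<>();
        for (Map<Long, List<Event>> windows : buckets.values()) {
            for (List<Event> window : windows.values()) {
                Collections.sort(window); // uses Event.compareTo
                out.addAll(window);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("a", 1500), new Event("a", 200), new Event("b", 50),
                new Event("a", 900), new Event("a", 1100));
        // prints [a@200, a@900, a@1100, a@1500, b@50]
        System.out.println(sortWithinWindows(input, e -> e.key, 1000L));
    }
}
```

The TreeMaps make this sketch's output order deterministic across keys. In a running Flink job, windows for different keys fire independently and their output interleaves. Only the records emitted by a single apply(...) call are in sorted order relative to one another.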


