Implement a sort inside the WindowFunction

Felipe Gutierrez
Hi all,

I have a word count using Flink streaming, and my reduce transformation applies a WindowFunction. I would like this WindowFunction to sort the output of the reduce. Is that possible? In other words, I want to sort the data set inside the window by key.

Thanks for your ideas!

Here is my code:

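import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // UpperCaserMap, Splitter, SumWordSelect and SumWordsReduce are my own classes (not shown).
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9000)
                .map(new UpperCaserMap())
                .flatMap(new Splitter())
                .keyBy(new SumWordSelect()) // select the first value as a key using the KeySelector class
                .timeWindow(Time.seconds(5)) // use this if Apache Flink server is up
                .reduce(new SumWordsReduce(), new FIlterWindowFunction());

    // Sums the counts of one key in a window (not used in the pipeline above).
    public static class ReduceWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            int sum = 0;
            for (Tuple2<String, Integer> input : inputs) {
                sum += input.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }
    }

    // Called once per key and window; after reduce() it only receives the single reduced record.
    public static class FIlterWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            // Integer value = 0;
            for (Tuple2<String, Integer> input : inputs) {
                // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
                out.collect(new Tuple2<>(key, input.f1));
            }
        }
    }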



--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez

Re: Implement a sort inside the WindowFunction

Fabian Hueske-2
Hi Felipe,

Just like the ReduceFunction, the WindowFunction is applied in the context of a single key. So it is called once for each key (and window) and always sees just a single record: the reduced record of that key.
To sort across keys, you'd have to add a non-keyed window (windowAll) after the keyed one and apply your sorting function there as an AllWindowFunction.
Note that a non-keyed window cannot run in parallel; it always runs with a parallelism of 1.

Best, Fabian
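A minimal sketch of the windowAll suggestion, assuming the Flink 1.4 DataStream API and that the keyed reduce emits one (word, count) Tuple2 per word and window. The class name SortByKeyAllWindowFunction and the helper sortedPerWindow are hypothetical. The function buffers all records of one non-keyed window in a list, sorts them by the word (field f0) and emits them in that order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical class: sorts all (word, count) records of one non-keyed window by word.
class SortByKeyAllWindowFunction
        implements AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    // Hypothetical helper: attaches the non-parallel sorting window after the keyed reduce.
    static DataStream<Tuple2<String, Integer>> sortedPerWindow(
            DataStream<Tuple2<String, Integer>> reducedCounts) {
        return reducedCounts
                .timeWindowAll(Time.seconds(5)) // non-keyed window; runs with parallelism 1
                .apply(new SortByKeyAllWindowFunction());
    }

    @Override
    public void apply(TimeWindow window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Tuple2<String, Integer>> out) {
        // Buffer the whole window; it must fit into the memory of a single task.
        List<Tuple2<String, Integer>> counts = new ArrayList<>();
        for (Tuple2<String, Integer> value : values) {
            counts.add(value);
        }
        // Sort by the key (the word in field f0).
        counts.sort(Comparator.comparing((Tuple2<String, Integer> t) -> t.f0));
        for (Tuple2<String, Integer> count : counts) {
            out.collect(count);
        }
    }
}
```

In this sketch only the already reduced per-word counts reach the non-parallel window, so the counting itself still runs in parallel in the keyed window. With event time, the keyed window's results carry the window's maximum timestamp (end minus one), so they should land in the matching downstream window of the same size; with processing time, the grouping depends on when the records arrive and is only approximate.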

In reply to Felipe Gutierrez (2018-03-09 22:07 GMT+01:00)

Re: Implement a sort inside the WindowFunction

Felipe Gutierrez
Thanks Fabian,

I am building an example with my own fake source to process in Flink. I am going to implement more things with keys and event-time processing to understand them better.
I guess non-keyed windows are not used very often, since they do not run in parallel and the processing cannot be split. But I will implement some examples with them to get practice.

Thanks for your reply,
Felipe

In reply to Fabian Hueske (Thu, Mar 15, 2018 at 6:17 AM)