Tumbling window rich functionality

Swapnil Chougule
Hi Team,

I am using a tumbling window with a window size of 5 minutes.
I want to perform setup and teardown for each window. I tried using RichWindowFunction, but it didn't work for me.
Can anybody tell me how I can do this?

I'm attaching the code snippet I tried:

impressions
        .map(new LineItemAdUnitAggr())
        .keyBy(0)
        .timeWindow(Time.seconds(300))
        .apply(new RichWindowFunction<Tuple2<Tuple2<Integer, Integer>, Long>, Boolean, Tuple, TimeWindow>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // setup method
            }

            @Override
            public void apply(Tuple key, TimeWindow window,
                    Iterable<Tuple2<Tuple2<Integer, Integer>, Long>> input,
                    Collector<Boolean> out) throws Exception {
                // do processing
            }

            @Override
            public void close() throws Exception {
                // tear down method
                super.close();
            }
        });

Thanks,
Swapnil

Re: Tumbling window rich functionality

Aljoscha Krettek
Hi,
WindowFunction.apply() is called once for each window, so you should be able to do the setup/teardown in there. open() and close() are called at the start and end of processing, respectively.
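
A minimal sketch of that lifecycle, written as plain Java rather than against the Flink API so it can run on its own. The class and method names here (WindowLifecycleSketch, LoggingWindowFunction, run) are hypothetical and only model the call order described above: open() once, apply() once per window with the setup/teardown inside it, close() once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the call order; this is not Flink code. */
final class WindowLifecycleSketch {

    /** Hypothetical stand-in for a rich window function. */
    static final class LoggingWindowFunction {
        final List<String> log = new ArrayList<>();

        /** Called once, before any window is processed. */
        void open() {
            log.add("open");
        }

        /** Called once per window; per-window setup and teardown live here. */
        void apply(String window, List<Integer> input) {
            log.add("setup " + window);
            try {
                int sum = 0;
                for (int v : input) {
                    sum += v;
                }
                log.add("sum " + window + " = " + sum);
            } finally {
                // Runs even if the processing above throws.
                log.add("teardown " + window);
            }
        }

        /** Called once, after the last window. */
        void close() {
            log.add("close");
        }
    }

    /** Drives the function over a list of already-assembled windows. */
    static List<String> run(List<List<Integer>> windows) {
        LoggingWindowFunction fn = new LoggingWindowFunction();
        fn.open();
        for (int i = 0; i < windows.size(); i++) {
            fn.apply("w" + i, windows.get(i));
        }
        fn.close();
        return fn.log;
    }

    public static void main(String[] args) {
        List<List<Integer>> windows =
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        run(windows).forEach(System.out::println);
    }
}
```

In a real job the body of this apply() would go inside WindowFunction.apply(); the try/finally is one way to make sure the teardown runs even when the processing fails.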

Cheers,
Aljoscha

In reply to the post by Swapnil Chougule of Wed, 14 Sep 2016 at 09:04.

Re: Tumbling window rich functionality

LiZhe
In reply to this post by Swapnil Chougule
I agree with you. I cannot use RichWindowFunction with window.apply(new MyWindowFunction) either.

I think it is not supported right now.

However, you can use a RichFlatMapFunction after the window function to meet your needs, for example to keep stateful values.

Re: Tumbling window rich functionality

Swapnil Chougule
In reply to this post by Aljoscha Krettek
Thanks Aljoscha.

When I use WindowFunction.apply() on a keyed stream, is apply() called once per window, or multiple times (once for each key in that windowed stream)?

Example:
DataStream<Boolean> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String,Integer>, Boolean, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple key, TimeWindow window,
                            Iterable<Tuple2<String, Integer>> input,
                            Collector<Boolean> out) throws Exception {
                     //Some business logic
                    }
                });

Regards,
Swapnil

In reply to the post by Aljoscha Krettek of Wed, Sep 14, 2016 at 2:26 PM.


Re: Tumbling window rich functionality

rmetzger0
Hi,
apply() will be called once for each key in each window.
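
To make the per-key behaviour concrete, here is a small plain-Java model (not Flink code) that groups events by key and tumbling window and records one apply() call per group. The names KeyedWindowSketch, Event, windowStart and applyCalls are hypothetical, and the window-start arithmetic assumes non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of keyed tumbling windows; this is not Flink code. */
final class KeyedWindowSketch {

    /** Hypothetical event: a key, a timestamp in milliseconds, and a value. */
    static final class Event {
        final String key;
        final long timestampMs;
        final int value;

        Event(String key, long timestampMs, int value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Start of the tumbling window containing the timestamp (assumes timestampMs >= 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Groups events by (key, window start) and records one apply() call per group. */
    static List<String> applyCalls(List<Event> events, long sizeMs) {
        // LinkedHashMap keeps groups in first-seen order.
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Event e : events) {
            String group = e.key + "@" + windowStart(e.timestampMs, sizeMs);
            sums.merge(group, e.value, Integer::sum);
        }
        List<String> calls = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : sums.entrySet()) {
            calls.add("apply(" + entry.getKey() + ") sum=" + entry.getValue());
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1000L, 1),
                new Event("b", 2000L, 1),
                new Event("a", 3000L, 1),
                new Event("a", 12000L, 1));
        // Two keys in the first 10-second window, one key in the second.
        applyCalls(events, 10000L).forEach(System.out::println);
    }
}
```

With these four events and a 10-second window, the model produces three calls: one for key "a" and one for key "b" in the first window, and one for key "a" in the second.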

In reply to the post by Swapnil Chougule of Wed, Oct 12, 2016 at 2:25 PM.