Applying the same operator twice on a windowed stream


Abdul Salam Shaikh
Hi everyone, 

I currently have the following window definition, using the 1.2.0 snapshot version:

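    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;

    // JsonTestSource, RawEventTransformer, FlatObject, WindowCustomTrigger,
    // WindowEvictor and TrafficWindow are my own classes.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // Parse the raw JSON strings into FlatObjects.
    DataStream<String> live = env.addSource(new JsonTestSource());
    DataStream<FlatObject> jsonToTuple = live.flatMap(new RawEventTransformer());

    // Partition the events by intersection.
    KeyedStream<FlatObject, String> keyStream = jsonToTuple.keyBy(new KeySelector<FlatObject, String>() {
        @Override
        public String getKey(FlatObject value) throws Exception {
            return value.getIntersectionName();
        }
    });

    // Correct the raw events per intersection in a global window with a custom trigger and evictor.
    DataStream<FlatObject> flatCorrectedStream = keyStream
            .window(GlobalWindows.create())
            .trigger(new WindowCustomTrigger())
            .evictor(new WindowEvictor())
            .apply(new TrafficWindow());
    flatCorrectedStream.print();

    // Nothing runs until the job is submitted.
    env.execute();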

My apply function (TrafficWindow) corrects the raw event stream.

Next, I want to evaluate the corrected stream and generate a stream of evaluated window objects (EvaluatedWindowObjectsStream).

However, chaining a second apply() onto the windowed stream is not allowed:

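    // Intended: correct the events with TrafficWindow, then evaluate them with WindowEvaluater.
    // This does not compile.
    DataStream<FlatObject> flatCorrectedStream = keyStream
            .window(GlobalWindows.create())
            .trigger(new WindowCustomTrigger())
            .evictor(new WindowEvictor())
            .apply(new TrafficWindow())
            .apply(new WindowEvaluater());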

I am looking for a way to achieve this. I want to keep the logic for correcting the stream separate from the logic for evaluating it, which is why this case arises.

Thanks!



Re: Applying the same operator twice on a windowed stream

Fabian Hueske-2
Hi,

the window operation is complete once you have called apply() the first time.
The result is a regular DataStream.

I assume your TrafficWindow emits multiple records. Otherwise, you'd probably apply a simple MapFunction after the window.
So you are looking for a way to iterate over all values returned by a single TrafficWindow call.

I think the easiest way would be to emit a single record from TrafficWindow that contains all the records it would otherwise emit, and to unnest and evaluate those records in a following FlatMapFunction.
If that does not work, you would need to define another window.
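The first suggestion can be sketched in plain Java, without a Flink dependency so that it runs on its own. `correctWindow` stands in for TrafficWindow but passes the whole corrected window to its output as one `List`, and `evaluateBatch` stands in for the following FlatMapFunction, unnesting that list and emitting one result per record. `Reading`, `Evaluation`, the method names, the clamp-to-zero "correction" and the above-the-mean "evaluation" are all hypothetical placeholders, and `java.util.function.Consumer` plays the role of Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BatchThenFlatMap {

    // Hypothetical stand-in for FlatObject: one reading for one intersection.
    static final class Reading {
        final String intersection;
        final int value;

        Reading(String intersection, int value) {
            this.intersection = intersection;
            this.value = value;
        }
    }

    // Hypothetical evaluated output record.
    static final class Evaluation {
        final String intersection;
        final int value;
        final boolean aboveWindowMean;

        Evaluation(String intersection, int value, boolean aboveWindowMean) {
            this.intersection = intersection;
            this.value = value;
            this.aboveWindowMean = aboveWindowMean;
        }

        @Override
        public String toString() {
            return intersection + ":" + value + (aboveWindowMean ? " (above mean)" : "");
        }
    }

    // Stage 1, in the role of TrafficWindow: correct every reading of one
    // window firing, then emit the whole batch as a SINGLE output record.
    // Clamping negative values to 0 is only a placeholder correction.
    static void correctWindow(Iterable<Reading> window, Consumer<List<Reading>> out) {
        List<Reading> batch = new ArrayList<>();
        for (Reading r : window) {
            batch.add(new Reading(r.intersection, Math.max(0, r.value)));
        }
        out.accept(batch);
    }

    // Stage 2, in the role of the following FlatMapFunction: unnest the batch
    // and emit one evaluation per corrected reading. Because the whole batch
    // arrives at once, the evaluation can use values from the entire window.
    static void evaluateBatch(List<Reading> batch, Consumer<Evaluation> out) {
        double mean = batch.stream().mapToInt(r -> r.value).average().orElse(0);
        for (Reading r : batch) {
            out.accept(new Evaluation(r.intersection, r.value, r.value > mean));
        }
    }

    // Wires the two stages together for one window firing.
    static List<Evaluation> run(List<Reading> window) {
        List<List<Reading>> corrected = new ArrayList<>();
        correctWindow(window, corrected::add);
        List<Evaluation> evaluated = new ArrayList<>();
        for (List<Reading> batch : corrected) {
            evaluateBatch(batch, evaluated::add);
        }
        return evaluated;
    }

    public static void main(String[] args) {
        List<Reading> window = Arrays.asList(
                new Reading("A", 4), new Reading("A", -2), new Reading("A", 8));
        // Prints A:4, A:0 and A:8 (above mean) on separate lines.
        run(window).forEach(System.out::println);
    }
}
```

In Flink 1.2 terms, stage one would become a `WindowFunction<FlatObject, List<FlatObject>, String, GlobalWindow>` passed to `apply()`, and stage two a `FlatMapFunction` over `List<FlatObject>` passed to `flatMap()` on the resulting stream. Since the evaluator receives one complete batch per window firing, it can compute values across all records of that firing, which a per-record `MapFunction` could not.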

Best, Fabian


In reply to: Abdul Salam Shaikh, 2017-01-27 21:27 GMT+01:00




Re: Applying the same operator twice on a windowed stream

Abdul Salam Shaikh
Yes, your assumption is right. My TrafficWindow emits multiple records, and I am looking for a way to iterate over these values and emit another set of multiple records (the values computed from the corrected stream).

Thanks a lot for your input Mr. Hueske :) 

In reply to: Fabian Hueske, Fri, Jan 27, 2017 at 9:55 PM

--
Thanks & Regards,

Abdul Salam Shaikh