Window Stream - Need assistance

Titus Rakkesh
Dear friends,
I have two streams with the following data types.

DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;

DataStream<Tuple2<String, Double>> unionReloadsStream;

Both streams read from Kafka and receive data at different frequencies; "unionReloadsStream" receives more data than "splittedActivationTuple". I need to keep the elements of "splittedActivationTuple" in a 24-hour window and update their "Double" field whenever a matching element arrives from "unionReloadsStream" (the String field is the common key).

So I wrote the following method to do this task.


public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
            DataStream<Tuple3<String, Integer, Double>> activationsStream,
            DataStream<Tuple2<String, Double>> unifiedReloadStream) {
       
        return activationsStream.join(unifiedReloadStream).where(new ActivationStreamSelector())
                .equalTo(new ReloadStreamSelector()).window(GlobalWindows.create())
                .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
                .apply(new JoinFunction<Tuple3<String, Integer, Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple3<String, Integer, Double> join(Tuple3<String, Integer, Double> first,
                            Tuple2<String, Double> second) {
                        return new Tuple3<String, Integer, Double>(first.f0, first.f1, first.f2 + second.f1);
                    }
                });
    }


and I call it like this:

DataStream<Tuple3<String, Integer, Double>> activationWindowStream = joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
       
activationWindowStream.print();


But nothing is printed.

I expected "activationWindowStream" to contain the "splittedActivationTuple" data (the smaller set), with the Double value accumulated whenever an incoming element of "unionReloadsStream" has a matching "String" field. But that is not happening. What am I missing?

Thanks,
Rakkesh

Re: Window Stream - Need assistance

Xingcan Cui
Hi Rakkesh,

Did you call `execute()` on your `StreamExecutionEnvironment`?

Best,
Xingcan

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <[hidden email]> wrote:

Re: Window Stream - Need assistance

Titus Rakkesh
Thanks for the reply. I have called "env.execute()", but nothing is printed. I am not sure whether the function I implemented is right for my requirement. Please assist.

On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <[hidden email]> wrote:

Re: Window Stream - Need assistance

Xingcan Cui
Hi Rakkesh,

`GlobalWindow` is commonly used for custom window assignment, and you need to specify a `trigger` for it [1]; without one the window never fires, so the join emits nothing.
If the built-in window joins in the DataStream API (e.g., tumbling or sliding windows) do not meet your requirements,
you could try the time-windowed join in the Table/SQL API [2].
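
To make the role of the trigger concrete, here is a toy model in plain Java. It is only a sketch and does not use Flink's API: `ToyWindow` and its `add` method are hypothetical names made up for illustration. The point is that a window merely buffers elements, and nothing is emitted until the trigger decides to fire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Toy model of a window buffer (hypothetical, not Flink API). */
final class ToyWindow<T> {
    private final List<T> buffer = new ArrayList<>();
    // Decides, from the current buffer size, whether the window fires.
    private final IntPredicate trigger;

    ToyWindow(IntPredicate trigger) {
        this.trigger = trigger;
    }

    /** Buffers the element; returns a copy of the buffer if the trigger fires, else an empty list. */
    List<T> add(T element) {
        buffer.add(element);
        return trigger.test(buffer.size()) ? new ArrayList<>(buffer) : new ArrayList<>();
    }
}

final class ToyWindowDemo {
    public static void main(String[] args) {
        // A trigger that never fires: elements pile up and nothing is emitted.
        ToyWindow<String> silent = new ToyWindow<>(size -> false);
        System.out.println(silent.add("activation"));

        // A trigger that fires on every element: the whole buffer is emitted each time.
        ToyWindow<String> eager = new ToyWindow<>(size -> size >= 1);
        System.out.println(eager.add("activation"));
        System.out.println(eager.add("reload"));
    }
}
```

The first window behaves like a global window left with its default trigger, which is why `print()` shows nothing. The second is roughly what a count-based trigger of 1 would do. Note that it re-emits the whole buffer on every element, which may or may not be what you want for a join.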

Hope that helps.

Best,
Xingcan 



On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <[hidden email]> wrote:


Re: Window Stream - Need assistance

Titus Rakkesh
Thanks, Xingcan. I chose GlobalWindow because I want to keep all elements arriving on splittedActivationTuple with a 24-hour expiry, and then operate on them whenever elements arrive on the stream "unionReloadsStream" (the bigger set).

On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <[hidden email]> wrote:

Re: Window Stream - Need assistance

vino yang
Hi Rakkesh,

As Xingcan said, a window needs a trigger in order to FIRE. You can use a time window (which comes with a built-in trigger) or a ProcessFunction combined with state and timers.
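
The state-plus-timer idea can be sketched in plain Java as follows. This is only a model of the logic, not Flink code: `ActivationStore`, `onActivation`, `onReload` and `onTimer` are hypothetical names, and the timestamps are passed in by hand. In a real job the map would presumably be keyed state and `onTimer` would be driven by registered timers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of keyed state with an expiry time (hypothetical, not Flink API). */
final class ActivationStore {
    /** One stored activation: the Integer field, the running Double amount, and its expiry time. */
    private static final class Activation {
        final int count;
        double amount;
        final long expiresAtMillis;

        Activation(int count, double amount, long expiresAtMillis) {
            this.count = count;
            this.amount = amount;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Activation> state = new HashMap<>();

    ActivationStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores (or replaces) the activation for a key and fixes its expiry time. */
    void onActivation(String key, int count, double amount, long nowMillis) {
        state.put(key, new Activation(count, amount, nowMillis + ttlMillis));
    }

    /** Adds a reload to the matching, unexpired activation; returns the new amount if there was one. */
    Optional<Double> onReload(String key, double reloadAmount, long nowMillis) {
        Activation activation = state.get(key);
        if (activation == null) {
            return Optional.empty();
        }
        if (nowMillis >= activation.expiresAtMillis) {
            state.remove(key); // expired: clean up lazily and ignore the reload
            return Optional.empty();
        }
        activation.amount += reloadAmount;
        return Optional.of(activation.amount);
    }

    /** Plays the role of a timer callback: drops expired activations and returns how many were removed. */
    int onTimer(long nowMillis) {
        int before = state.size();
        state.values().removeIf(a -> nowMillis >= a.expiresAtMillis);
        return before - state.size();
    }
}
```

With a 24-hour TTL, an activation stored at time 0 accepts reloads for the same key until the 24 hours are up; after that, `onTimer` or the next `onReload` removes it. Unlike the window join, each reload produces exactly one updated value instead of re-joining everything in the buffer.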

Thanks, vino.

2018-07-18 21:44 GMT+08:00 Titus Rakkesh <[hidden email]>: