maxBy() and keyBy() question

juanramallo80
Hi again!

I am working with two DataStreams, and I want to get the maximum value from each pair of matching records, for example:

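import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        // Inside main(String[] args) throws Exception.
        // Informacion(matchName, localOdd, awayOdd): in the POJO, matchName is the
        // field nombrePartido and localOdd is the field cuotaGanadorLocal.
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        Informacion info1 = new Informacion("Match1", 1.10, 3.22);
        Informacion info2 = new Informacion("Match2", 2.11, 1.10);
        Informacion info3 = new Informacion("Match3", 4.10, 1.05);

        Informacion info11 = new Informacion("Match1", 1.80, 2.20);
        Informacion info22 = new Informacion("Match2", 3.10, 1.15);
        Informacion info33 = new Informacion("Match3", 2.12, 1.25);

        DataStream<Informacion> src = see.fromElements(info1, info2, info3);
        DataStream<Informacion> src2 = see.fromElements(info11, info22, info33);
        DataStream<Informacion> src3 = src.union(src2);

        // Key by match name, then keep the record with the largest local odd per key
        DataStream<Informacion> maxLocal = src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");

        maxLocal.print();
        see.execute(); // the job only runs once execute() is called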
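The Informacion class itself is not shown in the question. Because keyBy("nombrePartido") and maxBy("cuotaGanadorLocal") refer to fields by name, Flink has to treat Informacion as a POJO: a public class with a public no-argument constructor whose fields are public or reachable through getters and setters. A minimal sketch of such a class, assuming the away odd is stored in a field called cuotaGanadorVisitante (a hypothetical name; only nombrePartido and cuotaGanadorLocal appear in the code above) and a toString() that reproduces the printed format:

```java
/**
 * Hypothetical reconstruction of the Informacion POJO used in the question.
 * Flink POJO rules: public class, public no-arg constructor,
 * fields public (or private with getters/setters).
 */
public class Informacion {
    public String nombrePartido;          // match name, used by keyBy("nombrePartido")
    public double cuotaGanadorLocal;      // local odd, used by maxBy("cuotaGanadorLocal")
    public double cuotaGanadorVisitante;  // away odd (hypothetical field name)

    // Required so that Flink can treat the class as a POJO.
    public Informacion() {}

    public Informacion(String nombrePartido, double cuotaGanadorLocal, double cuotaGanadorVisitante) {
        this.nombrePartido = nombrePartido;
        this.cuotaGanadorLocal = cuotaGanadorLocal;
        this.cuotaGanadorVisitante = cuotaGanadorVisitante;
    }

    // Mimics the format seen in the printed output of the question.
    @Override
    public String toString() {
        return "Informacion [matchName=" + nombrePartido
                + ", localOdd=" + cuotaGanadorLocal
                + ", awayOdd=" + cuotaGanadorVisitante + "]";
    }
}
```

With this class, new Informacion("Match1", 1.10, 3.22).toString() yields Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22], which matches the first Match1 line of the output below.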



Suppose these are tennis matches with their names and betting odds, and that the match names are the same in both streams, i.e., Match1 = Match1, Match2 = Match2, and so on (imagine that the name of Match1 is "Rafa Nadal - Roger Federer").


I want to get the maximum localOdd for matches with the same name. The result of my code is:


1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
1> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
1> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]

It seems to be taking the largest value across all matches rather than per key (match name).


I am looking for this:


Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]



How can I get it?


Thanks in advance





Re: maxBy() and keyBy() question

Fabian Hueske-2
Hi,

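You are computing a running aggregate, i.e., you get one output record for each input record, and that output record is the record with the largest value observed so far for its key.
If the record with the largest value arrives first, it is emitted again for every later record with the same key. This is what happened with Match3 in your example. Note that the last record emitted for each key (Match1: 1.8, Match2: 3.1, Match3: 4.1) is exactly the result you are looking for.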
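To make the running semantics concrete, here is a plain-Java model of a keyed running maxBy that does not use Flink at all: it keeps the current maximum per key and emits that maximum once for every incoming record. It is a sketch that assumes the records arrive in the order "first stream, then second stream"; in the real job the two unioned sources are processed by parallel subtasks (the 1> and 4> prefixes in the output), so the exact interleaving can differ, but for each key the emitted values never decrease and the last one is the maximum seen. Ties keep the earlier record, which matches the default behavior of Flink's maxBy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (no Flink) of a keyed running maxBy:
 * one output per input, namely the max-so-far record for that input's key.
 */
public class RunningMaxBy {

    static final class Odd {
        final String match;   // key (match name)
        final double local;   // value compared by maxBy (local odd)

        Odd(String match, double local) { this.match = match; this.local = local; }

        @Override
        public String toString() { return match + "=" + local; }
    }

    static List<Odd> runningMaxBy(List<Odd> input) {
        Map<String, Odd> maxSoFar = new HashMap<>(); // per-key state
        List<Odd> out = new ArrayList<>();
        for (Odd in : input) {
            Odd current = maxSoFar.get(in.match);
            // Replace the stored record only if the new one is strictly larger,
            // so on ties the earlier record is kept.
            if (current == null || in.local > current.local) {
                maxSoFar.put(in.match, in);
                current = in;
            }
            out.add(current); // one output for every input record
        }
        return out;
    }

    public static void main(String[] args) {
        // Local odds from the question: first stream, then second stream.
        List<Odd> input = Arrays.asList(
                new Odd("Match1", 1.10), new Odd("Match2", 2.11), new Odd("Match3", 4.10),
                new Odd("Match1", 1.80), new Odd("Match2", 3.10), new Odd("Match3", 2.12));
        System.out.println(runningMaxBy(input));
        // prints [Match1=1.1, Match2=2.11, Match3=4.1, Match1=1.8, Match2=3.1, Match3=4.1]
    }
}
```

Match3 appears twice with 4.1 because its largest odd arrived first, exactly as in the printed Flink output.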

There are two ways to compute aggregates on streams: 1) a running aggregate, as you just did, or 2) a windowed aggregate.
For a windowed aggregate, you need to specify a window. The window can be time-based or count-based.
The following blog post should be a good introduction to Flink's window support [1].
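A windowed version in the Flink 1.x DataStream API could look like src3.keyBy("nombrePartido").countWindow(2).maxBy("cuotaGanadorLocal"), assuming each match receives exactly one record from each of the two streams. KeyedStream.countWindow(long) creates a tumbling count window per key, and the windowed maxBy emits one record per window instead of one per input. The plain-Java model below (no Flink, a sketch of the semantics only) buffers records per key and emits the maximum only when a key's buffer holds two records. A match that receives only one record produces no output at all, which is the main caveat of a count window in this setting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (no Flink) of a keyed tumbling count window followed by maxBy:
 * records are buffered per key, and one result is emitted when a key's buffer
 * reaches windowSize; incomplete windows emit nothing.
 */
public class CountWindowMaxBy {

    static final class Odd {
        final String match;   // key (match name)
        final double local;   // value compared by maxBy (local odd)

        Odd(String match, double local) { this.match = match; this.local = local; }

        @Override
        public String toString() { return match + "=" + local; }
    }

    static List<Odd> countWindowMaxBy(List<Odd> input, int windowSize) {
        Map<String, List<Odd>> buffers = new HashMap<>(); // one open window per key
        List<Odd> out = new ArrayList<>();
        for (Odd in : input) {
            List<Odd> buf = buffers.computeIfAbsent(in.match, k -> new ArrayList<>());
            buf.add(in);
            if (buf.size() == windowSize) {
                // Window is full: emit the record with the largest local odd
                // (the first one wins on ties), then start a new window.
                Odd max = buf.get(0);
                for (Odd o : buf) {
                    if (o.local > max.local) max = o;
                }
                out.add(max);
                buf.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Odd> input = Arrays.asList(
                new Odd("Match1", 1.10), new Odd("Match2", 2.11), new Odd("Match3", 4.10),
                new Odd("Match1", 1.80), new Odd("Match2", 3.10), new Odd("Match3", 2.12));
        System.out.println(countWindowMaxBy(input, 2));
        // prints [Match1=1.8, Match2=3.1, Match3=4.1]
    }
}
```

This yields exactly one record per match, which is the output asked for in the question.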

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

In reply to iñaki williams, 2016-06-09 14:36 GMT+02:00.

Re: maxBy() and keyBy() question

juanramallo80
Understood!

I have created a WindowedStream and now it is working. Thanks!


In reply to Fabian Hueske, Thursday, 9 June 2016.