how to get rid of duplicate rows group by in DataStream

how to get rid of duplicate rows group by in DataStream

subashbasnet
Hello all, 

I grouped the input by its id to count the number of elements in each group:
DataStream<Tuple2<String, Long>> gridWithCount;
Printing the above DataStream shows duplicate rows:
Output: 
(1,1)
(1,2)
(2,1)
(1,3)
(2,2).......

Whereas I want only the distinct rows with the final count:
Needed Output:
(1,3)
(2,2)..

What could be the way to achieve this?


Regards,
Subash Basnet

Re: how to get rid of duplicate rows group by in DataStream

Kostas Kloudas
Hi Subash,

You should also split your stream into windows.
If not, Flink emits an updated count for each incoming record.
That is why you have:

(1,1)
(1,2)
(1,3)
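
For example, a minimal sketch of a windowed count (assuming the Flink 1.x DataStream API; "pairs" here is a hypothetical stream of (id, 1L) tuples, one per incoming element, and the 10-minute tumbling window is an arbitrary choice):

// requires: import org.apache.flink.streaming.api.windowing.time.Time;
pairs                                 // DataStream<Tuple2<String, Long>>
    .keyBy(0)                         // group by the id field
    .timeWindow(Time.minutes(10))     // 10-minute tumbling window
    .sum(1)                           // count per id within each window
    .print();                         // emits one (id, count) per key and window

With a window, only the aggregated count per key is emitted when the window fires, instead of an update for every record.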



Kostas

Re: how to get rid of duplicate rows group by in DataStream

subashbasnet
Hello Kostas,

Sorry for the late reply. I couldn't understand how to apply the split into windows to the DataStream below, so that I get distinct output elements with the final count after applying the group-by and reduce.

DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
        .map(new AddCountAppender())
        .keyBy(2)
        .reduce(new GridPointsCount())
        .map(new RetrieveGridWithCount());
gridWithDensity.print();

Current Output:
(33330,1)
(33330,2)
(00000,1)
(00000,2)
(00000,3)
(33330,3)
(00000,4)

Required Output:
(33330,3)
(00000,4)

// Sums the count field (f3) of two records that share the same key,
// keeping the remaining fields from the first record.
public static final class GridPointsCount implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
    @Override
    public Tuple4<Point, Grid, String, Long> reduce(Tuple4<Point, Grid, String, Long> val1,
            Tuple4<Point, Grid, String, Long> val2) {
        return new Tuple4<Point, Grid, String, Long>(val1.f0, val1.f1, val1.f2, val1.f3 + val2.f3);
    }
}


Regards,
Subash Basnet


Re: how to get rid of duplicate rows group by in DataStream

Yassin Marzouki
Hi Subash,

A stream is infinite, hence it has no notion of a "final" count. To get the distinct counts you need to define a period (= a window [1]) over which you count elements and emit a result, by adding a window operator before the reduce.
For example, the following will emit distinct counts every 10 minutes, covering the last 10-minute period:

stream.keyBy(2)
      .window(Time.minutes(10))
      .reduce(new GridPointsCount())

Re: how to get rid of duplicate rows group by in DataStream

Yassin Marzouki
Sorry, I mistyped the code: it should be timeWindow(Time.minutes(10)) instead of window(Time.minutes(10)).
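
For reference, a minimal sketch of the corrected snippet applied to the earlier pipeline (assuming a 10-minute tumbling window; AddCountAppender, GridPointsCount and RetrieveGridWithCount are the user functions from the snippet above):

// requires: import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
        .map(new AddCountAppender())
        .keyBy(2)                             // key by field 2, as before
        .timeWindow(Time.minutes(10))         // the only change: add a 10-minute tumbling window
        .reduce(new GridPointsCount())        // counts are now summed within each window
        .map(new RetrieveGridWithCount());
gridWithDensity.print();

Each key then produces one (id, count) result per window, rather than an update for every incoming record.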
