Hello all,
I grouped the input by its id to count the number of elements in each group:

    DataStream<Tuple2<String, Long>> gridWithCount;

Printing the above datastream shows duplicate rows:

Output:
(1,1)
(1,2)
(2,1)
(1,3)
(2,2)
...

Whereas I wanted distinct rows with the final count:

Needed Output:
(1,3)
(2,2)
...

What would be the way to achieve this?

Regards,
Subash Basnet
Hi Subash,
You should also split your elements into windows. If you do not, Flink emits an element for each incoming record. That is why you have:
(1,1)
(1,2)
(1,3)
…

Kostas

> On Aug 22, 2016, at 5:58 PM, subash basnet <[hidden email]> wrote:
>
> Hello all,
>
> I grouped by the input based on it's id to count the number of elements in each group.
> DataStream<Tuple2<String, Long>> gridWithCount;
> Upon printing the above datastream it shows with duplicate rows:
> Output:
> (1, 1)
> (1,2)
> (2,1)
> (1,3)
> (2,2).......
>
> Whereas I wanted the distinct rows with final count:
> Needed Output:
> (1,3)
> (2,2)..
>
> What could be the way to achieve this.
>
>
> Regards,
> Subash Basnet
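To make the per-record behaviour concrete, here is a minimal plain-Java sketch. It is not Flink code: the class name RollingCountSketch and the method rollingCounts are hypothetical, and the map only imitates what a keyed rolling count does, under the assumption that ids arrive in the order 1, 1, 2, 1, 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not Flink API.
class RollingCountSketch {

    // Imitates a keyed rolling count: every incoming id updates its running
    // total and immediately emits the updated (id,count) pair.
    static List<String> rollingCounts(List<String> ids) {
        Map<String, Long> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String id : ids) {
            long count = totals.merge(id, 1L, Long::sum);
            emitted.add("(" + id + "," + count + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed arrival order of ids: 1, 1, 2, 1, 2.
        System.out.println(rollingCounts(Arrays.asList("1", "1", "2", "1", "2")));
        // prints [(1,1), (1,2), (2,1), (1,3), (2,2)]
    }
}
```

Each record produces one output row, so intermediate counts such as (1,1) and (1,2) show up alongside the latest one.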
Hello Kostas,

Sorry for the late reply. I couldn't understand how to apply the split to a datastream, such as the one below, to get distinct output elements with their count after applying the group-by and reduce.

    DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
            .map(new AddCountAppender())
            .keyBy(2)
            .reduce(new GridPointsCount())
            .map(new RetrieveGridWithCount());
    gridWithDensity.print();

Current Output:
(33330,1)
(33330,2)
(00000,1)
(00000,2)
(00000,3)
(33330,3)
(00000,4)

Required Output:
(33330,3)
(00000,4)

    public static final class GridPointsCount
            implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
        @Override
        public Tuple4<Point, Grid, String, Long> reduce(
                Tuple4<Point, Grid, String, Long> val1,
                Tuple4<Point, Grid, String, Long> val2) {
            // Keep the first element's fields and add up the counts (f3).
            return new Tuple4<Point, Grid, String, Long>(
                    val1.f0, val1.f1, val1.f2, val1.f3 + val2.f3);
        }
    }

Regards,
Subash Basnet

On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <[hidden email]> wrote:
Hi Subash,

A stream is infinite, hence it has no notion of a "final" count. To get distinct counts you need to define a period (= a window [1]) over which you count elements and emit a result, by adding a window operator before the reduce. For example, the following will emit distinct counts every 10 minutes over the last 10-minute period:

    stream.keyBy(2)
          .window(Time.minutes(10))
          .reduce(new GridPointsCount())

On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <[hidden email]> wrote:
Sorry, I mistyped the code: it should be timeWindow(Time.minutes(10)) instead of window(Time.minutes(10)).

On Wed, Aug 24, 2016 at 9:29 PM, Yassine Marzougui <[hidden email]> wrote:
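As a rough illustration of what counting per window means, here is a plain-Java sketch of fixed, non-overlapping time windows. It is not Flink code and does not reproduce how Flink triggers or emits window results. The class name WindowedCountSketch, the method windowedCounts, and the millisecond timestamps are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not Flink API.
class WindowedCountSketch {

    // Assigns each (timestamp, id) record to the fixed window containing its
    // timestamp, counts ids per window, and emits one (id,count) pair per id
    // and window, ordered by window start and then by id.
    // Assumes non-negative timestamps in milliseconds.
    static List<String> windowedCounts(long[] timestamps, String[] ids, long windowMillis) {
        TreeMap<Long, TreeMap<String, Long>> windows = new TreeMap<>();
        for (int i = 0; i < ids.length; i++) {
            long windowStart = (timestamps[i] / windowMillis) * windowMillis;
            windows.computeIfAbsent(windowStart, w -> new TreeMap<>())
                   .merge(ids[i], 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        for (TreeMap<String, Long> counts : windows.values()) {
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                emitted.add("(" + e.getKey() + "," + e.getValue() + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // Five records that all fall inside the first 10-minute window.
        long[] timestamps = {0L, 1000L, 2000L, 3000L, 4000L};
        String[] ids = {"1", "1", "2", "1", "2"};
        System.out.println(windowedCounts(timestamps, ids, tenMinutes));
        // prints [(1,3), (2,2)]
    }
}
```

A record arriving after the first 10 minutes would start a new count in the next window, which is why a window yields one count per id per period rather than a single global "final" count.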