Return unique counter using groupReduceFunction


Biplob Biswas
Hi,

I am using a GroupReduceFunction to aggregate the contents of my objects, and I also need to return a unique counter from the function. My attempts are failing: the identifiers look random and are duplicated.

Following is the part of my code which is supposed to generate a unique counter and return it with out.collect.


        public static class sumReducer implements
        GroupReduceFunction<Tuple2<Integer, Point>, Tuple5<Integer,Point, Point, Long, Long>> {

                double sum[] = null;
                double sumOfSquare[] = null;
                long timestamp = 0;
                @Override
                public void reduce(Iterable<Tuple2<Integer, Point>> in, Collector<Tuple5<Integer,Point, Point, Long, Long>> out)
                                throws Exception {
                       
                        int id = 0;
                        long count = 0;
                        boolean flag = true;
                        for(Tuple2<Integer, Point> i:in)
                        {
                                if(flag)
                                {
                                        timestamp++;
                                        System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );
                                        sum = new double[i.f1.pt.length];
                                        sumOfSquare = new double[sum.length];
                                        id = i.f0;
                                        for(int j=0;j<sum.length;j++)
                                        {
                                                sum[j] = i.f1.pt[j];
                                                sumOfSquare[j] = i.f1.pt[j] * i.f1.pt[j];
                                        }
                                        flag = false;
                                }
                                else
                                {
                                        int len = i.f1.pt.length;
                                        for(int j=0;j<len;j++)
                                        {
                                                sum[j] += i.f1.pt[j];
                                                sumOfSquare[j] += (i.f1.pt[j] * i.f1.pt[j]);
                                        }
                                }
                                count++;
                        }
                        out.collect(new Tuple5<Integer,Point, Point, Long, Long>(id,new Point(sum), new Point(sumOfSquare),count, timestamp));
                }
        }

I want the timestamp to be unique. The statement "System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );" executes once for each identifier (given by i.f0) by which the data is grouped, that is, once per call of the GroupReduceFunction. Still, I get the following output from that println statement:

uniqueid: 2, t: 1
uniqueid: 1, t: 1
uniqueid: 7, t: 2
uniqueid: 9, t: 3
uniqueid: 6, t: 2
uniqueid: 3, t: 1
uniqueid: 5, t: 2
uniqueid: 8, t: 3

I don't understand why I am getting this discrepancy. I am probably missing some Flink concept, as I am relatively new to the Flink platform. Any help is appreciated. Thanks a lot.

Thanks and Regards
Re: Return unique counter using groupReduceFunction

Fabian Hueske-2
Hi Biplob,

Flink is a distributed, data-parallel system, which means that several instances of your reduce function run in parallel, each with its own timestamp counter.
If you want a unique timestamp, you have to set the parallelism of the reduce operator to 1, but then the program might become inefficient.
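
To make the effect concrete, here is a minimal plain-Java simulation; it does not use Flink. The class names and the routing rule (key modulo parallelism) are hypothetical and only stand in for Flink's real partitioning, which works differently. Each simulated instance keeps its own timestamp field, as in the posted sumReducer, so the same counter values appear several times when more than one instance is used.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink dependency) of per-instance counters. */
final class PerInstanceCounterDemo {

    /** Stands in for one parallel instance of the reduce function. */
    static final class ReducerInstance {
        // Instance field, like `timestamp` in the posted sumReducer.
        private long timestamp = 0;

        /** Called once per group; returns the counter value emitted for that group. */
        long reduceGroup() {
            return ++timestamp;
        }
    }

    /** Routes each group key to one instance and collects the emitted counters. */
    static List<Long> run(int[] groupKeys, int parallelism) {
        ReducerInstance[] instances = new ReducerInstance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new ReducerInstance();
        }
        List<Long> emitted = new ArrayList<>();
        for (int key : groupKeys) {
            // Assumption: a simple modulo routing rule, for illustration only.
            ReducerInstance target = instances[Math.floorMod(key, parallelism)];
            emitted.add(target.reduceGroup());
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] keys = {1, 2, 3, 5, 6, 7, 8, 9};
        // Several instances: each counts from 1, so values repeat.
        System.out.println("parallelism 3: " + run(keys, 3));
        // One instance: a single counter, so every value is distinct.
        System.out.println("parallelism 1: " + run(keys, 1));
    }
}
```

With a single instance the counter is unique, which is what setting the parallelism of the operator to 1 achieves, at the cost of processing all groups in one task.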

Maybe DataSetUtils.zipWithIndex() or DataSetUtils.zipWithUniqueId() are helpful for your use case.
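
One way to get unique IDs without reducing the parallelism is to combine a local counter with the index of the parallel instance. The following plain-Java sketch illustrates that idea under stated assumptions: the class and method names are hypothetical, it does not call Flink, and it is not a copy of how DataSetUtils.zipWithUniqueId() is implemented. In a Flink 1.x rich function, the two constructor arguments could presumably be taken from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.HashSet;
import java.util.Set;

/** Plain-Java sketch: unique IDs from a subtask index plus a local counter. */
final class SubtaskUniqueIdDemo {

    /** One generator per parallel instance; no coordination between instances. */
    static final class IdGenerator {
        private final int subtaskIndex; // 0 .. parallelism - 1
        private final int parallelism;
        private long localCounter = 0;

        IdGenerator(int subtaskIndex, int parallelism) {
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        /** IDs of instance s are s, s + p, s + 2p, ... and never collide across instances. */
        long nextId() {
            return (localCounter++) * parallelism + subtaskIndex;
        }
    }

    /** Generates idsPerSubtask IDs on each simulated subtask and checks for collisions. */
    static Set<Long> generate(int parallelism, int idsPerSubtask) {
        Set<Long> ids = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            IdGenerator generator = new IdGenerator(subtask, parallelism);
            for (int n = 0; n < idsPerSubtask; n++) {
                if (!ids.add(generator.nextId())) {
                    throw new IllegalStateException("duplicate id generated");
                }
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(generate(3, 4));
    }
}
```

IDs produced this way are unique but not consecutive in general, because the instances usually process different numbers of groups. If consecutive numbers are required, zipWithIndex() or a parallelism of 1 is the better fit.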

Best, Fabian


2016-04-26 17:12 GMT+02:00 Biplob Biswas <[hidden email]>:
> [quoted text: identical to the original post above]