Count of Grouped DataSet

Count of Grouped DataSet

nsengupta
Hello Flinksters,

What is the most idiomatic way in Flink to get the count of records grouped by a key (where the key can consist of multiple fields)?

I have referred to this ticket, but since it is still open, I can't make out what the final decision was.

Let's say that we have following records (case class or tuple, whatever):

f1,  f2,  f3,  f4
------------------
1,   1,   2,   "A"
1,   1,   2,   "B"
2,   1,   3,   "A"
3,   1,   4,   "C"

I group this DataSet on a composite key of (f2, f3), and then I need the count:
([1,2], 2)
([1,3], 1)
([1,4], 1)

I could have gone the accepted-wisdom way of mapping an extra '1' onto every record and then reducing with a sum operation, but that seems more low-level than what one is expected to write. Spark has the countByKey operator for exactly this purpose.
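For reference, the "map with a 1, then sum" pattern can be sketched with plain Scala collections (no Flink APIs here; this only illustrates the semantics, and the name `countsByKey` is my own):

```scala
// Sample records as (f1, f2, f3, f4) tuples, matching the table above.
val records = Seq((1, 1, 2, "A"), (1, 1, 2, "B"), (2, 1, 3, "A"), (3, 1, 4, "C"))

// Map each record to ((f2, f3), 1), then sum the 1s per composite key --
// the same shape as a Flink map + groupBy + sum pipeline.
val countsByKey: Map[(Int, Int), Int] =
  records
    .map { case (_, f2, f3, _) => ((f2, f3), 1) }
    .groupBy(_._1)
    .map { case (key, ones) => (key, ones.map(_._2).sum) }

// countsByKey == Map((1,2) -> 2, (1,3) -> 1, (1,4) -> 1)
```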

Could someone please nudge me in the right direction?

-- Nirmalya

Re: Count of Grouped DataSet

nsengupta
Hello all,

This is how I have moved ahead with the implementation of finding count of a GroupedDataSet:

val k = envDefault
  .fromElements((1,1,2,"A"), (1,1,2,"B"), (2,1,3,"B"), (3,1,4,"C"))
  .groupBy(1, 2)                    // group on the composite key (f2, f3)
  .reduceGroup { nextGroup =>
    val asList = nextGroup.toList   // materialise the group to count it
    (asList.head._2, asList.head._3, asList.size)
  }

k.print()


While this produces the expected output alright, I am not sure if this is the ideal, idiomatic way to implement what I need. Could you please confirm? If there is a better way, I would like to be wiser, of course.

-- Nirmalya

Re: Count of Grouped DataSet

Fabian Hueske-2
Hi Nirmalya,

The solution with List.size won't use a combiner, so it won't be efficient for large data sets with large groups.
I would recommend appending a 1 to each record and using GroupedDataSet.sum() instead.
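Fabian's point about the combiner can be illustrated with plain Scala collections (not Flink; the partition contents below are made up for illustration): each partition pre-aggregates its own counts, so only small per-key partials are merged, instead of every record being shipped to one place as with toList:

```scala
// Two hypothetical partitions of ((f2, f3), 1) pairs.
val partitionA = Seq(((1, 2), 1), ((1, 2), 1))
val partitionB = Seq(((1, 3), 1), ((1, 4), 1), ((1, 2), 1))

// Combine step: each partition reduces locally first...
def localCounts(part: Seq[((Int, Int), Int)]): Map[(Int, Int), Int] =
  part.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

// ...then only the compact partial maps are merged.
val merged: Map[(Int, Int), Int] =
  (localCounts(partitionA).toSeq ++ localCounts(partitionB).toSeq)
    .groupBy(_._1)
    .map { case (k, vs) => (k, vs.map(_._2).sum) }

// merged == Map((1,2) -> 3, (1,3) -> 1, (1,4) -> 1)
```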

2016-05-01 12:48 GMT+02:00 nsengupta <[hidden email]>:

Re: Count of Grouped DataSet

nsengupta
Hello Fabian,

Thanks for taking the time to provide your recommendation. This is how I have implemented it:

case class Something(f1: Int, f2: Int, f3: Int, f4: String) // My application's data structure

val k = envDefault
  .fromElements(Something(1,1,2,"A"), Something(1,1,2,"B"), Something(2,1,3,"A"), Something(3,1,4,"C"))
  .map(e => (e.f1, e.f2, e.f3, e.f4, 1)) // I create a temporary tuple with a trailing 1
  .groupBy(1, 2)                         // group on (f2, f3)
  .sum(4)                                // sum the 1s to get the per-key count
  .map(e => (Something(e._1, e._2, e._3, e._4), e._5))
  .print


The output is
(Something(2,1,3,A),1)
(Something(1,1,2,B),2)
(Something(3,1,4,C),1)


I need to create a temporary tuple, because I need to group by fields of the case class, yet I need to sum the fifth (newly added) field. Somehow, this feels clunky!

Is this the preferred way? Is there a better (performant, yet idiomatic) way? Please make me wiser.
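One way to avoid the throwaway five-field tuple is to group with a key function on the case-class fields. The sketch below uses plain Scala collections purely to show the shape (Flink's Scala DataSet API offers an analogous groupBy taking a key-selector function; the names `data` and `counts` are mine):

```scala
case class Something(f1: Int, f2: Int, f3: Int, f4: String)

val data = Seq(Something(1, 1, 2, "A"), Something(1, 1, 2, "B"),
               Something(2, 1, 3, "A"), Something(3, 1, 4, "C"))

// Group directly on the case-class fields via a key function,
// so no temporary tuple with an extra 1 is needed.
val counts: Map[(Int, Int), Int] =
  data.groupBy(e => (e.f2, e.f3)).map { case (k, es) => (k, es.size) }

// counts == Map((1,2) -> 2, (1,3) -> 1, (1,4) -> 1)
```

Note that in local collections counting with `.size` is fine; in Flink itself, per Fabian's advice, one would still prefer the add-a-1-and-sum aggregation so a combiner can pre-reduce each group.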

-- Nirmalya