Hello Flinksters,
What is the most idiomatic way in Flink to get the count of records grouped by a key (the key can have multiple fields)? I have referred to this ticket, but because it is still open, I can't make out what the final decision has been.

Let's say that we have the following records (case class or tuple, whatever):

f1, f2, f3, f4
------------------
1, 1, 2, "A"
1, 1, 2, "B"
2, 1, 3, "A"
3, 1, 4, "C"

I group this DataSet on a composite key of (f2,f3) and then I need the count:

([1,2], 2)
([1,3], 1)
([1,4], 1)

I could have gone the way of accepted wisdom, mapping every record to its key with an extra '1' and then reducing with a sum operation, but I think that is somewhat lower-level than what one should be expected to do. Spark has the countByKey operator for such a purpose.

Could someone please nudge me in the right direction?

-- Nirmalya
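For readers who want the requested result pinned down, here is a minimal sketch of the semantics in plain Scala collections. It does not use Flink at all; the names `CountByKeySketch` and `countByKey` are made up for illustration and are not a Flink or Spark API.

```scala
// Plain Scala collections only: this illustrates the desired result,
// not how Flink would compute it on a distributed DataSet.
object CountByKeySketch {
  // The records from the question as (f1, f2, f3, f4) tuples.
  val records: Seq[(Int, Int, Int, String)] = Seq(
    (1, 1, 2, "A"),
    (1, 1, 2, "B"),
    (2, 1, 3, "A"),
    (3, 1, 4, "C")
  )

  // Group on the composite key (f2, f3) and count the members of each group.
  // `countByKey` is a hypothetical helper name.
  def countByKey(rs: Seq[(Int, Int, Int, String)]): Map[(Int, Int), Int] =
    rs.groupBy(r => (r._2, r._3)).map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit =
    countByKey(records).foreach(println)
}
```

Each entry of the resulting map pairs a key (f2, f3) with the number of records that share it, which is the shape of output asked for above.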
Hello all,
This is how I have moved ahead with the implementation of finding the count of a GroupedDataSet:

val k = envDefault
  .fromElements((1,1,2,"A"),(1,1,2,"B"),(2,1,3,"B"),(3,1,4,"C"))
  .groupBy(1,2)
  .reduceGroup(nextGroup => {
    val asList = nextGroup.toList
    (asList.head._2,asList.head._3,asList.size)
  })

k.print()

While this produces the expected output alright, I am not sure if this is the ideal, idiomatic way to implement what I need. Could you please confirm? If there is a better way, I would like to be wiser, of course.

-- Nirmalya
Hi Nirmalya,

the solution with List.size() won't use a combiner and won't be efficient for large data sets with large groups.

2016-05-01 12:48 GMT+02:00 nsengupta <[hidden email]>:
> Hello all,
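To make the combiner remark concrete, the following is a rough conceptual sketch in plain Scala, not Flink code and not Flink's actual implementation. It assumes the data is split into two partitions and models a combiner as a local pre-aggregation whose small partial results are merged afterwards, so that no complete group ever has to be held as a list. `CombinerSketch`, `partialCounts` and `merge` are hypothetical names.

```scala
// Conceptual model only: each "partition" stands for the data held by one worker.
object CombinerSketch {
  type Key = (Int, Int)

  // Local pre-aggregation (the combiner's job in this model):
  // build one small key -> count map per partition.
  def partialCounts(partition: Seq[(Int, Int, Int, String)]): Map[Key, Int] =
    partition.foldLeft(Map.empty[Key, Int]) { (acc, r) =>
      val key = (r._2, r._3)
      acc.updated(key, acc.getOrElse(key, 0) + 1)
    }

  // Final step: add up the partial counts coming from different partitions.
  def merge(a: Map[Key, Int], b: Map[Key, Int]): Map[Key, Int] =
    b.foldLeft(a) { case (acc, (key, n)) =>
      acc.updated(key, acc.getOrElse(key, 0) + n)
    }

  def main(args: Array[String]): Unit = {
    val partition1 = Seq((1, 1, 2, "A"), (2, 1, 3, "B"))
    val partition2 = Seq((1, 1, 2, "B"), (3, 1, 4, "C"))
    println(merge(partialCounts(partition1), partialCounts(partition2)))
  }
}
```

In this model only the partial maps move between steps, whereas collecting a group into a list and calling size requires every record of the group to be gathered first.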
Hello Fabian,
Thanks for taking the time to provide your recommendation. This is how I have implemented it:

case class Something(f1: Int,f2: Int,f3: Int,f4: String ) // My application's data structure

val k = envDefault
  .fromElements(Something(1,1,2,"A"),Something(1,1,2,"B"),Something(2,1,3,"A"),Something(3,1,4,"C"))
  .map(e => (e.f1, e.f2, e.f3, e.f4,1)) // I create a temporary tuple
  .groupBy(1,2)
  .sum(4)
  .map(e => (Something(e._1,e._2,e._3,e._4),e._5))

The output is

(Something(2,1,3,A),1)
(Something(1,1,2,B),2)
(Something(3,1,4,C),1)

I need to create a temporary tuple because I need to group by fields of the case class, yet I need to sum the fifth (newly added) field. Somehow, I feel this is clunky! Is this the preferred way? Is there a better (performant, yet idiomatic) way? Please make me wiser.

-- Nirmalya
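One possible simplification, offered as a sketch and not as the confirmed answer from this thread: if only the key and its count are needed, the temporary tuple can carry just (f2, f3, 1), so the non-key fields f1 and f4 never have to be dragged through the aggregation. The sketch assumes the Flink DataSet Scala API used in the posts above, with the Flink Scala and client libraries on the classpath; `KeyOnlyCount` and `counts` are hypothetical names.

```scala
import org.apache.flink.api.scala._

// Same data structure as in the post above.
case class Something(f1: Int, f2: Int, f3: Int, f4: String)

object KeyOnlyCount {
  // Produces (f2, f3, count) triples. The order of the result is not guaranteed.
  def counts(env: ExecutionEnvironment): DataSet[(Int, Int, Int)] =
    env
      .fromElements(
        Something(1, 1, 2, "A"),
        Something(1, 1, 2, "B"),
        Something(2, 1, 3, "A"),
        Something(3, 1, 4, "C"))
      .map(e => (e.f2, e.f3, 1)) // keep only the key fields plus a 1 to sum
      .groupBy(0, 1)             // composite key: tuple positions 0 and 1
      .sum(2)                    // built-in aggregation over the added 1s

  def main(args: Array[String]): Unit =
    counts(ExecutionEnvironment.getExecutionEnvironment).print()
}
```

Because every field left in the tuple is either part of the key or the summed counter, the result contains no values picked from an arbitrary record of the group, unlike f1 and f4 in the output shown above.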