GroupedDataset collect

GroupedDataset collect

Maximilian Alber
Hi Flinksters,

I tried to call collect on a grouped data set, but it did not compile. Is this intended? If so, why?

Code snippet:
    // group a data set according to second field:
    val grouped_ds = cross_ds.groupBy(1)
    println("After groupBy: "+grouped_ds.collect())

Error:

[ant:scalac] /media/alber/datadisk/tmp/flink/code/test/src/main/scala/test.scala:107: error: value collect is not a member of org.apache.flink.api.scala.GroupedDataSet[(Int, Int)]
[ant:scalac]     println("After groupBy: "+grouped_ds.collect())
[ant:scalac]                                          ^
[ant:scalac] one error found


Thanks!
Cheers,
Max

Re: GroupedDataset collect

Chiwan Park
Hi. You cannot collect a grouped data set directly. You can collect the grouped data by using the reduceGroup method.
The following code is an example:

import org.apache.flink.util.Collector
val result = grouped_ds.reduceGroup {
  (in, out: Collector[(Int, Seq[Int])]) => {
    val seq = in.toSeq
    // Assumption: the first tuple element is the value and the second is the key,
    // because you grouped by 1 (index 1 is the second element of a Scala tuple)
    val key = seq(0)._2
    val values = seq.map(_._1)

    out.collect((key, values))
  }
}.collect()

The collected result then has the form (key1, (values, …)), (key2, (values, …)), (key3, (values, …)), ...
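
To make the shape of that result concrete, here is a rough sketch that mimics the same grouping with plain Scala collections instead of Flink, so it runs without any Flink dependency. The object name, the method name, and the sample (value, key) pairs are made up for illustration; they stand in for cross_ds, whose actual contents are not shown in this thread. The final sort is only there to make the output deterministic; Flink itself does not promise any particular order of groups.

```scala
// Plain Scala collections only; this is NOT the Flink API, just an analogy.
object GroupedCollectSketch {
  // Hypothetical stand-in for cross_ds: (value, key) pairs.
  val crossData: Seq[(Int, Int)] =
    Seq((1, 10), (2, 10), (3, 20), (4, 20), (5, 30))

  // Group on the second tuple field (index 1), then turn each group into a
  // single (key, values) record, which is what the reduceGroup function in
  // the reply does once per group.
  def collectGroups(data: Seq[(Int, Int)]): Seq[(Int, Seq[Int])] =
    data
      .groupBy(_._2)                                      // key -> rows with that key
      .toSeq
      .map { case (key, rows) => (key, rows.map(_._1)) }  // keep only the values
      .sortBy(_._1)                                       // deterministic order

  def main(args: Array[String]): Unit =
    collectGroups(crossData).foreach(println)
}
```

Each printed record pairs one key with all values that shared it, which is the (key, values) form described above.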

Regards,
Chiwan Park

> On Jun 11, 2015, at 11:01 PM, Maximilian Alber <[hidden email]> wrote:
> [...]


Re: GroupedDataset collect

Maximilian Alber
Ok!
Thank you!

Cheers,
Max

On Thu, Jun 11, 2015 at 4:34 PM, Chiwan Park <[hidden email]> wrote:
[...]