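Hi. You cannot call collect on a grouped data set directly, because GroupedDataSet has no collect method. You can collect the grouped data by applying the reduceGroup method first, which turns it back into a regular DataSet.
The following code is an example: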
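// The scala API import provides the implicit type information; you probably have it already.
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

val result = grouped_ds.reduceGroup {
  (in, out: Collector[(Int, Seq[Int])]) => {
    // Materialize the group eagerly, because an iterator can only be traversed once.
    val seq = in.toList
    // I assumed the first element of the tuple is the value and the second element is the key,
    // because you grouped by 1 (the second element of a Scala tuple).
    val key = seq.head._2
    val values = seq.map(_._1)
    // Emit one (key, values) pair per group.
    out.collect((key, values))
  }
}.collect()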
Then you get the data as a collection of pairs: (key1, (values, …)), (key2, (values, …)), (key3, (values, …)), ...
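
If you want to check the shape of that result without starting a Flink job, here is a minimal sketch using plain Scala collections. It is not Flink code: the names GroupedCollectSketch and groupBySecond and the sample data are made up for illustration, and it assumes the same layout as above (value first, key second).

```scala
// Plain Scala collections only; no Flink dependency is needed for this sketch.
object GroupedCollectSketch {
  // Same idea as the reduceGroup body: the second tuple field is the key,
  // the first field is the value. Returns one entry per key.
  def groupBySecond(pairs: Seq[(Int, Int)]): Map[Int, Seq[Int]] =
    pairs.groupBy(_._2).map { case (key, group) => (key, group.map(_._1)) }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the contents of cross_ds.
    val crossDs = Seq((1, 10), (2, 10), (3, 20))
    // Prints one (key, values) entry per distinct key.
    groupBySecond(crossDs).foreach(println)
  }
}
```

The difference in Flink is that the grouping runs distributed and only the final collect() brings the pairs back to the client, so the order of the groups is not something to rely on.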
Regards,
Chiwan Park
> On Jun 11, 2015, at 11:01 PM, Maximilian Alber <[hidden email]> wrote:
>
> Hi Flinksters,
>
> I tried to call collect on a grouped data set, but it did not work. Is this intended? If yes, why?
>
> Code snippet:
> // group a data set according to second field:
> val grouped_ds = cross_ds.groupBy(1)
> println("After groupBy: "+grouped_ds.collect())
>
> Error:
>
> [ant:scalac] /media/alber/datadisk/tmp/flink/code/test/src/main/scala/test.scala:107: error: value collect is not a member of org.apache.flink.api.scala.GroupedDataSet[(Int, Int)]
> [ant:scalac] println("After groupBy: "+grouped_ds.collect())
> [ant:scalac] ^
> [ant:scalac] one error found
>
>
> Thanks!
> Cheers,
> Max