sorting groups

Michele Bertoni
Hi everybody,
I am trying to sort a grouped dataset, but I am getting this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Sorting on KeySelector keys only works with KeySelector grouping.
        at org.apache.flink.api.scala.GroupedDataSet.sortGroup(GroupedDataSet.scala:113)
        at it.polimi.genomics.flink.FlinkImplementation.regionOperation.OrderRD$.sort(OrderRD.scala:82)
        …

Can anybody help me understand the error?
I have no idea what it means, and Google is not helpful in this case.


Thanks!
cheers
Michele

Re: sorting groups

Fabian Hueske-2
Hi,

The error is related to the way you specify the grouping key and the sorting key.
The API is currently restricted in that you can use a key selector function for the sorting key only if you also used a key selector function for the grouping key.
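To make the rule concrete, here is a minimal sketch against the Flink Scala DataSet API. It assumes `flink-scala` (and its runtime) is on the classpath; the object name, the `Rec` type and the sample data are illustrative only. Field-position keys combine with field-position keys, and key selector functions combine with key selector functions; the mixed form is the one that raises the exception above:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object KeyKindsExample {
  // Illustrative record type: (group id, label, score).
  type Rec = (Long, String, Double)

  def run(): (Seq[Rec], Seq[Rec]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds: DataSet[Rec] = env.fromElements(
      (1L, "a", 3.0), (1L, "b", 1.0), (2L, "c", 5.0), (2L, "d", 4.0))

    // Field-position grouping with field-position sorting: accepted.
    val byPosition = ds
      .groupBy(0)
      .sortGroup(2, Order.ASCENDING)
      .first(1)
      .collect()

    // Key selector grouping with key selector sorting: accepted.
    val bySelector = ds
      .groupBy((r: Rec) => r._1)
      .sortGroup((r: Rec) => r._3, Order.ASCENDING)
      .first(1)
      .collect()

    // Field-position grouping with a key selector sort key: rejected with
    // "Sorting on KeySelector keys only works with KeySelector grouping."
    // ds.groupBy(0).sortGroup((r: Rec) => r._3, Order.ASCENDING)

    (byPosition, bySelector)
  }
}
```

Both accepted variants should return, for each group id, the element with the smallest third field.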

In Scala, it is often not obvious when you are using a key selector function.

If you post your groupBy().sortGroup() call and the input type, I can help you get it right.

Cheers, Fabian

In reply to Michele Bertoni <[hidden email]>, 2015-06-16 23:37 GMT+02:00

Re: sorting groups

Michele Bertoni
Hi Fabian,
My dataset is of this type:
RegionType = (Long, String, Long, Long, Char, Array[GValue])
where GValue is implemented by the case classes
GString(v: String)
GDouble(v: Double)

I have two sorting cases:
In the first (topk), I have to group by the first field of the regions and sort by a set of fields of the GValue array.

In the second (topg), I have to group by the first field of the regions and by a set of fields of the array, then sort by one field of the array.

For grouping, I call groupBy with a function that computes a hash of the desired fields, something like:
ds.groupBy { (r: RegionType) =>
  val s = new StringBuilder
  s.append(r._1)
  grouping.init.foreach { (index: Int) =>
    s.append("#")
    s.append(r._6(index))
  }
  Md5.hash(s.toString)
}
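A standalone, runnable version of the grouping function above, handy for checking the key outside Flink. It assumes `GValue` is a sealed trait with the two case classes named in this post, and it swaps in the JDK's `MessageDigest` MD5 for the `Md5.hash` helper, whose implementation is not shown in the thread; `groupKey` and `md5Hex` are hypothetical names:

```scala
import java.security.MessageDigest

object GroupKeySketch {
  // Hypothetical stand-ins for the types described in this post.
  sealed trait GValue
  final case class GString(v: String) extends GValue
  final case class GDouble(v: Double) extends GValue
  type RegionType = (Long, String, Long, Long, Char, Array[GValue])

  // JDK MD5 as lowercase hex, standing in for the thread's Md5.hash helper.
  def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Grouping key: the first field plus every attribute index in `grouping`
  // except the last one (the last index is reserved for sorting).
  // `grouping` must be non-empty, otherwise `init` throws.
  def groupKey(grouping: Seq[Int])(r: RegionType): String = {
    val s = new StringBuilder
    s.append(r._1)
    grouping.init.foreach { index =>
      s.append("#")
      s.append(r._6(index))
    }
    md5Hex(s.toString)
  }
}
```

Hashing is optional here: the concatenated string is already a valid grouping key, and hashing only shortens it at the cost of an extremely unlikely collision.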

Then I sort it (in the topg case, the second one) using:
.sortGroup((r: RegionType) =>
  r._6(grouping.last) /* here I apply a cast; I am writing from my smartphone and don't remember all the details, sorry */, Order.ASCENDING)

In the first case, instead, I group only by r._1, and a recursive function appends sortGroup operators to the grouped dataset.

Is there a way to solve this?

I think I don't understand what a KeySelector is.


Thanks!
Michele
In reply to Fabian Hueske <[hidden email]>, sent Tuesday, 16 June 2015, 23:43:03 (subject: Re: sorting groups)

Re: sorting groups

Michele Bertoni
Got it.
I solved the problem by changing the grouping function:
instead of grouping by r._1 (in the topk case),
I now group using a function that returns r._1.

Thus both groupBy and sortGroup take a function as parameter.
Is there a reason why it works this way? You should probably make this very clear in the documentation.
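A sketch of the topk fix described above, assuming the `RegionType` and `GValue` shapes from the earlier post and a numeric sort attribute; `TopKSketch`, `topK`, `numeric` and the parameter names are hypothetical. The grouping key is a function returning `r._1`, so grouping and sorting both use key selectors:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

// Hypothetical stand-ins for the GValue types described in the thread.
sealed trait GValue
final case class GString(v: String) extends GValue
final case class GDouble(v: Double) extends GValue

object TopKSketch {
  type RegionType = (Long, String, Long, Long, Char, Array[GValue])

  // Cast-free extraction of a numeric sort key; fails loudly on strings.
  def numeric(g: GValue): Double = g match {
    case GDouble(d) => d
    case GString(s) => throw new IllegalArgumentException(s"not numeric: $s")
  }

  // Keeps the k regions with the largest value at attribute index `attr`
  // for every distinct r._1.
  def topK(ds: DataSet[RegionType], attr: Int, k: Int): DataSet[RegionType] =
    ds.groupBy((r: RegionType) => r._1) // key selector instead of a field position
      .sortGroup((r: RegionType) => numeric(r._6(attr)), Order.DESCENDING)
      .first(k)
}
```

The pattern match in `numeric` plays the role of the cast mentioned earlier in the thread; a `GString` at the sort position fails with a descriptive error instead of inside a cast.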


Michele


In reply to Michele Bertoni <[hidden email]>, 17 Jun 2015, 08:35


Re: sorting groups

Fabian Hueske-2
The reason for this restriction is that KeySelector keys (i.e., keys that are extracted using a function) require special-case handling at runtime. If we allowed combinations of KeySelector keys with other keys for grouping and group sorting, we would have four different cases to cover instead of two. So this is not a fundamental technical limitation; it is mainly a matter of code complexity and development time.
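The "four cases instead of two" count can be spelled out with a tiny model. This is a hypothetical illustration of the rule described in this thread, not Flink's internal code: each key is either a field-position key or a key selector, giving four grouping/sorting combinations, of which only the two where both keys are of the same kind are accepted:

```scala
object KeyCombinationModel {
  // Hypothetical model of the two kinds of keys; not Flink classes.
  sealed trait KeyKind
  case object FieldKeys extends KeyKind    // e.g. groupBy(0)
  case object SelectorKeys extends KeyKind // e.g. groupBy((r: T) => ...)

  val kinds: Seq[KeyKind] = Seq(FieldKeys, SelectorKeys)

  // Every (grouping key kind, sorting key kind) pair: 2 x 2 = 4 cases.
  val allCases: Seq[(KeyKind, KeyKind)] =
    for (g <- kinds; s <- kinds) yield (g, s)

  // Only the two same-kind combinations are supported.
  def supported(grouping: KeyKind, sorting: KeyKind): Boolean =
    grouping == sorting
}
```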

I agree that this restriction should be better documented.

In reply to Michele Bertoni <[hidden email]>, 2015-06-17 15:34 GMT+02:00