Hi
This is silly, but I can't understand why the following code doesn't sort the collection of integers. It seems to be a reasonable thing to do from an API perspective?

Cheers,
-Kristoffer

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromCollection(Lists.newArrayList(2, 1, 5, 3, 4, 5))
        .map(new MapFunction<Integer, Tuple1<Integer>>() {
            @Override
            public Tuple1<Integer> map(Integer value) throws Exception {
                return new Tuple1<Integer>(value);
            }
        })
        .groupBy(0)
        .sortGroup(0, Order.DESCENDING)
        .getDataSet()
        .print();
    env.execute();
Hi Kristoffer!

There are a few issues with that code:

1) Grouping and then calling "sort group" sorts within each group. In your case, you group by the entire element, so each group holds a single value: the element itself. Sorting inside the group does not make any difference, and there is no order across groups.

2) This code never groups and sorts. The calls to "groupBy(0).sortGroup(0, Order.DESCENDING)" do not group and sort by themselves; they only set up a grouping to be used with a reduce or aggregate function. The "getDataSet()" call returns the original data set, i.e. the unsorted input.

To see an illustration of this, get the program plan (env.getExecutionPlan()). You can render it using the HTML file "tools/planVisualizer.html".

Greetings,
Stephan

On Sun, Mar 15, 2015 at 3:29 PM, Kristoffer Sjögren <[hidden email]> wrote:
|
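To illustrate the first point in Stephan's reply, here is a minimal plain-Java sketch of "group by the whole element, then sort inside each group". It deliberately does not use the Flink API; the class and method names (`SortGroupModel`, `groupByValueAndSort`) are hypothetical and only model the semantics described in the reply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of grouping followed by a per-group sort. Not Flink code.
final class SortGroupModel {

    // Groups the values by the value itself (the analogue of groupBy(0) on a
    // Tuple1) and sorts every group in descending order (the analogue of
    // sortGroup(0, Order.DESCENDING)).
    static Map<Integer, List<Integer>> groupByValueAndSort(List<Integer> input) {
        Map<Integer, List<Integer>> groups = new HashMap<>();
        for (Integer value : input) {
            groups.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        }
        for (List<Integer> group : groups.values()) {
            group.sort(Comparator.reverseOrder());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups =
                groupByValueAndSort(Arrays.asList(2, 1, 5, 3, 4, 5));
        // Each group only contains copies of one value, so the per-group sort
        // changes nothing. The order in which groups are visited is whatever
        // the map happens to yield; nothing here orders the groups themselves.
        groups.forEach((key, group) -> System.out.println(key + " -> " + group));
    }
}
```

The input has five distinct values, so the model produces five groups, and the only group with more than one element is the one for 5, which contains `[5, 5]`.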
Thanks for your answer. I guess I'm a bit infected by writing too much Crunch code, and I also suspected that getDataSet() was the wrong thing to do :-)

However, I was expecting DataSet.sortPartition to do the sorting, but this method is missing in 0.8.1? Do you have a minimal example? I was looking through the tests, but most of them use sortPartition.

Cheers,
-Kristoffer

On Sun, Mar 15, 2015 at 4:22 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi!

I think sort partition is the right thing, if you have only one partition (which makes sense if you want a total order). It is not a parallel operation any more, so use it only after the data size has been reduced (filters / aggregations).

What about "data.sortPartition().setParallelism(1)"? Does that work for you?

Greetings,
Stephan

On Sun, Mar 15, 2015 at 4:47 PM, Kristoffer Sjögren <[hidden email]> wrote:
|
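Why a single partition gives a total order can be sketched in plain Java. This is a model only and not the Flink API: the names (`SortPartitionModel`, `sortPartitions`) are hypothetical, and the round-robin distribution of elements to partitions is an assumption made for illustration, not how Flink necessarily distributes data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of sorting each partition locally. Not Flink code.
final class SortPartitionModel {

    // Distributes the input round-robin over `parallelism` partitions
    // (an assumption for this sketch), sorts each partition in descending
    // order, and concatenates the partitions in partition order.
    static List<Integer> sortPartitions(List<Integer> input, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partition.sort(Comparator.reverseOrder());
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(2, 1, 5, 3, 4, 5);
        // One partition: the local sort is also the total order.
        System.out.println(sortPartitions(data, 1)); // [5, 5, 4, 3, 2, 1]
        // Two partitions: each one is sorted, the combined output is not.
        System.out.println(sortPartitions(data, 2)); // [5, 4, 2, 5, 3, 1]
    }
}
```

With more than one partition, every partition is sorted on its own, but nothing relates the partitions to each other, which is why the suggestion above pairs the sort with a parallelism of 1.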
That's the thing: there is no DataSet.sortPartition method in 0.8.1. Looking through the git history shows that sortPartition was added on the 20th of February, so I think that's 0.9-SNAPSHOT?

On Sun, Mar 15, 2015 at 4:51 PM, Stephan Ewen <[hidden email]> wrote:
|
After building Flink 0.9-SNAPSHOT from source, DataSet.sortPartition is indeed working as expected. This is fine, but it raises the question of how to go about sorting in 0.8.1?

On Sun, Mar 15, 2015 at 5:05 PM, Kristoffer Sjögren <[hidden email]> wrote:
|
I think that depends on your use case. If you want to work on the entire data set as a whole anyway, you can assign a dummy key (like 0) to all elements, group by that key, and sort the group on the actual value.

What exactly is your use case? Does the above solution work there?

On 15.03.2015 at 17:39, "Kristoffer Sjögren" <[hidden email]> wrote:
|
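The dummy-key idea from the last reply can be modeled in plain Java as follows. This is a sketch of the semantics only, not Flink code; `DummyKeySortModel` and `sortViaDummyKey` are hypothetical names, and `Map.Entry` stands in for a two-field tuple of (key, value). In an actual Flink job the sorted group would still have to be consumed by a reduce or aggregate function, as noted earlier in the thread.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "dummy key, group by key, sort group on value".
final class DummyKeySortModel {

    static List<Integer> sortViaDummyKey(List<Integer> input) {
        // Step 1: attach the same dummy key (0) to every element.
        List<Map.Entry<Integer, Integer>> keyed = new ArrayList<>();
        for (Integer value : input) {
            keyed.add(new SimpleEntry<>(0, value));
        }
        // Step 2: group by the key. All elements share key 0, so there is
        // at most one group and it contains the entire input.
        Map<Integer, List<Integer>> groups = new HashMap<>();
        for (Map.Entry<Integer, Integer> entry : keyed) {
            groups.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                  .add(entry.getValue());
        }
        // Step 3: sort inside the single group on the actual value. Because
        // the group is the whole data set, this is a total order.
        List<Integer> onlyGroup = groups.getOrDefault(0, new ArrayList<>());
        onlyGroup.sort(Comparator.reverseOrder());
        return onlyGroup;
    }

    public static void main(String[] args) {
        System.out.println(sortViaDummyKey(Arrays.asList(2, 1, 5, 3, 4, 5)));
        // prints [5, 5, 4, 3, 2, 1]
    }
}
```

Since every element lands in the same group, the whole sort is handled as one group and cannot be spread across parallel workers, so the same caveat about data size applies as for a single sorted partition.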