Sort tuple dataset

Sort tuple dataset

Kristoffer Sjögren
Hi

This is silly, but I can't understand why the following code doesn't sort the collection of integers. It seems to be a reasonable thing to do from an API perspective?

Cheers,
-Kristoffer

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(Lists.newArrayList(2, 1, 5, 3, 4, 5))
    .map(new MapFunction<Integer, Tuple1<Integer>>() {
      @Override
      public Tuple1<Integer> map(Integer value) throws Exception {
        return new Tuple1<Integer>(value);
      }
    })
    .groupBy(0)
    .sortGroup(0, Order.DESCENDING)
    .getDataSet()
    .print();
env.execute();



Re: Sort tuple dataset

Stephan Ewen
Hi Kristoffer!

There are a few issues with that code:

1) Grouping and then calling "sort group" sorts within each group. In your case, you group on the entire element, so each group holds a single distinct value (the element itself). Sorting inside the group therefore makes no difference, and there is no order across groups.

2) This code never actually groups or sorts. The calls "groupBy(0).sortGroup(0, Order.DESCENDING)" do not group and sort by themselves; they only set up a grouping to be used with a reduce or aggregate function. The "getDataSet()" call returns the data set the grouping was defined on, which is the original input.

To see an illustration of this, get the program plan (env.getExecutionPlan()). You can render it using the HTML file "tools/planVisualizer.html".
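
Point 1 can also be modelled outside Flink. The following is a minimal plain-Java sketch, not Flink code: the class GroupSortModel and its method are hypothetical names, and a HashMap stands in for the grouping on the assumption that, like a distributed grouping, it promises no order across groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model class; it only imitates the semantics described above.
class GroupSortModel {

    // Groups the values by the value itself (the analogue of groupBy(0) on a
    // one-field tuple), then sorts every group in descending order (the
    // analogue of sortGroup(0, Order.DESCENDING)).
    public static Map<Integer, List<Integer>> groupByValueThenSort(List<Integer> input) {
        // HashMap on purpose: it gives no ordering guarantee across groups.
        Map<Integer, List<Integer>> groups = new HashMap<>();
        for (Integer value : input) {
            groups.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        }
        // Sorting happens inside each group only.
        for (List<Integer> group : groups.values()) {
            group.sort(Comparator.reverseOrder());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups =
                groupByValueThenSort(Arrays.asList(2, 1, 5, 3, 4, 5));
        // Five groups; the group for key 5 is [5, 5], all others have one element.
        System.out.println(groups);
    }
}
```

With the input from the question, every group contains one distinct value, so the per-group sort has nothing to reorder, and nothing in the model defines an order between the groups.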

Greetings,
Stephan


(In reply to Kristoffer Sjögren's message of Sun, Mar 15, 2015 at 3:29 PM.)


Re: Sort tuple dataset

Kristoffer Sjögren
Thanks for your answer. I guess I'm a bit infected by writing too much Crunch code, and I also suspected that getDataSet() was the wrong thing to do :-)

However, I was expecting DataSet.sortPartition to do the sorting, but this method seems to be missing in 0.8.1?

Do you have a minimal example? I was looking through the tests but most of them use sortPartition.

Cheers,
-Kristoffer



(In reply to Stephan Ewen's message of Sun, Mar 15, 2015 at 4:22 PM.)


Re: Sort tuple dataset

Stephan Ewen
Hi!

I think sort partition is the right thing, if you have only one partition (which makes sense if you want a total order). It is then no longer a parallel operation, so use it only after the data size has been reduced (filters / aggregations).

What about "data.sortPartition(0, Order.DESCENDING).setParallelism(1)"?

Does that work for you?
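
To illustrate why the parallelism matters here, the following is a small plain-Java model, not Flink code. PartitionSortModel and sortPartitions are hypothetical names, and the round-robin split is an assumption made only for the illustration; it is not a claim about how Flink distributes records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model class; it only imitates "sort every partition on its own".
class PartitionSortModel {

    // Splits the input round-robin into `parallelism` partitions, sorts each
    // partition independently in descending order, and concatenates the results.
    public static List<Integer> sortPartitions(List<Integer> input, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partition.sort(Comparator.reverseOrder());
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(2, 1, 5, 3, 4, 5);
        // Two partitions: each one is sorted, the overall sequence is not.
        System.out.println(sortPartitions(input, 2)); // [5, 4, 2, 5, 3, 1]
        // One partition: the partition order is the total order.
        System.out.println(sortPartitions(input, 1)); // [5, 5, 4, 3, 2, 1]
    }
}
```

Only the single-partition run yields a total order, which is why the suggestion above pins the parallelism to 1.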

Greetings,
Stephan


(In reply to Kristoffer Sjögren's message of Sun, Mar 15, 2015 at 4:47 PM.)


Re: Sort tuple dataset

Kristoffer Sjögren
That's the thing: there is no DataSet.sortPartition method in 0.8.1. Looking through the git history shows that sortPartition was added on the 20th of February, so I think that's 0.9-SNAPSHOT?


(In reply to Stephan Ewen's message of Sun, Mar 15, 2015 at 4:51 PM.)


Re: Sort tuple dataset

Kristoffer Sjögren
After building Flink 0.9-SNAPSHOT from source, DataSet.sortPartition is indeed working as expected.

This is fine, but it raises the question of how to go about sorting in 0.8.1.





(In reply to Kristoffer Sjögren's message of Sun, Mar 15, 2015 at 5:05 PM.)


Re: Sort tuple dataset

Stephan Ewen

I think that depends on your use case. If you want to work on the entire dataset as a whole anyway, you can assign a dummy key (like 0) to all elements, group by that key, and sort the group on the actual value.

What exactly is your use case? Does the above solution work there?
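
The dummy-key idea can be sketched in plain Java as follows. This is a model of the three steps only, not Flink code: DummyKeySortModel and sortViaDummyKey are hypothetical names, and int[] pairs stand in for two-field tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model class for "dummy key, group, sort within the group".
class DummyKeySortModel {

    public static List<Integer> sortViaDummyKey(List<Integer> input) {
        // Step 1: pair every element with the same dummy key (0).
        List<int[]> keyed = new ArrayList<>();
        for (int value : input) {
            keyed.add(new int[] {0, value});
        }
        // Step 2: group by the key; all elements end up in one group.
        Map<Integer, List<int[]>> groups = new HashMap<>();
        for (int[] pair : keyed) {
            groups.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair);
        }
        // Step 3: sort each group on the actual value (descending) and emit the values.
        List<Integer> result = new ArrayList<>();
        for (List<int[]> group : groups.values()) {
            group.sort((a, b) -> Integer.compare(b[1], a[1]));
            for (int[] pair : group) {
                result.add(pair[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sortViaDummyKey(Arrays.asList(2, 1, 5, 3, 4, 5))); // [5, 5, 4, 3, 2, 1]
    }
}
```

Because there is only one group, sorting within the group is the same as sorting everything. The flip side is that the whole data set is handled as a single group, so, as with a single partition, the step is presumably not parallel.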

(In reply to Kristoffer Sjögren's message of 15.03.2015, 17:39.)