Order groups by their keys


Robert Schmidtke
Hey everyone,

I'm currently trying to implement TPC-H Q1, which involves ordering the results. I'm not too familiar with the transformations yet, and for the life of me I cannot figure out how to get the ordering to work. Consider the following toy example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

final ExecutionEnvironment env = ExecutionEnvironment
        .getExecutionEnvironment();
DataSet<Tuple3<String, Integer, Integer>> elems = env.fromElements(
        new Tuple3<String, Integer, Integer>("a", 2, 1),
        new Tuple3<String, Integer, Integer>("b", 1, 2),
        new Tuple3<String, Integer, Integer>("a", 1, 3),
        new Tuple3<String, Integer, Integer>("b", 1, 4),
        new Tuple3<String, Integer, Integer>("a", 1, 5),
        new Tuple3<String, Integer, Integer>("b", 2, 6),
        new Tuple3<String, Integer, Integer>("a", 2, 7),
        new Tuple3<String, Integer, Integer>("b", 2, 8));
// group on fields 0 and 1, sum field 2 within each group, then print
elems.groupBy(0, 1).sum(2).print();

I want the output to be:
(a,1,8)
(a,2,8)
(b,1,6)
(b,2,14)

However the output is:
(a,2,8)
(b,1,6)
(b,2,14)
(a,1,8)

No matter where I place the partition-sorting or group-sorting transformations, I cannot get the output into this order. (Strangely enough, I just realized that when I don't add any ordering, the output is as expected; however, this is only the case in the toy example and not in my TPC-H Q1.) Is it currently not possible to achieve an ordered output in this case? Please bear with me if I overlooked something obvious, but I could not get a clear picture from the documentation.

I verified the results against the data provided by TPC-H; apart from the sorting, everything is fine.

Thanks a bunch in advance,

Cheers
Robert

--
My GPG Key ID: 336E2680

Re: Order groups by their keys

Fabian Hueske-2
Hi Robert,

there are two issues involved here.

1) Flink does not support totally ordered parallel output out of the box.
Fully sorting data in parallel requires range partitioning, which in turn requires some knowledge of the data (the distribution of the key values) to produce balanced partitions. Flink does not collect statistics to determine the key distribution, which is why it does not offer a range-partition operation yet. However, Flink does support sorting local partitions as well as custom partitioners. If you know the value distribution of the key, you can implement a custom partitioner and locally sort the partitions to obtain a fully sorted result.

2) print() collects all partitions in arbitrary order such that any order across partitions is destroyed (the order within partitions should not be affected).
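Both points can be modelled in plain Java without Flink. The sketch below is not Flink API code: the split points stand in for the known key distribution that a custom partitioner would encode (the values are made up for illustration), Collections.sort plays the role of sorting each local partition, and RangePartitionSketch, partitionFor, rangePartitionAndSort and concat are hypothetical names. Reading the partitions back in partition order yields a totally ordered result, while reading them in some other order keeps each partition sorted but loses the global order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RangePartitionSketch {

    // Picks the target partition for a key from sorted split points:
    // partition i holds keys k with splits[i-1] <= k < splits[i].
    // The split points encode the (assumed known) key distribution.
    public static int partitionFor(int key, int[] splits) {
        int p = 0;
        while (p < splits.length && key >= splits[p]) {
            p++;
        }
        return p;
    }

    // Range-partitions the data, then sorts every partition locally
    // (the role a local partition sort plays in a parallel job).
    public static List<List<Integer>> rangePartitionAndSort(List<Integer> data, int[] splits) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i <= splits.length; i++) {
            parts.add(new ArrayList<>());
        }
        for (int x : data) {
            parts.get(partitionFor(x, splits)).add(x);
        }
        for (List<Integer> part : parts) {
            Collections.sort(part);
        }
        return parts;
    }

    // Concatenates the partitions in the order they appear in the list.
    public static List<Integer> concat(List<List<Integer>> parts) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> part : parts) {
            out.addAll(part);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(42, 7, 19, 88, 3, 61, 25, 90, 14);
        int[] splits = {20, 60}; // hypothetical split points -> 3 partitions
        List<List<Integer>> parts = rangePartitionAndSort(data, splits);
        System.out.println(parts);         // [[3, 7, 14, 19], [25, 42], [61, 88, 90]]
        // Reading partitions in partition order gives a total order.
        System.out.println(concat(parts)); // [3, 7, 14, 19, 25, 42, 61, 88, 90]
        // Reading them in another order keeps each partition sorted
        // but destroys the order across partitions.
        List<List<Integer>> arbitrary = new ArrayList<>(parts);
        Collections.reverse(arbitrary);
        System.out.println(concat(arbitrary)); // [61, 88, 90, 25, 42, 3, 7, 14, 19]
    }
}
```

In an actual Flink job, the split-point lookup would presumably live in a custom Partitioner passed to partitionCustom, followed by sortPartition on the same field; choosing split points that balance the partitions is exactly the part that needs knowledge of the key distribution.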

Best, Fabian

In reply to Robert Schmidtke (2015-07-15 14:56 GMT+02:00).


Re: Order groups by their keys

Matthias J. Sax
In reply to this post by Robert Schmidtke
Hi Robert,

global sorting of the final output is currently not supported by Flink
out of the box. The reason is that a global sort requires all data to
be processed by a single node (which contradicts data parallelism).

For small output, you could use a final "reduce" with no key (i.e., all
data go to a single group) and dop=1 (degree of parallelism 1), and do
the sorting in memory in your own UDF.
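The core of this suggestion is a function that receives every record as one keyless group, buffers it, sorts it and emits it in order. Below is a plain-Java model of that function body, using the aggregated rows from Robert's toy example in the order he reported. SingleGroupSortSketch, Row, sortAll and BY_KEYS are hypothetical names, and in a Flink job the logic would presumably sit inside a group-reduce UDF running with parallelism 1 rather than in a static method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SingleGroupSortSketch {

    // Hypothetical stand-in for Tuple3<String, Integer, Integer>.
    public static final class Row {
        final String f0;
        final int f1;
        final int f2;

        public Row(String f0, int f1, int f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + "," + f2 + ")";
        }
    }

    // Order by field 0, then by field 1 (the order asked for in the question).
    public static final Comparator<Row> BY_KEYS =
            Comparator.comparing((Row r) -> r.f0).thenComparingInt(r -> r.f1);

    // Buffers every record of the single, keyless group and sorts it in memory.
    // Only viable when the whole result fits into one task's memory.
    public static <T> List<T> sortAll(Iterable<T> values, Comparator<? super T> order) {
        List<T> buffer = new ArrayList<>();
        for (T value : values) {
            buffer.add(value);
        }
        buffer.sort(order);
        return buffer;
    }

    public static void main(String[] args) {
        // The aggregated rows in the arbitrary order reported in the question.
        List<Row> unordered = Arrays.asList(
                new Row("a", 2, 8), new Row("b", 1, 6),
                new Row("b", 2, 14), new Row("a", 1, 8));
        System.out.println(sortAll(unordered, BY_KEYS));
        // prints [(a,1,8), (a,2,8), (b,1,6), (b,2,14)]
    }
}
```

Because the whole result is buffered in a single task, this only scales as far as that task's memory, which is why it is limited to small outputs.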

Hope this helps.

-Matthias

On 07/15/2015 02:56 PM, Robert Schmidtke wrote:

> Btw. the code is right
> here: https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH1Benchmark.java#L137



Re: Order groups by their keys

Fabian Hueske-2
Yes, going to parallelism 1 is another option, but you don't need a fake reduce to enforce the sorting.
You can simply do:

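import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;

DataSet<Tuple3<Integer, String, String>> result = ...
result
  .sortPartition(1, Order.ASCENDING).setParallelism(1) // sort on first String field
  .output(...);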

Fabian

In reply to Matthias J. Sax (2015-07-15 15:32 GMT+02:00).



Re: Order groups by their keys

hagersaleh
sortPartition(1, Order.ASCENDING) is available in Flink 0.9.