Hey everyone,
I'm currently trying to implement TPC-H Q1 and that involves ordering of results. Now I'm not too familiar with the transformations yet, however for the life of me I cannot figure out how to get it to work. Consider the following toy example:

    final ExecutionEnvironment env = ExecutionEnvironment
            .getExecutionEnvironment();
    DataSet<Tuple3<String, Integer, Integer>> elems = env.fromElements(
            new Tuple3<String, Integer, Integer>("a", 2, 1),
            new Tuple3<String, Integer, Integer>("b", 1, 2),
            new Tuple3<String, Integer, Integer>("a", 1, 3),
            new Tuple3<String, Integer, Integer>("b", 1, 4),
            new Tuple3<String, Integer, Integer>("a", 1, 5),
            new Tuple3<String, Integer, Integer>("b", 2, 6),
            new Tuple3<String, Integer, Integer>("a", 2, 7),
            new Tuple3<String, Integer, Integer>("b", 2, 8));
    elems.groupBy(0, 1).sum(2).print();

I want the output to be:

    (a,1,8)
    (a,2,8)
    (b,1,6)
    (b,2,14)

However the output is:

    (a,2,8)
    (b,1,6)
    (b,2,14)
    (a,1,8)

No matter where I place sorting of partitions or groups transformations (strange enough I just realized that when I don't add any ordering, the output is as expected; however this is just the case in the toy example and not in my TPC-H Q1). Is it currently not possible to achieve an ordered output in this case? Please bear with me if I overlooked the obvious, but I could not get a clear picture from the documentation.

Btw. the code is right here: https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH1Benchmark.java#L137

I verified the results with the provided data from TPC-H, apart from the sorting everything is fine.

Thanks a bunch in advance,

Cheers
Robert

My GPG Key ID: 336E2680
Hi Robert,

there are two issues involved here.

2015-07-15 14:56 GMT+02:00 Robert Schmidtke <[hidden email]>:
Hi Robert,
global sorting of the final output is currently not supported by Flink out-of-the-box. The reason is that a global sort requires all data to be processed by a single node (which contradicts data parallelism). For small output, you could use a final "reduce" with no key (i.e., all data go to a single group) and dop=1 (degree of parallelism 1), and do the sorting in-memory in your own UDF.

Hope this helps.

-Matthias
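As a rough illustration of the in-memory sort that such a single-group UDF would perform, here is a minimal plain-Java sketch. It deliberately avoids the Flink API so that it runs on its own: `Row` is a hypothetical stand-in for `Tuple3<String, Integer, Integer>`, and `sortByGroupKeys` is an illustrative name for the body a keyless group function running with parallelism 1 might contain. It is not code from the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class InMemorySortSketch {

    // Hypothetical stand-in for Flink's Tuple3<String, Integer, Integer>.
    static final class Row {
        final String f0;
        final int f1;
        final int f2;

        Row(String f0, int f1, int f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + "," + f2 + ")";
        }
    }

    // Buffers the whole (single) group in memory and returns it ordered by
    // field 0, then field 1. Only reasonable when the output is small.
    static List<Row> sortByGroupKeys(Iterable<Row> group) {
        List<Row> buffer = new ArrayList<>();
        for (Row r : group) {
            buffer.add(r);
        }
        buffer.sort(Comparator.comparing((Row r) -> r.f0).thenComparingInt(r -> r.f1));
        return buffer;
    }

    public static void main(String[] args) {
        // The aggregated rows in the unordered sequence reported in the question.
        List<Row> aggregated = Arrays.asList(
                new Row("a", 2, 8),
                new Row("b", 1, 6),
                new Row("b", 2, 14),
                new Row("a", 1, 8));
        for (Row r : sortByGroupKeys(aggregated)) {
            System.out.println(r);
        }
    }
}
```

Because everything is buffered in one list, this only suits small results such as the handful of rows TPC-H Q1 produces.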
Yes, going to parallelism 1 is another option, but you don't have to use a fake reduce to enforce sorting. You can simply do:

    DataSet<Tuple3<Integer, String, String>> result = ...
    result
        .sortPartition(1, Order.ASCENDING).setParallelism(1) // sort on first String field
        .output(...);

Fabian
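To see why the parallelism matters here, the following plain-Java simulation may help. It is only a sketch, not Flink code: `sortEachPartition` is a hypothetical helper, the round-robin distribution is an assumption made for the simulation, and so is concatenating the partitions in index order (a real parallel job gives no such guarantee). The point it illustrates is that sorting each partition on its own yields a global order only when there is a single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PartitionSortSketch {

    // Distributes the input round-robin over `parallelism` partitions (an
    // assumption of this simulation), sorts every partition independently,
    // and concatenates the partitions in index order.
    static List<String> sortEachPartition(List<String> input, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        List<String> output = new ArrayList<>();
        for (List<String> partition : partitions) {
            Collections.sort(partition);
            output.addAll(partition);
        }
        return output;
    }

    public static void main(String[] args) {
        // Group keys of the toy example, in the unordered sequence from the question.
        List<String> keys = Arrays.asList("a-2", "b-1", "b-2", "a-1");
        System.out.println(sortEachPartition(keys, 2)); // each half sorted, whole not
        System.out.println(sortEachPartition(keys, 1)); // single partition, fully sorted
    }
}
```

With two partitions each partition is sorted but the combined output is not; with one partition the local sort is also the global sort, which is what `setParallelism(1)` on the sort achieves at the cost of running that step on a single task.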
.sortPartition(1, Order.ASCENDING) is available in Flink 0.9.