Hi all,
My job fails with the folowing exception : CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators. The exception happens when adding partitionByRange(1).sortPartition(1, Order.DESCENDING) to the union of datasets. I made a smaller version that reproduces the bug : import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import java.util.Iterator; public class BugReproduce { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<WC> wc1 = env.fromElements(new WC("first",1), new WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2)); DataSet<WC> wc2 = env.fromElements(new WC("third",1), new WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2)); DataSet<WC> wc3 = env.fromElements(new WC("fifth",1), new WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2)); DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1); DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2); DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3); DataSet<Tuple2<String,Integer>> all = aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3); all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print(); } public static DataSet<Tuple2<String,Integer>> aggregateWC(DataSet<WC> input){ return input.groupBy("word").reduceGroup(new GroupReduceFunction<WC, Tuple2<String, Integer>>() { @Override public void reduce(Iterable<WC> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception { Integer count = 0; Iterator<WC> iterator = iterable.iterator(); if (iterator.hasNext()) { String word= iterator.next().word; while (iterator.hasNext()) { iterator.next(); count += 1; } collector.collect(Tuple2.of(word,count)); } } }); } public static class WC { public String word; public int count; public WC() { } public WC(String word, int count) { this.word = word; this.count = count; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } } } Here is the exception stacktrace: Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators. at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516) at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) at org.myorg.prod.BugReproduce.main(BugReproduce.java:28) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) I'm using Flink v1.1.3. Any help is appreciated. Thank you. Best, Yassine |
Hi Yassine, I thought I had fixed that bug a few weeks a ago, but apparently the fix did not catch all cases.Fabian 2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <[hidden email]>:
|
Hi Fabian, I commented on the issue and attached the program reproducing the bug, But I couldn't find how to re-open it (I think maybe I don't have enough permissions?). Best, Yassine 2016-10-25 12:49 GMT+02:00 Fabian Hueske <[hidden email]>:
|
Free forum by Nabble | Edit this page |