Bug: Plan generation for Unions picked a ship strategy between binary plan operators.

Yassine MARZOUGUI
Hi all,

My job fails with the following exception: CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
The exception occurs when I add partitionByRange(1).sortPartition(1, Order.DESCENDING) after the union of the data sets.
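For readers less familiar with the DataSet API: `partitionByRange(1)` distributes records across partitions by contiguous ranges of field 1, and `sortPartition(1, Order.DESCENDING)` then sorts each partition locally. The JDK-only sketch below illustrates those semantics on a single JVM. It is not Flink's implementation: the `Pair` class is a hypothetical stand-in for `Tuple2<String, Integer>`, and the partition bounds are chosen by hand, whereas Flink derives range boundaries by sampling the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Single-JVM illustration of "range-partition on field 1, then sort each partition descending".
// Not Flink code: Pair stands in for Tuple2<String, Integer>; the bounds are hypothetical.
final class RangePartitionSketch {

    static final class Pair {
        final String f0; // word
        final int f1;    // count, used as both the partitioning key and the sort key

        Pair(String f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Bounds must be sorted ascending. Partition 0 takes keys k <= bounds[0], partition i takes
    // bounds[i-1] < k <= bounds[i], and the last partition takes every key above the final bound.
    static List<List<Pair>> rangePartitionAndSortDesc(List<Pair> input, int[] bounds) {
        List<List<Pair>> partitions = new ArrayList<>();
        for (int i = 0; i <= bounds.length; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Pair p : input) {
            int idx = 0;
            while (idx < bounds.length && p.f1 > bounds[idx]) {
                idx++;
            }
            partitions.get(idx).add(p);
        }
        // Local sort inside each partition on f1, descending; there is no global merge step.
        for (List<Pair> partition : partitions) {
            partition.sort((a, b) -> Integer.compare(b.f1, a.f1));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Pair> data = Arrays.asList(
                new Pair("a", 1), new Pair("b", 4), new Pair("c", 2), new Pair("d", 5), new Pair("e", 3));
        List<List<Pair>> parts = rangePartitionAndSortDesc(data, new int[] {2, 4});
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

With the bounds {2, 4}, `main` prints `partition 0: [(c,2), (a,1)]`, `partition 1: [(b,4), (e,3)]` and `partition 2: [(d,5)]`: every partition is sorted, and the partitions themselves cover increasing key ranges.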

I wrote a smaller program that reproduces the bug:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.Iterator;

public class BugReproduce {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<WC> wc1 = env.fromElements(new WC("first",1), new WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2));
        DataSet<WC> wc2 = env.fromElements(new WC("third",1), new WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2));
        DataSet<WC> wc3 = env.fromElements(new WC("fifth",1), new WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2));

        DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1);
        DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2);
        DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3);
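        // Union the three aggregated data sets, then range-partition on field 1 (the count)
        // and sort each partition in descending order; adding this step triggers the CompilerException.
        DataSet<Tuple2<String,Integer>> all = aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
        all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print();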

    }

    public static DataSet<Tuple2<String,Integer>> aggregateWC(DataSet<WC> input){
        return input.groupBy("word").reduceGroup(new GroupReduceFunction<WC, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<WC> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
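                Iterator<WC> iterator = iterable.iterator();
                if (iterator.hasNext()) {
                    // The first record supplies the word and counts as one occurrence.
                    String word = iterator.next().word;
                    int count = 1;
                    while (iterator.hasNext()) {
                        iterator.next();
                        count += 1;
                    }
                    // Emit (word, number of records in this group).
                    collector.collect(Tuple2.of(word, count));
                }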
            }
        });
    }

    public static class WC {
        public String word;
        public int count;

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }
}
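The JDK-only sketch below computes the records that the three `aggregateWC` calls and the two `union` calls should feed into `partitionByRange(1)`, assuming the group-reduce is meant to count the occurrences of each word. It does not reproduce the optimizer error; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// JDK-only sketch of the data the repro feeds into partitionByRange(1): each input is grouped by
// word and its records are counted (assumed intent of aggregateWC), then the results are unioned.
final class ExpectedAggregates {

    static List<String> aggregateAndUnion(String[]... inputs) {
        List<String> union = new ArrayList<>();
        for (String[] words : inputs) {
            // Group by word and count occurrences; TreeMap only makes the output order deterministic.
            Map<String, Integer> counts = new TreeMap<>();
            for (String word : words) {
                counts.merge(word, 1, Integer::sum);
            }
            // Union keeps every record of every input (no deduplication).
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                union.add("(" + e.getKey() + "," + e.getValue() + ")");
            }
        }
        return union;
    }

    public static void main(String[] args) {
        String[] wc1 = {"first", "second", "first", "first", "second"};
        String[] wc2 = {"third", "forth", "forth", "forth", "third"};
        String[] wc3 = {"fifth", "fifth", "fifth", "fifth", "fifth"};
        System.out.println(aggregateAndUnion(wc1, wc2, wc3));
    }
}
```

Running `main` prints `[(first,3), (second,2), (forth,3), (third,2), (fifth,5)]`. The alphabetical order within each input comes from the `TreeMap`; Flink itself makes no such ordering guarantee for the output of `reduceGroup` or `union`.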

Here is the exception stack trace:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

I'm using Flink v1.1.3. Any help is appreciated. Thank you.

Best,
Yassine

Re: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.

Fabian Hueske-2
Hi Yassine,

I thought I had fixed that bug a few weeks ago, but apparently the fix did not cover all cases.
Can you please reopen FLINK-2662 and post the program to reproduce the bug there?

Thanks,
Fabian

On 2016-10-25 12:33 GMT+02:00, Yassine MARZOUGUI wrote:
[Quoted text hidden]

Re: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.

Yassine MARZOUGUI
Hi Fabian,

I commented on the issue and attached the program that reproduces the bug, but I couldn't find a way to reopen it (I may not have sufficient permissions).

Best,
Yassine


On 2016-10-25 12:49 GMT+02:00, Fabian Hueske wrote:
[Quoted text hidden]