Union-Method does not function

Union-Method does not function

Alexander Sirotin
Hello everybody,

I am reading a text file and would like to outer-join it with another
text file. Because I do not know how to perform an outer join in Flink,
I am adding a dummy row "-1" via union to one of these DataSets:

         DataSet<Tuple3<String, Integer, Integer>> myNull = env
                 .fromElements(new Tuple3<String, Integer, Integer>("-1",
                         new Integer(-1), new Integer(-1)));

         names = names.union(myNull);

But when I look at the results, I never see this value or the values
from the other data sets that should be joined with it.

Thank you very much. I hope someone knows how I can solve this problem.

Best regards,
Alex

Re: Union-Method does not function

Alexander Sirotin
Hello again,

I would also like to show the stack trace. It points to the call
env.execute(A3NameWithBoss.class.toString());:

eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:399)
        at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:302)
        at eu.stratosphere.client.program.Client.run(Client.java:246)
        at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
        at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
        at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
        at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
Caused by: eu.stratosphere.compiler.CompilerException: Error translating node 'Union "Union" : UNION [[ GlobalProperties [partitioning=HASH_PARTITIONED, on fields [2]] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:355)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:157)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:159)
        at eu.stratosphere.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:141)
        at eu.stratosphere.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:159)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:169)
        at eu.stratosphere.client.program.Client.getJobGraph(Client.java:222)
        at eu.stratosphere.client.program.Client.run(Client.java:293)
        at eu.stratosphere.client.program.Client.run(Client.java:288)
        at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
        at de.hshannover.vis.flink.jobdb.A3NameWithBoss.main(A3NameWithBoss.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:384)
        ... 6 more
Caused by: java.lang.NullPointerException
        at eu.stratosphere.pact.runtime.task.util.TaskConfig.setDriver(TaskConfig.java:291)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.createDualInputVertex(NepheleJobGraphGenerator.java:812)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:303)
        ... 22 more



Re: Union-Method does not function

Fabian Hueske
Hi Alex,

right now, there is no native support for outer joins in Flink. This is on our roadmap, though.

You can, however, do an outer join using a CoGroup function.
CoGroup groups two data sets on a join key and calls a user function for each pair of groups with identical keys.
In the user function, each group is represented by an iterator, and you can check whether an iterator is empty, which is the special case of an outer join.
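
The idea described above can be sketched without any Flink dependency. The following plain-Java sketch is not the Flink API; it only mimics what a CoGroup user function sees: for each key, the group of records from each input, where an empty right-hand group is the outer-join case. The class name, the Person type, the method name, and the sample data are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of co-group semantics; NOT the Flink API.
public class CoGroupOuterJoinSketch {

    /** Hypothetical record (name, id, bossId), similar to the Tuple3 in the thread. */
    public static final class Person {
        final String name;
        final int id;
        final int bossId;

        public Person(String name, int id, int bossId) {
            this.name = name;
            this.id = id;
            this.bossId = bossId;
        }
    }

    /** Left outer self-join: every person paired with the boss name, or "null". */
    public static List<String> nameWithBoss(List<Person> people) {
        // Group the left input by bossId and the right input by id,
        // so that each key has one group per input.
        Map<Integer, List<Person>> leftGroups = new TreeMap<>();
        Map<Integer, List<Person>> rightGroups = new TreeMap<>();
        for (Person p : people) {
            leftGroups.computeIfAbsent(p.bossId, k -> new ArrayList<>()).add(p);
            rightGroups.computeIfAbsent(p.id, k -> new ArrayList<>()).add(p);
        }

        List<String> result = new ArrayList<>();
        // Keys that occur only on the right are skipped: this is a LEFT outer join.
        for (Map.Entry<Integer, List<Person>> entry : leftGroups.entrySet()) {
            List<Person> bosses =
                    rightGroups.getOrDefault(entry.getKey(), Collections.<Person>emptyList());
            for (Person employee : entry.getValue()) {
                if (bosses.isEmpty()) {
                    // Empty right group: keep the tuple and fill the gap with null.
                    result.add(employee.name + " | null");
                } else {
                    for (Person boss : bosses) {
                        result.add(employee.name + " | " + boss.name);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> people = new ArrayList<>();
        people.add(new Person("Peter", 1, 2));
        people.add(new Person("Max", 2, 4));
        people.add(new Person("Sarah", 3, 2));
        people.add(new Person("Julia", 4, 0)); // no person has id 0, so Julia has no boss
        for (String line : nameWithBoss(people)) {
            System.out.println(line);
        }
    }
}
```

In a real Flink program the grouping is done by the system and only the per-key logic (the empty-group check) lives in the user function.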

Regarding your example with the union of the names and myNull data sets: did you check the result directly after the union or after the join?

Regarding the exception you posted, could you post the program that caused it?

Thank you very much,
Fabian



Re: Union-Method does not function

Alexander Sirotin
Hi Fabian,

thank you for your help. I will try the CoGroup function.

My program is at the end of this email. I changed it, so I am attaching
the stack trace again. After the join I called union with the myNull
DataSet. This DataSet was created by calling fromElements, so it was
not read from a file. The union function then caused the exception
shown, and I did not get any result. Without the union the program
works, but then it is an inner join. Anyway, I will try the
CoGroup method.

Thank you again and best regards,
Alex

Error: The main method caused an error.
eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:399)
        at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:302)
        at eu.stratosphere.client.program.Client.run(Client.java:246)
        at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
        at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
        at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
        at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
Caused by: eu.stratosphere.compiler.CompilerException: Error translating node 'Union "Union" : UNION [[ GlobalProperties [partitioning=HASH_PARTITIONED, on fields [2]] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:355)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:157)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:159)
        at eu.stratosphere.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:141)
        at eu.stratosphere.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:159)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:169)
        at eu.stratosphere.client.program.Client.getJobGraph(Client.java:222)
        at eu.stratosphere.client.program.Client.run(Client.java:293)
        at eu.stratosphere.client.program.Client.run(Client.java:288)
        at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
        at de.hshannover.vis.flink.jobdb.A3NameWithBoss.main(A3NameWithBoss.java:59)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:384)
        ... 6 more
Caused by: java.lang.NullPointerException
        at eu.stratosphere.pact.runtime.task.util.TaskConfig.setDriver(TaskConfig.java:291)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.createDualInputVertex(NepheleJobGraphGenerator.java:812)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:303)
        ... 22 more


package de.hshannover.vis.flink.jobdb;

import java.util.ArrayList;
import java.util.List;

import de.hshannover.vis.flink.jobdb.mapper.EmployeeMapper;
import de.hshannover.vis.flink.jobdb.utils.IParameter;
import de.hshannover.vis.flink.jobdb.utils.Loader;
import de.hshannover.vis.flink.jobdb.utils.OutputPath;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.*;
import eu.stratosphere.api.java.tuple.*;

public class A3NameWithBoss {

     @SuppressWarnings({ "unchecked", "serial" })
     public static void main(String[] args) throws Exception {

         List<IParameter> list = new ArrayList<IParameter>();
         list.add(new EmployeeMapper());
         list.add(new OutputPath("Output", ""));
         Loader loader = new Loader(list, args);

         loader.load();

         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment
                 .getExecutionEnvironment();

         // get input data
         DataSet<Tuple3<String, Integer, Integer>> names = loader.data.get(0).map(
                 new MapFunction<Tuple11<Integer,String,String,String,String,String,String,Integer,Double,Integer,Integer>,
                         Tuple3<String,Integer,Integer>>() {
                     public Tuple3<String,Integer,Integer> map(
                             Tuple11<Integer,String,String,String,String,String,String,Integer,Double,Integer,Integer> line)
                             throws Exception {
                         return new Tuple3<String,Integer,Integer>(line.f1 + " " + line.f2, line.f0, line.f9);
                     }
                 });

         DataSet<Tuple3<String, Integer, Integer>> myNull = env
                 .fromElements(new Tuple3<String, Integer, Integer>("-1",
                         new Integer(-1), new Integer(-1)));

         names = names.union(myNull);

         DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                 .equalTo(1).projectFirst(0).projectSecond(0)
                 .types(String.class, String.class);

         // emit result
         result.writeAsCsv(loader.output.get(0).getPath(), "\n", " | ");

         // execute program
         env.execute(A3NameWithBoss.class.toString());
     }
}

Re: Union-Method does not function

Fabian Hueske
Hi Alex,

thanks a lot for posting your program!
I modified the code a little bit and tested it against the current master without problems (code pasted below).

We will release a new version that includes many bug fixes very soon. If you do not want to wait for the release, I suggest cloning the release-0.6-rc7 branch from the Flink Git repository.

I also noticed that your code is not doing a proper outer join. Outer joins preserve all tuples that do not find a matching join partner and fill the missing attributes with null values. With coGroup you can preserve tuples without join partners (those where one iterator is empty).
Adding the tuple from the collection data set using union just adds an additional tuple that will always join with itself and will be included in the result.
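
To make the last point concrete, here is a small plain-Java sketch (not Flink code) of an inner self-join on "field 2 of the left row equals field 1 of the right row", mirroring where(2).equalTo(1) in the program. The class name, method name, and sample rows are hypothetical. With this data, the dummy row joins with itself and shows up in the output, while Max, whose bossId matches no id, is still dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the dummy-row approach; NOT the Flink API.
public class DummyRowJoinSketch {

    /**
     * Inner self-join on left[2] == right[1].
     * Each row is {name, id, bossId}, stored as Object[] for brevity.
     * The output pairs the left name with the right name.
     */
    public static List<String> innerSelfJoin(List<Object[]> rows) {
        List<String> result = new ArrayList<>();
        for (Object[] left : rows) {
            for (Object[] right : rows) {
                if (left[2].equals(right[1])) {
                    result.add(left[0] + " | " + right[0]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"Peter", 1, 2});
        rows.add(new Object[] {"Max", 2, 9});    // no row has id 9
        rows.add(new Object[] {"Sarah", 3, 2});
        rows.add(new Object[] {"-1", -1, -1});   // the dummy row added via union
        for (String line : innerSelfJoin(rows)) {
            System.out.println(line);
        }
    }
}
```

The dummy row only helps for rows whose join key happens to equal the dummy's key; it does not preserve arbitrary unmatched tuples the way an outer join does.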

Please let me know if you have further questions or observe any other problems with the system.

Best, Fabian

---------- Code ----------------

public class ProgTest {

     @SuppressWarnings({ "unchecked", "serial" })
     public static void main(String[] args) throws Exception {
         ArrayList<Tuple2<Integer, String>> data = new ArrayList<Tuple2<Integer, String>>();
         data.add(new Tuple2<Integer, String>(1,"Peter"));
         data.add(new Tuple2<Integer, String>(2,"Max"));
         data.add(new Tuple2<Integer, String>(3,"Sarah"));
         data.add(new Tuple2<Integer, String>(4,"Julia"));

         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment
                 .getExecutionEnvironment();
         // get input data
         DataSet<Tuple3<String, Integer, Integer>> names = env.fromCollection(data).map(
                 new MapFunction<Tuple2<Integer,String>, Tuple3<String,Integer,Integer>>() {
                     public Tuple3<String,Integer,Integer> map(Tuple2<Integer,String> line) throws Exception {
                         return new Tuple3<String,Integer,Integer>(line.f1, line.f0, 2);
                     }
                 });
         DataSet<Tuple3<String, Integer, Integer>> myNull = env
                 .fromElements(new Tuple3<String, Integer, Integer>("-1", new Integer(-1), new Integer(-1)));
         names = names.union(myNull);
         DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                 .equalTo(1).projectFirst(0).projectSecond(0)
                 .types(String.class, String.class);
         // emit result
         result.writeAsCsv("file:///testOut", "\n", " | ");
         // execute program
         env.execute();
     }
}





Re: Union-Method does not function

Stephan Ewen
@Fabian: What was the source of the error? Can you open a JIRA issue with the description?



Re: Union-Method does not function

Fabian Hueske
I don't know what caused the error. I couldn't reproduce it with the current master.
Alex's code was implemented against the eu.stratosphere namespace.

That's why I recommended switching to the upcoming release or building from the current source.

Fabian



2014-08-19 10:11 GMT+02:00 Stephan Ewen <[hidden email]>:
@Fabian: What was the source of the error? Can you open a JIRA issue with the description?


On Mon, Aug 18, 2014 at 10:49 PM, Fabian Hueske <[hidden email]> wrote:
Hi Alex,

thanks a lot for posting your program!
I modified the code a little bit and tested it against the current master without problems (code pasted below).

We will release a new version that includes many bugfixes very soon. If you do not want to wait for the release, I suggest to clone the release-0.6-rc7 branch from the Flink Git repository.

I also noticed that your code is not doing a proper outer join. Outer joins preserve all tuples that do not find a matching join partner and fill the missing attributes with null values. With coGroup you can preserve tuples without join partners (those where one iterator is empty).
Adding the tuple from the collection set using the union, just adds an additional tuple that will always join with itself and will be included in the result.

Please let me know, if you have further questions or observe any other problem with the system.

Best, Fabian

---------- Code ----------------

public class ProgTest {

    @SuppressWarnings({ "unchecked", "serial" })
    public static void main(String[] args) throws Exception {
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<Tuple2<Integer, String>>();
        data.add(new Tuple2<Integer, String>(1, "Peter"));
        data.add(new Tuple2<Integer, String>(2, "Max"));
        data.add(new Tuple2<Integer, String>(3, "Sarah"));
        data.add(new Tuple2<Integer, String>(4, "Julia"));

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();
        // get input data
        DataSet<Tuple3<String, Integer, Integer>> names = env.fromCollection(data).map(
                new MapFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>>() {
                    public Tuple3<String, Integer, Integer> map(Tuple2<Integer, String> line) throws Exception {
                        return new Tuple3<String, Integer, Integer>(line.f1, line.f0, 2);
                    }
                });
        DataSet<Tuple3<String, Integer, Integer>> myNull = env
                .fromElements(new Tuple3<String, Integer, Integer>("-1", new Integer(-1), new Integer(-1)));
        names = names.union(myNull);
        DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                .equalTo(1).projectFirst(0).projectSecond(0)
                .types(String.class, String.class);
        // emit result
        result.writeAsCsv("file:///testOut", "\n", " | ");
        // execute program
        env.execute();
    }
}




2014-08-18 20:54 GMT+02:00 Alexander Sirotin <[hidden email]>:

Hi Fabian,

thank you for your help. I will try the CoGroup-function.

My program is at the end of this email. I changed it, so I am attaching the stack trace again. After the join I called union with the myNull DataSet. This DataSet was created by calling fromElements, so it was not read from a file. The union then caused the exception shown below, and I did not get any result. Without the union the program works, but then it is an inner join. Anyway, I will try the coGroup method.

Thank you again and best regards,
Alex

Error: The main method caused an error.

eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:399)
        at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:302)
        at eu.stratosphere.client.program.Client.run(Client.java:246)
        at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
        at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
        at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
        at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
Caused by: eu.stratosphere.compiler.CompilerException: Error translating node 'Union "Union" : UNION [[ GlobalProperties [partitioning=HASH_PARTITIONED, on fields [2]] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:355)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:157)
        at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:159)
        at eu.stratosphere.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:141)
        at eu.stratosphere.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:159)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:169)
        at eu.stratosphere.client.program.Client.getJobGraph(Client.java:222)
        at eu.stratosphere.client.program.Client.run(Client.java:293)
        at eu.stratosphere.client.program.Client.run(Client.java:288)
        at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
        at de.hshannover.vis.flink.jobdb.A3NameWithBoss.main(A3NameWithBoss.java:59)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:384)
        ... 6 more
Caused by: java.lang.NullPointerException
        at eu.stratosphere.pact.runtime.task.util.TaskConfig.setDriver(TaskConfig.java:291)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.createDualInputVertex(NepheleJobGraphGenerator.java:812)
        at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:303)
        ... 22 more


package de.hshannover.vis.flink.jobdb;

import java.util.ArrayList;
import java.util.List;

import de.hshannover.vis.flink.jobdb.mapper.EmployeeMapper;
import de.hshannover.vis.flink.jobdb.utils.IParameter;
import de.hshannover.vis.flink.jobdb.utils.Loader;
import de.hshannover.vis.flink.jobdb.utils.OutputPath;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.*;
import eu.stratosphere.api.java.tuple.*;

public class A3NameWithBoss {

    @SuppressWarnings({ "unchecked", "serial" })
    public static void main(String[] args) throws Exception {

        List<IParameter> list = new ArrayList<IParameter>();
        list.add(new EmployeeMapper());
        list.add(new OutputPath("Output", ""));
        Loader loader = new Loader(list, args);

        loader.load();

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataSet<Tuple3<String, Integer, Integer>> names = loader.data.get(0).map(
                new MapFunction<Tuple11<Integer,String,String,String,String,String,String,Integer,Double,Integer,Integer>,
                        Tuple3<String,Integer,Integer>>() {
                    public Tuple3<String,Integer,Integer> map(
                            Tuple11<Integer,String,String,String,String,String,String,Integer,Double,Integer,Integer> line)
                            throws Exception {
                        return new Tuple3<String,Integer,Integer>(line.f1 + " " + line.f2, line.f0,
                                line.f9);
                    }
                });


        DataSet<Tuple3<String, Integer, Integer>> myNull = env
                .fromElements(new Tuple3<String, Integer, Integer>("-1",
                        new Integer(-1), new Integer(-1)));

        names = names.union(myNull);

        DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                .equalTo(1).projectFirst(0).projectSecond(0)
                .types(String.class, String.class);

        // emit result
        result.writeAsCsv(loader.output.get(0).getPath(), "\n", " | ");

        // execute program
        env.execute(A3NameWithBoss.class.toString());
    }
}




Re: Union-Method does not function

Alexander Sirotin
In reply to this post by Stephan Ewen
Hello Stephan,

I will follow Fabian's suggestions and write again if something goes wrong.
For now everything is clear to me. It may take some time until I continue
with this program, though, because I would like to work on other Flink programs first.

Best regards
Alex

On 19.08.2014 10:11, Stephan Ewen wrote:
> @Fabian: What was the source of the error? Can you open a JIRA issue
> with the description?