Hello everybody,
I am reading a text file and would like to outer-join it with another text file. Because I do not know how to express an outer join in Flink, I am adding a dummy row "-1" via union to one of these data sets:

DataSet<Tuple3<String, Integer, Integer>> myNull = env
        .fromElements(new Tuple3<String, Integer, Integer>("-1",
                new Integer(-1), new Integer(-1)));

names = names.union(myNull);

But when I look at the results, I never see this value, nor the values from the other data set that should be joined with it.

Thank you very much. I hope someone knows how I can solve this problem.

Best regards,
Alex
Hello again,
I would also like to show the stack trace. It points to the line env.execute(A3NameWithBoss.class.toString());

eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:399)
    at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:302)
    at eu.stratosphere.client.program.Client.run(Client.java:246)
    at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
    at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
    at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
    at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
Caused by: eu.stratosphere.compiler.CompilerException: Error translating node 'Union "Union" : UNION [[ GlobalProperties [partitioning=HASH_PARTITIONED, on fields [2]] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:355)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99)
    at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:157)
    at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:159)
    at eu.stratosphere.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:141)
    at eu.stratosphere.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:159)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:169)
    at eu.stratosphere.client.program.Client.getJobGraph(Client.java:222)
    at eu.stratosphere.client.program.Client.run(Client.java:293)
    at eu.stratosphere.client.program.Client.run(Client.java:288)
    at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
    at de.hshannover.vis.flink.jobdb.A3NameWithBoss.main(A3NameWithBoss.java:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:384)
    ... 6 more
Caused by: java.lang.NullPointerException
    at eu.stratosphere.pact.runtime.task.util.TaskConfig.setDriver(TaskConfig.java:291)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.createDualInputVertex(NepheleJobGraphGenerator.java:812)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:303)
    ... 22 more
In reply to this post by Alexander Sirotin
Hi Alex,

right now, there is no native support for outer joins in Flink, although it is on our roadmap. However, you can implement an outer join using a CoGroup function.
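CoGroup groups two data sets on a join key and calls a user function once for every key, passing it the group of elements with that key from each data set. In the user function, each group is represented by an iterator. If a key occurs in only one of the two data sets, the iterator for the other side is empty, and checking for that empty iterator is exactly how you handle the special case of an outer join.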
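The CoGroup semantics described above can be modeled in a few lines of plain Java, without Flink on the classpath. This is only a sketch under stated assumptions: the classes and methods (CoGroupOuterJoinSketch, Employee, CoGroupFn, coGroup, namesWithBoss) are hypothetical and are not Flink's API, and Employee simply mirrors the (name, id, bossId) tuples from Alex's program. The user function gets one group per side for every key present in either input and emits a null boss when the right-hand group is empty, which yields a left outer join. In an actual Flink program, the same logic would go into a CoGroup function passed via coGroup(...).where(...).equalTo(...).with(...).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class CoGroupOuterJoinSketch {

    /** Hypothetical record mirroring Tuple3<String, Integer, Integer>: (name, id, bossId). */
    static final class Employee {
        final String name;
        final int id;
        final int bossId;

        Employee(String name, int id, int bossId) {
            this.name = name;
            this.id = id;
            this.bossId = bossId;
        }
    }

    /** Hypothetical stand-in for a CoGroup user function (not the Flink interface). */
    interface CoGroupFn<L, R, O> {
        void coGroup(List<L> left, List<R> right, List<O> out);
    }

    /** Groups an input by key, keeping keys in first-seen order. */
    static <T, K> Map<K, List<T>> groupBy(List<T> input, Function<T, K> key) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T t : input) {
            groups.computeIfAbsent(key.apply(t), k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    /** Calls fn once per key found in either input; a side without that key gets an empty list. */
    static <L, R, K, O> List<O> coGroup(List<L> left, Function<L, K> leftKey,
                                        List<R> right, Function<R, K> rightKey,
                                        CoGroupFn<L, R, O> fn) {
        Map<K, List<L>> leftGroups = groupBy(left, leftKey);
        Map<K, List<R>> rightGroups = groupBy(right, rightKey);
        Set<K> keys = new LinkedHashSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        List<O> out = new ArrayList<>();
        for (K k : keys) {
            fn.coGroup(leftGroups.getOrDefault(k, List.of()),
                       rightGroups.getOrDefault(k, List.of()), out);
        }
        return out;
    }

    /** Left outer join "employee -> boss name"; employees without a matching boss get null. */
    static List<String[]> namesWithBoss(List<Employee> employees) {
        return coGroup(
            employees, e -> e.bossId,   // left side keyed on the boss id (field 2)
            employees, e -> e.id,       // right side keyed on the employee id (field 1)
            (List<Employee> emps, List<Employee> bosses, List<String[]> out) -> {
                // An empty left group (nobody reports to this id) emits nothing.
                for (Employee e : emps) {
                    if (bosses.isEmpty()) {
                        // Empty right group: keep the employee and fill the boss with null.
                        out.add(new String[] {e.name, null});
                    }
                    for (Employee b : bosses) {
                        out.add(new String[] {e.name, b.name});
                    }
                }
            });
    }

    public static void main(String[] args) {
        List<Employee> employees = List.of(
            new Employee("Peter", 1, 0),   // boss id 0 does not exist
            new Employee("Max", 2, 1),
            new Employee("Sarah", 3, 1),
            new Employee("Julia", 4, 2));
        for (String[] row : namesWithBoss(employees)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```

With this sample data, main prints Peter | null, Max | Peter, Sarah | Peter and Julia | Max. An inner join on the same keys would drop the Peter row entirely.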
Regarding your example with the union of the names and myNull data sets: did you check the result directly after the union or after the join? And regarding the exception you posted, could you post the program that caused it?
Thank you very much,
Fabian

2014-08-17 18:48 GMT+02:00 Alexander Sirotin wrote:
Hi Fabian,
thank you for your help. I will try the CoGroup function. At the end of this email you will find my program. I changed it, so I am attaching the stack trace again. After the join I called union with the myNull data set. This data set was created by calling fromElements, so it was not read from a file. The union then caused the exception shown below, and I didn't get any result. Without the union the program works, but then it is an inner join. Anyway, I will try the CoGroup method.

Thank you again and best regards,
Alex

Error: The main method caused an error.
eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:399)
    at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:302)
    at eu.stratosphere.client.program.Client.run(Client.java:246)
    at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
    at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
    at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
    at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
Caused by: eu.stratosphere.compiler.CompilerException: Error translating node 'Union "Union" : UNION [[ GlobalProperties [partitioning=HASH_PARTITIONED, on fields [2]] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:355)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:99)
    at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:157)
    at eu.stratosphere.compiler.plan.DualInputPlanNode.accept(DualInputPlanNode.java:159)
    at eu.stratosphere.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:141)
    at eu.stratosphere.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:159)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:169)
    at eu.stratosphere.client.program.Client.getJobGraph(Client.java:222)
    at eu.stratosphere.client.program.Client.run(Client.java:293)
    at eu.stratosphere.client.program.Client.run(Client.java:288)
    at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
    at de.hshannover.vis.flink.jobdb.A3NameWithBoss.main(A3NameWithBoss.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:384)
    ... 6 more
Caused by: java.lang.NullPointerException
    at eu.stratosphere.pact.runtime.task.util.TaskConfig.setDriver(TaskConfig.java:291)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.createDualInputVertex(NepheleJobGraphGenerator.java:812)
    at eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:303)
    ... 22 more

package de.hshannover.vis.flink.jobdb;

import java.util.ArrayList;
import java.util.List;

import de.hshannover.vis.flink.jobdb.mapper.EmployeeMapper;
import de.hshannover.vis.flink.jobdb.utils.IParameter;
import de.hshannover.vis.flink.jobdb.utils.Loader;
import de.hshannover.vis.flink.jobdb.utils.OutputPath;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.*;
import eu.stratosphere.api.java.tuple.*;

public class A3NameWithBoss {

    @SuppressWarnings({ "unchecked", "serial" })
    public static void main(String[] args) throws Exception {

        List<IParameter> list = new ArrayList<IParameter>();
        list.add(new EmployeeMapper());
        list.add(new OutputPath("Output", ""));
        Loader loader = new Loader(list, args);
        loader.load();

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataSet<Tuple3<String, Integer, Integer>> names = loader.data.get(0).map(
                new MapFunction<Tuple11<Integer, String, String, String, String, String, String, Integer, Double, Integer, Integer>,
                                Tuple3<String, Integer, Integer>>() {
                    public Tuple3<String, Integer, Integer> map(
                            Tuple11<Integer, String, String, String, String, String, String, Integer, Double, Integer, Integer> line)
                            throws Exception {
                        return new Tuple3<String, Integer, Integer>(line.f1 + " " + line.f2, line.f0, line.f9);
                    }
                });

        DataSet<Tuple3<String, Integer, Integer>> myNull = env
                .fromElements(new Tuple3<String, Integer, Integer>("-1",
                        new Integer(-1), new Integer(-1)));

        names = names.union(myNull);

        DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                .equalTo(1).projectFirst(0).projectSecond(0)
                .types(String.class, String.class);

        // emit result
        result.writeAsCsv(loader.output.get(0).getPath(), "\n", " | ");

        // execute program
        env.execute(A3NameWithBoss.class.toString());
    }
}
Hi Alex,

thanks a lot for posting your program! I modified the code a little bit, and it ran without problems against the current master (code pasted below). We will release a new version that includes many bug fixes very soon. If you do not want to wait for the release, I suggest cloning the release-0.6-rc7 branch from the Flink Git repository.

I also noticed that your code does not compute a proper outer join. An outer join preserves every tuple that does not find a matching join partner and fills the missing attributes with null values. With coGroup you can preserve tuples without join partners (those for which one of the two iterators is empty).

Adding the tuple from the collection data set via union just adds one more tuple. Since its fields 1 and 2 are both -1, it always joins with itself and is included in the result, while tuples without a join partner are still dropped by the join. Please let me know if you have further questions or observe any other problem with the system.
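Why the dummy row does not help can be checked with a tiny plain-Java model of the inner self-join in Alex's program (left field 2 equals right field 1). The nested-loop join and the names UnionDummyJoinSketch, Row and innerSelfJoin are hypothetical illustrations, not how Flink actually executes joins, and the sample rows are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionDummyJoinSketch {

    /** Hypothetical row mirroring Tuple3<String, Integer, Integer>: (name, id, bossId). */
    static final class Row {
        final String name;
        final int id;
        final int bossId;

        Row(String name, int id, int bossId) {
            this.name = name;
            this.id = id;
            this.bossId = bossId;
        }
    }

    /**
     * Nested-loop model of
     * names.join(names).where(2).equalTo(1).projectFirst(0).projectSecond(0).
     * Being an inner join, only pairs with left.bossId == right.id survive.
     */
    static List<String> innerSelfJoin(List<Row> names) {
        List<String> out = new ArrayList<>();
        for (Row left : names) {
            for (Row right : names) {
                if (left.bossId == right.id) {
                    out.add(left.name + " | " + right.name);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> names = new ArrayList<>(List.of(
            new Row("Peter", 1, 0),   // boss id 0 has no join partner
            new Row("Max", 2, 1),
            new Row("Julia", 4, 2)));
        names.add(new Row("-1", -1, -1));   // the dummy row added via union
        innerSelfJoin(names).forEach(System.out::println);
    }
}
```

This prints Max | Peter, Julia | Max and -1 | -1: the Peter row, which has no boss, is still lost, and the dummy row only ever matches itself.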
Best,
Fabian

---------- Code ----------------

public class ProgTest {

    @SuppressWarnings({ "unchecked", "serial" })
    public static void main(String[] args) throws Exception {

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<Tuple2<Integer, String>>();
        data.add(new Tuple2<Integer, String>(1, "Peter"));
        data.add(new Tuple2<Integer, String>(2, "Max"));
        data.add(new Tuple2<Integer, String>(3, "Sarah"));
        data.add(new Tuple2<Integer, String>(4, "Julia"));

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataSet<Tuple3<String, Integer, Integer>> names = env.fromCollection(data).map(
                new MapFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>>() {
                    public Tuple3<String, Integer, Integer> map(Tuple2<Integer, String> line) throws Exception {
                        return new Tuple3<String, Integer, Integer>(line.f1, line.f0, 2);
                    }
                });

        DataSet<Tuple3<String, Integer, Integer>> myNull = env
                .fromElements(new Tuple3<String, Integer, Integer>("-1",
                        new Integer(-1), new Integer(-1)));

        names = names.union(myNull);

        DataSet<Tuple2<String, String>> result = names.join(names).where(2)
                .equalTo(1).projectFirst(0).projectSecond(0)
                .types(String.class, String.class);

        // emit result
        result.writeAsCsv("file:///testOut", "\n", " | ");

        // execute program
        env.execute();
    }
}

2014-08-18 20:54 GMT+02:00 Alexander Sirotin wrote:
@Fabian: What was the source of the error? Can you open a JIRA issue with the description?

On Mon, Aug 18, 2014 at 10:49 PM, Fabian Hueske wrote:
I don't know what caused the error; I couldn't reproduce it with the current master. Alex's code was written against the eu.stratosphere namespace. That's why I recommended switching to the upcoming release or building from the current source.

Fabian

2014-08-19 10:11 GMT+02:00 Stephan Ewen wrote:
In reply to this post by Stephan Ewen
Hello Stephan,
I will take Fabian's suggestions and write again if something goes wrong. Currently everything is clear to me. But it may take some time until I continue with this program, because I would like to work on other Flink programs.

Best regards,
Alex