I've got an issue performing joins using the Python API in flink-1.1.1. With this example code I get an NPE (below). However, the NPE disappears when the filter is removed. Is there an error I'm making in this brief example, or is this a Flink bug?

env = get_environment()
env.set_parallelism(1)

input1 = env.from_elements("1|0","1|2") \
    .map(lambda x: x.split("|"))

input2 = env.from_elements("1|b") \
    .map(lambda x: x.split("|")) \
    .filter(lambda x: x[0] != "0")

joined = input1 \
    .join(input2) \
    .where(0) \
    .equal_to(0) \
    .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

env.execute(local=True)

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
    at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
    at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
    at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
    at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
    at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
    at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
    at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    ... 6 more

I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
originally contributed the Python API. He can probably tell whether
this is a bug in the Python API or on the Flink operator side of things. ;)

On Mon, Aug 15, 2016 at 10:14 PM, davis k <[hidden email]> wrote:
> I've got an issue performing joins using the Python API in flink-1.1.1. With
> this example code I get an NPE (below). However, the NPE disappears when the
> filter is removed. Is there an error I'm making in this brief example, or is
> this a Flink bug?

looks like a bug, will look into it. :)

On 16.08.2016 10:29, Ufuk Celebi wrote:
> I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
> originally contributed the Python API. He can probably tell whether
> this is a bug in the Python API or on the Flink operator side of things. ;)

Found the issue, there was a missing tab in the chaining method...

On 16.08.2016 12:12, Chesnay Schepler wrote:
> looks like a bug, will look into it. :)

Awesome, thanks Chesnay!

On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <[hidden email]> wrote:
> Found the issue, there was a missing tab in the chaining method...