Error joining with Python API

5 messages

davis k
I've got an issue performing joins using the Python API in flink-1.1.1. With this example code I get an NPE (below). However, the NPE disappears when the filter is removed. Is there an error in this brief example, or is this a Flink bug?

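    from flink.plan.Environment import get_environment
    from flink.plan.Constants import WriteMode

    env = get_environment()
    env.set_parallelism(1)

    # Each element becomes a list of fields, e.g. "1|0" -> ["1", "0"].
    input1 = env.from_elements("1|0", "1|2") \
        .map(lambda x: x.split("|"))

    # Removing this filter makes the NPE disappear.
    input2 = env.from_elements("1|b") \
        .map(lambda x: x.split("|")) \
        .filter(lambda x: x[0] != "0")

    # Join on field 0 of both inputs and write the result to a text file.
    joined = input1 \
        .join(input2) \
        .where(0) \
        .equal_to(0) \
        .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

    env.execute(local=True)
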
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
        at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
        at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
        at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
        at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
        at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
        at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
        at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
        at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        ... 6 more
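For comparison, the result the reproducer should produce can be worked out without Flink. The sketch below is plain Python and does not use the Flink Python API; `split_records` and `equi_join` are hypothetical helpers, and it assumes the default join emits each match as a (left, right) pair, as the default join of Flink's Java DataSet API does. It does not model the text format Flink writes to output.txt.

```python
# Plain-Python sketch, not the Flink API: hypothetical helpers that mimic the
# reproducer's map, filter and default join so the expected result is visible.


def split_records(lines):
    """Mimic .map(lambda x: x.split("|")): each line becomes a list of fields."""
    return [line.split("|") for line in lines]


def equi_join(left, right, left_key, right_key):
    """Inner equi-join that emits every match as a (left, right) pair."""
    # Index the right side by its join field.
    index = {}
    for rec in right:
        index.setdefault(rec[right_key], []).append(rec)
    # Probe with each left record and pair it with every matching right record.
    return [
        (left_rec, right_rec)
        for left_rec in left
        for right_rec in index.get(left_rec[left_key], [])
    ]


input1 = split_records(["1|0", "1|2"])
input2 = [x for x in split_records(["1|b"]) if x[0] != "0"]  # the filter step

joined = equi_join(input1, input2, 0, 0)  # .where(0).equal_to(0)
print(joined)  # [(['1', '0'], ['1', 'b']), (['1', '2'], ['1', 'b'])]
```

Note that the filter removes nothing here, since the only record in input2 has key "1". That fits the stack trace: the NPE is thrown while the plan is being assembled (`PythonPlanBinder.receivePlan` -> `DataSet.join`), before any record is processed, which suggests the data itself is not the trigger.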

Re: Error joining with Python API

Ufuk Celebi
I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
originally contributed the Python API. He can probably tell whether
this is a bug in the Python API or on the Flink operator side of things. ;)

On Mon, Aug 15, 2016 at 10:14 PM, davis k <[hidden email]> wrote:

Re: Error joining with Python API

Chesnay Schepler
Looks like a bug, will look into it. :)

On 16.08.2016 10:29, Ufuk Celebi wrote:

Re: Error joining with Python API

Chesnay Schepler
Found the issue: there was a missing tab in the chaining method...
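The thread does not include the patch itself. As a purely hypothetical illustration (the plan dictionary and the `redirect` functions below are invented for this sketch and are not Flink's chaining code), this shows how a single missing indentation level in Python can fail silently: chaining the filter into its map should re-point the join's second input at the map, but with the check one tab short it runs only once, after the loop, and the join is left referring to an operator that no longer exists. A dangling reference like that would be consistent with, though not proof of, the NPE raised in `JoinOperatorSetsBase.<init>` when the Java side builds the join.

```python
# Hypothetical toy, not Flink code: one lost indentation level leaves a
# join's second input pointing at an operator that chaining removed.
import copy


def redirect(plan, removed_id, new_id):
    """Re-point every reference to `removed_id` at `new_id`, then drop it."""
    for op in plan.values():
        if op.get("input") == removed_id:
            op["input"] = new_id
        if op.get("other") == removed_id:  # second input of a two-input op
            op["other"] = new_id
    del plan[removed_id]


def redirect_missing_tab(plan, removed_id, new_id):
    """Identical, except the second check lost one indentation level."""
    for op in plan.values():
        if op.get("input") == removed_id:
            op["input"] = new_id
    if op.get("other") == removed_id:  # now runs once, on the last op only
        op["other"] = new_id
    del plan[removed_id]


# Toy plan shaped like the reproducer (sources omitted): input2 is
# map2 -> filter2, and filter2 is chained into map2 before the join is built.
PLAN = {
    "map1": {},
    "map2": {},
    "filter2": {"input": "map2"},
    "join": {"input": "map1", "other": "filter2"},
    "sink": {"input": "join"},
}

good = copy.deepcopy(PLAN)
redirect(good, "filter2", "map2")

bad = copy.deepcopy(PLAN)
redirect_missing_tab(bad, "filter2", "map2")

print(good["join"])                   # {'input': 'map1', 'other': 'map2'}
print(bad.get(bad["join"]["other"]))  # None: the join's second input is gone
```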

On 16.08.2016 12:12, Chesnay Schepler wrote:

Re: Error joining with Python API

davis k
Awesome, thanks Chesnay!

On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <[hidden email]> wrote: