JSON file not found - StreamExecutionEnvironment

Camelia-Elena Ciolac
Hello,

I wrote a small program to test JSON parsing with the new streaming API of Flink 0.7.0-incubating, but I ran into a "file not found" exception.
For context, here is the relevant code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();   // it doesn't work with StreamExecutionEnvironment.getExecutionEnvironment() either
DataStream<Tuple4<String,Integer,Integer,Long>> ds1 = env.readTextFile(args[0]).flatMap(....);

At runtime I pass the arguments as follows:

flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -v

and even though the file exists at that path, I still get the following stack trace:

Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more

The same thing happens if I put the file in HDFS and pass hdfs:///pathToFile/theFile.txt as the argument.

What could be the cause, in your opinion?


Thank you in advance!
Camelia




Re: JSON file not found - StreamExecutionEnvironment

Gyula Fóra
Hello,

Please try running the same job, but drop the file:// prefix from the file path, i.e. pass just "/Users/X/Y/Z/theFile.txt".

I think this will fix your problem; however, we need to fix this in the API.
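The workaround amounts to passing a plain path instead of a URI. Judging only from the exception, the streaming existence check seems to interpret its argument as a local file system path (an assumption; its implementation is not shown in this thread), and in that case a string such as file:///Users/X/Y/Z/theFile.txt names no existing file. A minimal JDK-only sketch that converts a file: URI argument into a plain path before it is handed to readTextFile; LocalPathArgument and toLocalPath are hypothetical names, not part of Flink:

```java
import java.io.File;
import java.net.URI;

public class LocalPathArgument {

    // Hypothetical helper (not Flink API): turns "file:///Users/X/Y/Z/theFile.txt"
    // into "/Users/X/Y/Z/theFile.txt". Arguments without the "file:" scheme
    // (e.g. "/Users/X/Y/Z/theFile.txt") are returned unchanged.
    static String toLocalPath(String arg) {
        if (arg.startsWith("file:")) {
            // new File(URI) requires an absolute "file" URI with no authority,
            // query or fragment; otherwise it throws IllegalArgumentException.
            return new File(URI.create(arg)).getPath();
        }
        return arg;
    }

    public static void main(String[] args) {
        for (String arg : args) {
            String path = toLocalPath(arg);
            System.out.println(arg + " -> " + path + " (exists: " + new File(path).exists() + ")");
        }
    }
}
```

In the program from the first message this would mean calling env.readTextFile(LocalPathArgument.toLocalPath(args[0])). It does not address the hdfs:// case, which has no local path equivalent.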

Regards,
Gyula

On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <[hidden email]> wrote:

Re: JSON file not found - StreamExecutionEnvironment

Camelia-Elena Ciolac
Dear Gyula,

Thank you very much for your suggestion; it does get the program past that error.

Now I am running into a deserialization error, and I am unsure of its cause.

Is it possible in Flink 0.7.0-incubating to parse an input JSON file containing heterogeneous types of records (e.g. events with different field structures)?

I have pasted the whole error trace below, as it may contain hints that could help you suggest a workaround. This is the only output I receive after launching the program.


-------------------------------------


Error: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more

    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more

    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)

    at org.apache.flink.client.program.Client.run(Client.java:325)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)


-------------------------

Best regards,
Camelia



From: "Gyula Fóra" <[hidden email]>
To: [hidden email]
Sent: Friday, 7 November 2014 14:06:47
Subject: Re: JSON file not found - StreamExecutionEnvironment


Re: JSON file not found - StreamExecutionEnvironment

Gyula Fóra
Dear Camelia,

The error you got means that Flink cannot deserialize your user function when creating the vertices of the processing graph. It could be that you are using a non-static inner class for parsing the JSON, which cannot be serialized.

So all your user functions, for example a FlatMapFunction, must be objects that are serializable with default Java serialization.
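This requirement can be checked locally before submitting a job. A minimal JDK-only sketch that assumes nothing about Flink beyond the requirement just stated; SerializabilityCheck, StaticFunction, InnerFunction and isJavaSerializable are hypothetical names, not Flink API. It writes a static nested class and a non-static inner class with default Java serialization. In plain Java the inner class fails with java.io.NotSerializableException, because each instance holds a hidden reference to its enclosing object, which is not Serializable here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // State of the enclosing object; SerializabilityCheck itself is NOT Serializable.
    private String prefix = "event-";

    // Static nested class: carries no reference to an enclosing instance.
    static class StaticFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        String apply(String value) {
            return "event-" + value;
        }
    }

    // Non-static inner class: reading 'prefix' needs a hidden reference to the
    // enclosing SerializabilityCheck instance, which serialization must also write.
    class InnerFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        String apply(String value) {
            return prefix + value;
        }
    }

    // Returns true if the object can be written with default Java serialization.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static nested class serializable: "
                + isJavaSerializable(new StaticFunction()));
        System.out.println("inner class serializable: "
                + isJavaSerializable(new SerializabilityCheck().new InnerFunction()));
    }
}
```

Declaring the function as a static nested or top-level class avoids the hidden reference to the enclosing object.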

(This is not a runtime error from parsing or sending the tuples; it happened while setting up the job.)
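Setting up the job includes deserializing each function, and the deepest cause in the trace is java.lang.ClassNotFoundException thrown from ObjectInputStream.resolveClass. Java deserialization looks each class up by name through a class loader, so it fails this way whenever that loader cannot see the class. A minimal JDK-only sketch of that mechanism, not of Flink's own code: LoaderAwareDeserialization, SampleFunction and LoaderAwareInputStream are hypothetical names, and the isolated URLClassLoader merely stands in for a loader that cannot see the user's classes.

```java
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderAwareDeserialization {

    // Hypothetical stand-in for a user function such as ReadJSONDirectly$SelectFieldsFlatMap.
    static class SampleFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        String config = "fields=a,b,c";
    }

    // ObjectInputStream that resolves every class name through an explicit class loader.
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Throws ClassNotFoundException if 'loader' cannot see the class.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new SampleFunction());

        // Works: the loader that defined SampleFunction can resolve it by name.
        Object copy = deserialize(data, SampleFunction.class.getClassLoader());
        System.out.println("deserialized " + copy.getClass().getName());

        // Fails: this loader has no classpath entries and only the bootstrap loader
        // as parent, so it cannot see application classes.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            deserialize(data, isolated);
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

Which class loader Flink uses at that point is not shown in this thread; the sketch only reproduces the exception type seen in the trace.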

Regards,
Gyula


On Fri, Nov 7, 2014 at 4:57 PM, Camelia-Elena Ciolac <[hidden email]> wrote: