Re: JSON file not found - StreamExecutionEnvironment

Posted by Camelia-Elena Ciolac on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/JSON-file-not-found-StreamExecutionEnvironment-tp335p340.html

Dear Gyula,

Thank you very much for your suggestion; it did get the program past that error.

Now I am running into a deserialization error, and I am not sure what causes it.

Is it possible in Flink 0.7.0-incubating to parse an input JSON file containing heterogeneous types of records (e.g. events with different field structures)?
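In general, a flatMap may emit zero or more records per input line, so heterogeneous events can be handled by emitting output only for records that carry the expected fields and skipping the rest. The sketch below is plain Java rather than Flink code, and it rests on several assumptions: each line has already been parsed into a `Map<String, Object>` by some JSON library (not shown), the hypothetical `Selected` class stands in for `Tuple4<String,Integer,Integer,Long>`, and the field names `id`, `a`, `b` and `ts` are invented for illustration. In a Flink job the same body would sit inside `FlatMapFunction.flatMap(value, collector)`, with `collector.collect(...)` in place of the `Consumer`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SelectFieldsSketch {

    /** Hypothetical stand-in for Tuple4<String, Integer, Integer, Long>. */
    static final class Selected {
        final String id;
        final int a;
        final int b;
        final long ts;

        Selected(String id, int a, int b, long ts) {
            this.id = id;
            this.a = a;
            this.b = b;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + id + "," + a + "," + b + "," + ts + ")";
        }
    }

    /**
     * flatMap-style selection: emits one Selected if the event has all required
     * fields with usable types, and emits nothing for any other event shape.
     */
    static void selectFields(Map<String, Object> event, Consumer<Selected> out) {
        Object id = event.get("id");
        Object a = event.get("a");
        Object b = event.get("b");
        Object ts = event.get("ts");
        if (id instanceof String && a instanceof Number
                && b instanceof Number && ts instanceof Number) {
            out.accept(new Selected((String) id, ((Number) a).intValue(),
                    ((Number) b).intValue(), ((Number) ts).longValue()));
        }
        // Otherwise this is a differently structured event: skip it.
    }

    public static void main(String[] args) {
        Map<String, Object> click = new HashMap<>();
        click.put("id", "u1");
        click.put("a", 3);
        click.put("b", 4);
        click.put("ts", 1415365607000L);

        Map<String, Object> login = new HashMap<>(); // different structure: no "id", "a", "b"
        login.put("user", "u2");
        login.put("ts", 1415365608000L);

        List<Selected> results = new ArrayList<>();
        for (Map<String, Object> event : Arrays.asList(click, login)) {
            selectFields(event, results::add);
        }
        System.out.println(results); // prints [(u1,3,4,1415365607000)]
    }
}
```

Skipping silently is only one choice; depending on the use case, unmatched events could instead be counted or routed to a separate output.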

Below is the complete error trace, copied and pasted, in case it contains hints that could help you suggest a workaround. This is the only output I receive after launching the program.


-------------------------------------


org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more

    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)

    at org.apache.flink.client.program.Client.run(Client.java:325)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)


-------------------------
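The root cause in the trace is a `java.lang.ClassNotFoundException` raised from `ObjectInputStream.resolveClass`: the task's function object is stored in Java-serialized form, and on deserialization the class name `org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap` must be resolved through a class loader (the trace shows `sun.misc.Launcher$AppClassLoader`). The stdlib-only sketch below reproduces that mechanism outside Flink; `MyFunction` is a hypothetical stand-in for the user function, and the two class loaders are chosen only for illustration. The same bytes deserialize through the loader that defined the class and fail with `ClassNotFoundException` through a loader that cannot see it, which is consistent with the user class missing from the loader used here, although the sketch does not show which loader Flink uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderDeserializationSketch {

    /** Hypothetical stand-in for a user function such as ReadJSONDirectly$SelectFieldsFlatMap. */
    static class MyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        int threshold = 42;
    }

    /** ObjectInputStream that resolves class names through an explicitly chosen loader. */
    static class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Throws ClassNotFoundException if `loader` cannot see the class.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new MyFunction());

        // The loader that defined MyFunction can resolve it by name: this succeeds.
        Object restored = deserialize(data, MyFunction.class.getClassLoader());
        System.out.println("restored " + restored.getClass().getName());

        // The parent of the system class loader cannot see application classes,
        // so resolveClass throws ClassNotFoundException, as in the Flink trace.
        try {
            deserialize(data, ClassLoader.getSystemClassLoader().getParent());
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```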

Best regards,
Camelia



From: "Gyula Fóra" <[hidden email]>
To: [hidden email]
Sent: Friday, November 7, 2014 14:06:47
Subject: Re: JSON file not found - StreamExecutionEnvironment

Hello,

Please try running the same job, but drop the file:// prefix from the file path, i.e. pass just "/Users/X/Y/Z/theFile.txt".

I think this will fix your problem; however, we need to fix this in the API.
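The trace quoted further down shows the exception coming from `StreamExecutionEnvironment.checkIfFileExists`, and the fact that a plain path works suggests that check treats its argument as a local filesystem path. Assuming it is backed by `java.io.File` (an assumption; the 0.7.0 source is not quoted in this thread), the sketch below shows why a `file:///...` string fails: `java.io.File` does not interpret URI schemes, so it looks for a relative path that starts with `file:`. The helper `toLocalPath` is hypothetical; it converts a `file:` URI into a plain path and leaves anything else unchanged, so it does not address the `hdfs://` case.

```java
import java.io.File;
import java.io.IOException;
import java.net.URI;

public class FilePathSketch {

    /**
     * Hypothetical helper: "file:///Users/X/theFile.txt" becomes "/Users/X/theFile.txt";
     * any other string (a plain path, an hdfs:// URI, ...) is returned unchanged.
     */
    static String toLocalPath(String arg) {
        if (arg.startsWith("file:")) {
            // File(URI) accepts absolute file: URIs without an authority, e.g. file:///a/b.txt
            return new File(URI.create(arg)).getPath();
        }
        return arg;
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("theFile", ".txt");
        tmp.deleteOnExit();
        // Same shape as the argument in the original command line (assumes a Unix-style path).
        String uriArg = "file://" + tmp.getAbsolutePath();

        // java.io.File treats the whole string as a relative path: prints false.
        System.out.println(new File(uriArg).exists());
        // With the scheme removed, the existing file is found: prints true.
        System.out.println(new File(toLocalPath(uriArg)).exists());
    }
}
```

With such a helper, the call from the original snippet would become `env.readTextFile(toLocalPath(args[0]))`.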

Regards,
Gyula

On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <[hidden email]> wrote:
Hello,

I wrote a small program to test the JSON parsing capability with the new streaming API of Flink 0.7.0-incubating, but I ran into a "file not found" exception.
For context:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();   // it doesn't work with StreamExecutionEnvironment.getExecutionEnvironment() either
DataStream<Tuple4<String,Integer,Integer,Long>> ds1 = env.readTextFile(args[0]).flatMap(....);

At runtime I pass the arguments as follows:

flink run --jarfile  ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt  file:///Users/X/Y/Z/outputFile.txt  -v 

and even though the file exists at that path, I still get this stack trace:

Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more

The same thing happens if I put the file in HDFS and pass hdfs:///pathToFile/theFile.txt as the argument.

What could be the cause, in your opinion?


Thank you in advance!
Camelia