Hello,

I wrote a small program to test the JSON parsing capability of the new streaming API in Flink 0.7.0-incubating, but I ran into a "file not found" exception.

As context for my question:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    // StreamExecutionEnvironment.getExecutionEnvironment() does not work either
    DataStream<Tuple4<String,Integer,Integer,Long>> ds1 = env.readTextFile(args[0]).flatMap(....);

At runtime I pass the arguments as follows:

    flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -v

and even though the file exists at that path, I still get this error:

    Error: The main method caused an error.
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
        at org.apache.flink.client.program.Client.run(Client.java:244)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
    Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
        at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
        ... 6 more

The same thing happens if I put the file in HDFS and pass hdfs:///pathToFile/theFile.txt as the argument.

What could be the cause, in your opinion?

Thank you in advance!
Camelia
|
Hello,

Please try running the same job, but drop the file:// prefix from the file path, so that it is just "/Users/X/Y/Z/theFile.txt".

I think this will fix your problem; however, we need to fix this in the API.

Regards,
Gyula

On Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <[hidden email]> wrote:
|
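The workaround suggested above could also be applied inside the program rather than on the command line. The following is only a minimal sketch: the class name `LocalPathArg` and the helper `toLocalPath` are hypothetical, not part of Flink, and it assumes (based on the stack trace) that the 0.7.0 streaming `readTextFile` expects a plain local path. It covers local files only and does nothing for `hdfs://` paths.

```java
import java.net.URI;

public class LocalPathArg {

    // Hypothetical helper: strips a leading "file:" scheme so that
    // "file:///a/b.txt" becomes "/a/b.txt". Any other argument is returned unchanged.
    public static String toLocalPath(String arg) {
        if (arg.startsWith("file:")) {
            // URI.create throws IllegalArgumentException for malformed input,
            // for example a path that contains unescaped spaces.
            return URI.create(arg).getPath();
        }
        return arg;
    }

    public static void main(String[] args) {
        System.out.println(toLocalPath("file:///Users/X/Y/Z/theFile.txt"));
        System.out.println(toLocalPath("/Users/X/Y/Z/theFile.txt"));
    }
}
```

In the program from the first message, the result of `toLocalPath(args[0])` would then be passed to `env.readTextFile(...)` in place of `args[0]`.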
Dear Gyula,

Thank you very much for your suggestion; it does get the program past that error.

Now I run into a deserialization error, and I am unsure of its cause. Is it possible in Flink 0.7.0-incubating to parse an input JSON file that contains heterogeneous types of records (e.g. corresponding to events with different field structures)?

I am pasting the whole error trace below, as it may contain hints that help you suggest a workaround. This is the only output I receive after launching the program.

-------------------------------------
Error: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more
    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
    at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
    at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
    ... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:249)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
    ... 12 more
    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
    at org.apache.flink.client.program.Client.run(Client.java:325)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
    at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
    at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
-------------------------

Best regards,
Camelia

From: "Gyula Fóra" <[hidden email]>
|
Dear Camelia,

The error you got means that Flink cannot deserialize your user function when it tries to create the vertices of the processing graph. It could be that you are using a non-static inner class for parsing the JSON, which cannot be serialized. For all of your user functions, for example a FlatMapFunction, the object you pass must be serializable with default Java serialization.

(This is not a runtime error from parsing or sending the tuples; it happened while the job was being set up.)

Regards,
Gyula

On Fri, Nov 7, 2014 at 4:57 PM, Camelia-Elena Ciolac <[hidden email]> wrote:
|
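The point about non-static inner classes can be illustrated without Flink. The sketch below uses only the JDK, and all class and method names in it are hypothetical stand-ins rather than the classes from the program above. It shows the general Java behaviour Gyula describes: an inner class instance carries a reference to its enclosing instance, so default Java serialization fails when the enclosing class is not itself Serializable, while a static nested class serializes fine. Whether this is the actual cause of the ClassNotFoundException in the trace above is not established by this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a job's main class. Note that it does NOT implement Serializable.
public class InnerClassSerialization {

    private final String prefix = "event:";

    // Non-static inner class: it reads a field of the enclosing instance,
    // so every instance holds a reference to an InnerClassSerialization object.
    class InnerFunction implements Serializable {
        String apply(String value) {
            return prefix + value;
        }
    }

    // Static nested class: no reference to an enclosing instance.
    static class NestedFunction implements Serializable {
        String apply(String value) {
            return "event:" + value;
        }
    }

    // Returns true if the object can be written with default Java serialization.
    public static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static boolean innerIsSerializable() {
        return isJavaSerializable(new InnerClassSerialization().new InnerFunction());
    }

    public static boolean nestedIsSerializable() {
        return isJavaSerializable(new NestedFunction());
    }

    public static void main(String[] args) {
        System.out.println("inner class serializable:  " + innerIsSerializable());
        System.out.println("nested class serializable: " + nestedIsSerializable());
    }
}
```

Under this reading, declaring the user function as a static nested class (or a top-level class) that implements Serializable would be the change to try first.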