Dear Gyula,Thank You very much for your idea that indeed makes the program surpass that error.Now I run into a deserialization error and I have some doubts of its cause.Is it possible in Flink 0.7.0-incubating to parse an input JSON file containing heterogeneous types of records (e.g. corresponding to events having different structures of fields) ?I copy&paste below the whole error trace as it may contain some hints that maybe can help You suggest me a workaround, please. This is the only output that I receive after launching the program in execution.-------------------------------------Error: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:249)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
... 12 more
at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.Exception: Failed to deploy the task flatMap-2 (1/8) - execution #0 to slot SubSlot 1 (ee5b634754a028c12a321648f48e4886 (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.ReadJSONDirectly$SelectFieldsFlatMap
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:249)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:604)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1591)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1496)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
... 12 more
at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
at org.apache.flink.client.program.Client.run(Client.java:325)
at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:62)
at org.apache.flink.streaming.util.ClusterUtil.runOnMiniCluster(ClusterUtil.java:80)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:42)
at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
at org.apache.flink.client.program.Client.run(Client.java:244)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)-------------------------Best regards,CameliaDe: "Gyula Fóra" <[hidden email]>
À: [hidden email]
Envoyé: Vendredi 7 Novembre 2014 14:06:47
Objet: Re: JSON file not found - StreamExecutionEnvironmentHello,Please try running the same job, but for the file path drop the file:// so just "/Users/X/Y/Z/theFile.txt"I think this will fix your problem, however we need to fix this in the api.Regards,GyulaOn Fri, Nov 7, 2014 at 1:54 PM, Camelia-Elena Ciolac <[hidden email]> wrote:Hello,I wrote a small program to test the JSON parsing capability with the new streaming API of Flink 0.7.0-incubating, but I ran into a "file not found" exception.As a context for my question:StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // neither with StreamExecutionEnvironment.getExecutionEnvironment() it doesn't work
DataStream<Tuple4<String,Integer,Integer,Long>> ds1 = env.readTextFile(args[0]). flatMap (....);At runtime I pass the arguments as follows:flink run --jarfile ./quickstart/target/quickstart-0.1.jar --class org.apache.flink.ReadJSONDirectly --arguments file:///Users/X/Y/Z/theFile.txt file:///Users/X/Y/Z/outputFile.txt -vand even if the file exists in the path, I still get the error stack:Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
at org.apache.flink.client.program.Client.run(Client.java:244)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: java.lang.IllegalArgumentException: File not found: file:///Users/X/Y/Z/theFile.txt
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.checkIfFileExists(StreamExecutionEnvironment.java:196)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readTextFile(StreamExecutionEnvironment.java:164)
at org.apache.flink.ReadJSONDirectly.main(ReadJSONDirectly.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
... 6 moreThe same thing happens if I put the file in HDFS and pass as argument the hdfs:///pathToFile/theFile.txtWhat could be the cause, in your opinion?Thank you in advance!Camelia
Free forum by Nabble | Edit this page |