Hi! I'm new to this list, and also new to Flink, so sorry if my question is too basic.

I've created a simple Flink streaming topology which reads its input from Kafka, applies some transformations to it, and sends the results to a sink, which then stores the stream of documents in Elasticsearch:

    DataStreamSink<ElaIndexCommand> dataStream = env
        .addSource(new ItemsKafkaSource(), 1)
        .flatMap(new TransformItemsStream())
        .addSink(new ElasticSearchSink());

It runs wonderfully in a local environment, but I have some trouble deploying it to a remote execution environment. For the remote environment, I'm running this version of Flink on YARN on top of Hadoop 2.3:

http://xenia.sote.hu/ftp/mirrors/www.apache.org/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz

"yarn version" gives the following output:

    Hadoop 2.3.0-cdh5.1.2
    Subversion git://github.sf.cloudera.com/CDH/cdh.git -r 251e630be743d5abaec6ba62cdc5077d229e017f
    Compiled by jenkins on 2014-08-26T01:36Z
    Compiled with protoc 2.5.0
    From source with checksum ec11b8ec19ca2bf3e7cb1bbe4ee182
    This command was run using /usr/lib/hadoop/hadoop-common-2.3.0-cdh5.1.2.jar

I've created a fat jar, but when I try to submit it, I get the following error:

    Error: The program execution failed: java.lang.Exception: Failed to deploy the task source-1 (1/1) - execution #0 to slot SubSlot 0 (b6ad89147571b7a3817b0d726f5c584e (0) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
        at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
        at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
        at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
    Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: com.gravityrd.flink.ItemsKafkaSource
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
        at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
        ... 10 more
    Caused by: java.lang.ClassNotFoundException: com.gravityrd.flink.ItemsKafkaSource
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

The classes are in the jar file: when I run "./bin/flink run example-toplogy-flink.jar" with -verbose:class, I see them loaded locally.

I then tried to submit the example topologies:

1. I successfully submitted the normal wordcount example.

2. Streaming wordcount gives me this error: java.lang.NoClassDefFoundError: org/apache/flink/examples/java/wordcount/util/WordCountData (but I think it's "normal", because that class is missing from the jar).

3. TwitterStream gives this error:

    Executing TwitterStream example with built-in default data.
    Provide parameters to read input data from a file.
    USAGE: TwitterStream <pathToPropertiesFile>
    Error: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;
    java.lang.NoSuchMethodError: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;

4. IterateExample gives the following error (similar to my original problem):

    Error: The program execution failed: java.lang.Exception: Failed to deploy the task iterationSource-3 (2/2) - execution #0 to slot SubSlot 0 (eddf0569c5374e0975551230c08f0efe (1) - ALLOCATED/ALIVE): java.lang.RuntimeException: Cannot deserialize invokable object
        at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:193)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.initialize(StreamVertex.java:63)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:53)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:175)
        at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:418)
        at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
    Caused by: org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.iteration.IterateExample$Step
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
        at org.apache.flink.streaming.api.StreamConfig.getFunction(StreamConfig.java:191)
        ... 10 more
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.iteration.IterateExample$Step
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)

Can anybody point me in the right direction about what I am doing wrong, or how I should debug it?

Thanks,
Attila Axt
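One way to double-check that a class really is in the fat jar, independently of whatever else is on the local client's classpath, is to resolve it from the jar alone. The sketch below is plain Java, not Flink API, and the helper name JarClassCheck is hypothetical: it opens the jar in a fresh URLClassLoader whose parent is the bootstrap loader (so nothing on the local application classpath can mask a missing class) and asks it for the class by name.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JarClassCheck {

    /**
     * Hypothetical debugging helper: returns true if className can be loaded
     * using ONLY the given jar plus the JDK's bootstrap classes.
     */
    static boolean resolvableFromJar(Path jar, String className) {
        try {
            URL[] urls = { jar.toUri().toURL() };
            // Parent = null: delegate only to the bootstrap loader, so the
            // current application classpath cannot satisfy the lookup.
            try (URLClassLoader loader = new URLClassLoader(urls, null)) {
                Class.forName(className, false, loader); // load without initializing
                return true;
            }
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers e.g. NoClassDefFoundError for a missing dependency.
            return false;
        } catch (IOException e) {
            // From toURL() or close(); treat as "not resolvable".
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck <path/to.jar> <fully.qualified.ClassName>");
            System.exit(1);
        }
        boolean ok = resolvableFromJar(Paths.get(args[0]), args[1]);
        System.out.println(args[1] + (ok ? " resolves from " : " does NOT resolve from ") + args[0]);
    }
}
```

For example, `java JarClassCheck example-toplogy-flink.jar com.gravityrd.flink.ItemsKafkaSource`. If the class resolves, the jar itself is probably fine, which would point the investigation at the receiving side (which class loader the TaskManager uses when deserializing the function) rather than at the packaging.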
Hey Attila,

Thanks for trying out streaming! As for your issues:

* Your simple topology: make sure that your user-defined functions (e.g. ItemsKafkaSource) and the classes containing them are serializable, so that they can be shipped to the cluster. For anything more specific, if you send your code over to me, I am more than happy to have a look at it.
* Streaming wordcount not working without a provided dataset: yes, thanks for reporting it; that is my fault. It was already known, but now I also have a JIRA [1] for it. :) However, if you give it some input, it should work.
* By the way, can you run the Twitter example locally?

Cheers,
Marton

On Mon, Nov 3, 2014 at 2:53 PM, axt <[hidden email]> wrote:
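The point that the functions *and the classes containing them* must be serializable is easy to trip over with inner classes: a non-static inner class that uses its enclosing instance keeps a hidden reference to it, so serializing the function also tries to serialize the outer object. The plain-Java sketch below (all class names are hypothetical, and no Flink API is used) round-trips objects through ObjectOutputStream to show the difference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // Hypothetical non-serializable enclosing class, e.g. a job's main class.
    static class Outer {
        String prefix = "item:";

        // Non-static inner class: it reads 'prefix', so it holds a reference
        // to its Outer instance, which serialization then tries to write too.
        class InnerFunction implements Serializable {
            String apply(String s) { return prefix + s; }
        }
    }

    // Static nested class: no hidden reference to an enclosing instance.
    static class StaticFunction implements Serializable {
        private final String prefix = "item:";
        String apply(String s) { return prefix + s; }
    }

    /** Returns true if standard Java serialization of obj succeeds. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException extends IOException
            return false;
        }
    }

    public static void main(String[] args) {
        Object inner = new Outer().new InnerFunction();
        Object nested = new StaticFunction();
        System.out.println("inner class:  " + isSerializable(inner));  // false
        System.out.println("static class: " + isSerializable(nested)); // true
    }
}
```

A check like this only covers shipping the function; it says nothing about whether the receiving side can find the class, which turned out to be the real problem later in this thread.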
Hi Marton! Sorry for my late response.
My classes are serializable; that's not what is causing the error. I've managed to narrow the problem down to the official examples, without using any of my own code: I can run the wordcount example (with input supplied) and the Twitter example locally. Unfortunately, when I try to submit them to the Flink-on-YARN instance, in both cases I get ClassNotFoundException errors. Stack traces are here: http://pastebin.com/MMLaNTcv

I've tried it with two different Hadoop installs. I'm starting Flink on YARN with this command:

    HADOOP_HOME=/usr/lib/hadoop /opt/flink-yarn-0.7.0-incubating/bin/yarn-session.sh -n 2 -jm 1024 -tm 4096 -s 4

Could you tell me which versions you are using in your test environments? Do you have any specific configuration options? Any ideas on how I should debug this problem?

Just a note: the "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.groupBy([I)Lorg/apache/flink/streaming/api/datastream/GroupedDataStream;" errors have disappeared; they were caused by an inconsistent build.

Greetings,
Attila

On 2014-11-03 16:57, Márton Balassi wrote:
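A NoSuchMethodError like the groupBy one generally means code was compiled against one version of a class and a different version was found at runtime, which fits the "inconsistent build" explanation. A generic Java way to see which jar a class was actually loaded from is its CodeSource location. The sketch below is plain Java (the class name WhereFrom is hypothetical); classes from the JDK core have no CodeSource and are reported as bootstrap.

```java
import java.net.URL;
import java.security.CodeSource;

public class WhereFrom {

    /** Returns the jar or directory a class was loaded from, or "<bootstrap>" for JDK core classes. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "<bootstrap>";
        }
        URL location = src.getLocation();
        return location.toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names, e.g. a class suspected of coming from the wrong jar.
        for (String name : args) {
            Class<?> cls = Class.forName(name, false, WhereFrom.class.getClassLoader());
            System.out.println(name + " -> " + locationOf(cls));
        }
        System.out.println("java.lang.String -> " + locationOf(String.class));
    }
}
```

Run on the same classpath as the failing client, this shows whether, say, the streaming API classes come from the jar you expect or from an older build.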
Hey Attila,

Thanks for the detailed bug report. I've picked up the issue and fixed the build for the streaming example packages. [1] There is still an outstanding issue, most likely with class loading, when executing streaming code remotely. I've started working on it, and Robert was kind enough to give me some guidance. I'll get back to you as soon as it is fixed.

Marton

On Wed, Nov 5, 2014 at 5:08 PM, axt <[hidden email]> wrote:
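Both traces fail inside commons-lang3 SerializationUtils.deserialize on the TaskManager, and the suspicion here is class loading. A common way this happens (an assumption for illustration, not a confirmed diagnosis of this Flink bug) is that a plain ObjectInputStream resolves classes through a default class loader that cannot see the submitted user jar. The usual plain-Java remedy is an ObjectInputStream that resolves classes through an explicitly supplied ClassLoader. The sketch below shows that pattern; the name ClassLoaderObjectInputStream is used hypothetically here (Apache Commons IO ships a class of the same name built on a similar idea), and Step is a stand-in for a user function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** ObjectInputStream that looks up classes in a caller-supplied ClassLoader first. */
public class ClassLoaderObjectInputStream extends ObjectInputStream {

    private final ClassLoader userLoader;

    public ClassLoaderObjectInputStream(InputStream in, ClassLoader userLoader) throws IOException {
        super(in);
        this.userLoader = userLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // Resolve through the given loader (e.g. one that includes the user jar).
            return Class.forName(desc.getName(), false, userLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup, which also handles primitive types like "int".
            return super.resolveClass(desc);
        }
    }

    /** Serializes an object to bytes with standard Java serialization. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Deserializes bytes, resolving classes via the supplied loader. */
    static Object deserialize(byte[] data, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    // Hypothetical stand-in for a user function shipped inside a job jar.
    static class Step implements Serializable {
        final int increment;
        Step(int increment) { this.increment = increment; }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new Step(5));
        // On a worker this would be a loader for the submitted jar; here we
        // simply use the loader that loaded this class.
        ClassLoader loader = ClassLoaderObjectInputStream.class.getClassLoader();
        Step copy = (Step) deserialize(data, loader);
        System.out.println(copy.increment); // 5
    }
}
```

On a worker, the loader passed in would typically be a URLClassLoader over the job jar with the framework's own loader as parent. How the streaming commits mentioned in the next reply actually wire this up is not shown in this thread.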
Dear Attila,

Both issues you reported are fixed by the streaming commits just pushed to the current master. Could you please try running the jobs again?

Best,
Marton

On Mon, Nov 10, 2014 at 4:51 PM, Márton Balassi <[hidden email]> wrote: