Hi to all,
I tried to run my job on a brand-new Flink cluster (0.9-SNAPSHOT) from the web client, using the shading strategy of the quickstart example, but I get this exception:

Caused by: java.lang.RuntimeException: Could not create ClassReader: java.io.IOException: Class not found
	at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:42)
	at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:67)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:54)

It seems that it cannot find some Kryo class. How do I fix this? This is the shade plugin section of my pom.xml:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>1.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <excludes>
            <!-- This list contains all dependencies of flink-dist.
                 Everything else will be packaged into the fat-jar. -->
            <exclude>org.apache.flink:flink-shaded-*</exclude>
            <exclude>org.apache.flink:flink-core</exclude>
            <exclude>org.apache.flink:flink-java</exclude>
            <exclude>org.apache.flink:flink-scala</exclude>
            <exclude>org.apache.flink:flink-runtime</exclude>
            <exclude>org.apache.flink:flink-optimizer</exclude>
            <exclude>org.apache.flink:flink-clients</exclude>
            <exclude>org.apache.flink:flink-spargel</exclude>
            <exclude>org.apache.flink:flink-avro</exclude>
            <exclude>org.apache.flink:flink-java-examples</exclude>
            <exclude>org.apache.flink:flink-scala-examples</exclude>
            <exclude>org.apache.flink:flink-streaming-examples</exclude>
            <exclude>org.apache.flink:flink-streaming-core</exclude>
            <!-- Also exclude very big transitive dependencies of Flink.
                 WARNING: You have to remove these excludes if your code
                 relies on other versions of these dependencies. -->
            <exclude>org.scala-lang:scala-library</exclude>
            <exclude>org.scala-lang:scala-compiler</exclude>
            <exclude>org.scala-lang:scala-reflect</exclude>
            <exclude>com.amazonaws:aws-java-sdk</exclude>
            <exclude>com.typesafe.akka:akka-actor_*</exclude>
            <exclude>com.typesafe.akka:akka-remote_*</exclude>
            <exclude>com.typesafe.akka:akka-slf4j_*</exclude>
            <exclude>io.netty:netty-all</exclude>
            <exclude>io.netty:netty</exclude>
            <exclude>org.eclipse.jetty:jetty-server</exclude>
            <exclude>org.eclipse.jetty:jetty-continuation</exclude>
            <exclude>org.eclipse.jetty:jetty-http</exclude>
            <exclude>org.eclipse.jetty:jetty-io</exclude>
            <exclude>org.eclipse.jetty:jetty-util</exclude>
            <exclude>org.eclipse.jetty:jetty-security</exclude>
            <exclude>org.eclipse.jetty:jetty-servlet</exclude>
            <exclude>commons-fileupload:commons-fileupload</exclude>
            <exclude>org.apache.avro:avro</exclude>
            <exclude>commons-collections:commons-collections</exclude>
            <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
            <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
            <exclude>com.thoughtworks.paranamer:paranamer</exclude>
            <exclude>org.xerial.snappy:snappy-java</exclude>
            <exclude>org.apache.commons:commons-compress</exclude>
            <exclude>org.tukaani:xz</exclude>
            <exclude>com.esotericsoftware.kryo:kryo</exclude>
            <exclude>com.esotericsoftware.minlog:minlog</exclude>
            <exclude>org.objenesis:objenesis</exclude>
            <exclude>com.twitter:chill_*</exclude>
            <exclude>com.twitter:chill-java</exclude>
            <exclude>com.twitter:chill-avro_*</exclude>
            <exclude>com.twitter:chill-bijection_*</exclude>
            <exclude>com.twitter:bijection-core_*</exclude>
            <exclude>com.twitter:bijection-avro_*</exclude>
            <exclude>com.twitter:chill-protobuf</exclude>
            <exclude>com.google.protobuf:protobuf-java</exclude>
            <exclude>com.twitter:chill-thrift</exclude>
            <exclude>org.apache.thrift:libthrift</exclude>
            <exclude>commons-lang:commons-lang</exclude>
            <exclude>junit:junit</exclude>
            <exclude>de.javakaffee:kryo-serializers</exclude>
            <exclude>joda-time:joda-time</exclude>
            <exclude>org.apache.commons:commons-lang3</exclude>
            <exclude>org.slf4j:slf4j-api</exclude>
            <exclude>org.slf4j:slf4j-log4j12</exclude>
            <exclude>log4j:log4j</exclude>
            <exclude>org.apache.commons:commons-math</exclude>
            <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
            <exclude>commons-logging:commons-logging</exclude>
            <exclude>org.apache.httpcomponents:httpclient</exclude>
            <exclude>org.apache.httpcomponents:httpcore</exclude>
            <exclude>commons-codec:commons-codec</exclude>
            <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
            <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
            <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
            <exclude>org.codehaus.jettison:jettison</exclude>
            <exclude>stax:stax-api</exclude>
            <exclude>com.typesafe:config</exclude>
            <exclude>org.uncommons.maths:uncommons-maths</exclude>
            <exclude>com.github.scopt:scopt_*</exclude>
            <exclude>org.mortbay.jetty:servlet-api</exclude>
            <exclude>commons-io:commons-io</exclude>
            <exclude>commons-cli:commons-cli</exclude>
          </excludes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>org.apache.flink:*</artifact>
            <excludes>
              <exclude>org/apache/flink/shaded/**</exclude>
              <exclude>web-docs/**</exclude>
            </excludes>
          </filter>
        </filters>
        <createDependencyReducedPom>false</createDependencyReducedPom>
        <finalName>XXXX</finalName>
        <transformers>
          <!-- add Main-Class to manifest file -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <manifestEntries>
              <Main-Class>XXX</Main-Class>
            </manifestEntries>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
Any insight about this?
On Tue, May 19, 2015 at 12:49 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Flavio,

It would be helpful if we knew which class could not be found. In the ClosureCleaner, can you change line 42 to include the class name in the error message? Like in this example:

private static ClassReader getClassReader(Class<?> cls) {

Could it be that you're running an old job on the latest snapshot version? That could cause class-related problems.

Cheers,
Max

On Wed, May 20, 2015 at 9:41 AM, Flavio Pompermaier <[hidden email]> wrote:
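For readers following along, here is a minimal standalone sketch of the kind of change Max suggests. The class and method names (`ClassReaderCheck`, `getClassStream`) are hypothetical, not Flink's actual ClosureCleaner code; the point is only that the error message should name the class that could not be found:

```java
import java.io.InputStream;

// Hypothetical sketch, not Flink source: resolve the .class resource the
// same way a ClassReader would be fed, and include the class name in the
// error message so a "Class not found" failure is diagnosable.
public class ClassReaderCheck {

    static InputStream getClassStream(Class<?> cls) {
        // Class files live on the classpath under their slash-separated name.
        String resource = cls.getName().replace('.', '/') + ".class";
        // Bootstrap classes report a null class loader; fall back to the system loader.
        ClassLoader loader = cls.getClassLoader() != null
                ? cls.getClassLoader()
                : ClassLoader.getSystemClassLoader();
        InputStream in = loader.getResourceAsStream(resource);
        if (in == null) {
            throw new RuntimeException(
                "Could not create ClassReader: class not found: " + cls.getName());
        }
        return in;
    }

    public static void main(String[] args) {
        // Our own class file must be resolvable on the classpath.
        System.out.println(getClassStream(ClassReaderCheck.class) != null);
    }
}
```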
Yes, it could be that the jar classes and those on the cluster have been out of sync for some days. Now I'll recompile both sides, and if I still get the error I will change line 42 as you suggested.
Thanks Max

On Wed, May 20, 2015 at 10:53 AM, Maximilian Michels <[hidden email]> wrote:
Now I'm able to run the job, but I get another exception. This time it seems that Flink is not able to read the input split of my Parquet file:
Caused by: java.lang.ClassNotFoundException: parquet.hadoop.ParquetInputSplit
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:190)
	at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:104)

I checked the jar and that class is present in my "fat" jar. What should I do now?

On Wed, May 20, 2015 at 10:57 AM, Flavio Pompermaier <[hidden email]> wrote:
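As an aside for readers of the thread: the "is the class really in the fat jar?" check can also be done programmatically. The helper below (`JarCheck`, written for this thread, not Flink code) lists a jar's entries; its main method builds a throwaway jar in a temp directory purely to demonstrate the check:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

// Illustrative helper, not part of Flink: verify that a fully-qualified
// class name has a corresponding .class entry inside a jar file.
public class JarCheck {

    static boolean containsClass(String jarPath, String className) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            // Jar entries use slash-separated paths ending in ".class".
            return jar.getJarEntry(className.replace('.', '/') + ".class") != null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Build a tiny throwaway jar containing an (empty) entry for the
        // class from the stack trace, purely to exercise the check.
        File tmp = File.createTempFile("fatjar-check", ".jar");
        tmp.deleteOnExit();
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(tmp))) {
            out.putNextEntry(new JarEntry("parquet/hadoop/ParquetInputSplit.class"));
            out.closeEntry();
        }
        System.out.println(containsClass(tmp.getPath(), "parquet.hadoop.ParquetInputSplit"));
        System.out.println(containsClass(tmp.getPath(), "parquet.hadoop.Missing"));
    }
}
```

Note that a positive result here (as Flavio found) only shows the class is packaged; it says nothing about which class loader tries to resolve it at runtime, which turns out to be the actual problem below.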
This is a bug in the HadoopInputSplit. It does not follow the general class loading rules in Flink. I think it is pretty straightforward to fix, I'll give it a quick shot... Can you send me the entire stack trace (where the serialization call comes from) to verify this? On Wed, May 20, 2015 at 12:03 PM, Flavio Pompermaier <[hidden email]> wrote:
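The class loading issue Stephan describes can be illustrated with a small standalone sketch. This is not Flink's code, just the general JVM pattern: one-argument Class.forName resolves against the caller's defining class loader, which only sees the system classpath, while the three-argument form lets you resolve through an explicitly chosen loader, such as the context class loader that frameworks point at the user-code (fat jar) loader:

```java
// Standalone illustration (not Flink source) of why resolving a class
// against a fixed loader can miss user-jar classes: the loader you
// resolve against determines which classes are visible.
public class LoaderDemo {
    public static void main(String[] args) throws Exception {
        // One-arg Class.forName uses the caller's defining class loader.
        Class<?> viaCaller = Class.forName("java.lang.String");

        // The three-arg form lets the caller pick the loader explicitly --
        // the pattern needed when user code lives in a separate loader.
        ClassLoader ctx = Thread.currentThread().getContextClassLoader();
        Class<?> viaContext = Class.forName("java.lang.String", false, ctx);

        // For a core class both paths resolve to the same Class object; for a
        // class that exists only in the user-code loader, the first call would
        // throw ClassNotFoundException while the second would succeed.
        System.out.println(viaCaller == viaContext);
    }
}
```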
Here it is:
java.lang.RuntimeException: Requesting the next InputSplit failed.
	at org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:89)
	at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:340)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to create InputSplit
	at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:108)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
	at org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:83)
	... 4 more
Caused by: java.lang.ClassNotFoundException: parquet.hadoop.ParquetInputSplit
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:190)
	at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:104)
	... 15 more

On Wed, May 20, 2015 at 1:45 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

I pushed a fix to the master that should solve this. It probably needs a bit until the snapshot repositories are synced. Let me know if it fixed your issue!

Greetings,
Stephan

On Wed, May 20, 2015 at 1:48 PM, Flavio Pompermaier <[hidden email]> wrote:
Thank you Stephan! I'll let you know tomorrow!

On May 20, 2015 7:30 PM, "Stephan Ewen" <[hidden email]> wrote:
Now I'm able to run my job but after a while I get this other exception:
09:43:49,383 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state FINISHED to JobManager for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:490) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTuplesFromThriftParquet(MyParquetThriftClass.java:94)) (a216cedd838190aebf3849fffe7fe576)
09:45:50,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution c088f1c46c6e823cd9cc90f0e679696c
ERROR org.apache.flink.runtime.io.network.partition.ResultPartition - Error during release of result subpartition: Closing of asynchronous file channel was interrupted.
java.io.IOException: Closing of asynchronous file channel was interrupted.
	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:130)
	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:158)
	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.release(SpillableSubpartition.java:130)
	at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:288)
	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:91)
	at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:329)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:648)
	at java.lang.Thread.run(Thread.java:745)

Any ideas?

On Wed, May 20, 2015 at 9:11 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi!

Interruptions usually happen as part of cancelling. Has the job failed for some other reason (and that exception is only a follow-up)? Or is this the root cause of the failure?

Stephan

On Thu, May 21, 2015 at 9:55 AM, Flavio Pompermaier <[hidden email]> wrote:
Could this be the main failure reason?
09:45:58,650 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@192.168.234.83:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
09:45:58,831 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@192.168.234.83:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.
09:45:58,889 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Disconnecting from JobManager: JobManager is no longer reachable
09:45:58,893 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.

On Thu, May 21, 2015 at 9:57 AM, Stephan Ewen <[hidden email]> wrote:
This looks suspicious, but it should actually also be a consequence of a failure or disconnect between the TaskManager and the JobManager. Can you send us the whole log so we can have a closer look?

Thanks,
Stephan

On Thu, May 21, 2015 at 10:59 AM, Flavio Pompermaier <[hidden email]> wrote: