Hello,
I need some guidance on how to report a bug.
I’m testing version 1.2 on my local cluster. The first time I submit the job everything works, but whenever I re-submit the same job it fails with

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
    at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
    at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
    at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
    at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
    at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
    at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
    ... 14 more

I'm running a Flink cluster built from the "release-1.2" branch on GitHub.
How can I validate that this is a Flink bug?
Where can I report this?
What sort of information do I need to provide?
Cheers,
|
Hi Giuliano,
thanks for bringing up this issue. So it might actually be a bug in Flink.
It would be great if you could provide a short example program and instructions on how to reproduce the problem.
[1] https://issues.apache.org/jira/browse/FLINK
2017-01-11 1:22 GMT+01:00 Giuliano Caliari <[hidden email]>:
|
Hi,
I'd like to chime in since I've faced the same issue running Flink 1.1.4. I have a long-running YARN session which I use to run multiple streaming jobs concurrently. Once, after cancelling and resubmitting a job, I saw the "X cannot be cast to X" ClassCastException in the logs. I restarted the YARN session, and the problem disappeared.
The class that failed to be cast was autogenerated by the Avro compiler. I know that Avro's Java binding caches schemas in some static WeakHashMap. I'm wondering whether that may get in the way of Flink's classloading design.
Anyway, I would be interested in watching the issue in Flink JIRA. Giuliano, could you provide the issue number?
Thanks,
Yury
2017-01-11 14:11 GMT+03:00 Fabian Hueske <[hidden email]>:
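One way to check a suspicion like this before filing an issue is to log which classloader defined the class on each side of the failing cast. The following is a minimal JDK-only sketch; `ClassLoaderDiagnostics` and its method names are made up for illustration and are not part of Flink or Avro.

```scala
object ClassLoaderDiagnostics {

  /** Renders a class name together with the loader that defined it. */
  def describe(cls: Class[_]): String = {
    val loader = cls.getClassLoader
    val loaderDesc =
      if (loader == null) "bootstrap" // JDK core classes report a null loader
      else s"${loader.getClass.getName}@${Integer.toHexString(System.identityHashCode(loader))}"
    s"${cls.getName} <- $loaderDesc"
  }

  /** True only if the object's runtime class is the very same Class object as `expected`. */
  def sameClass(obj: AnyRef, expected: Class[_]): Boolean =
    obj.getClass eq expected

  /** True if the names match but the Class objects differ: the "X cannot be cast to X" case. */
  def sameNameDifferentLoader(obj: AnyRef, expected: Class[_]): Boolean =
    obj.getClass.getName == expected.getName && !(obj.getClass eq expected)
}
```

Called right before the failing cast with the offending record and the `classOf[...]` of the expected type, a `true` from `sameNameDifferentLoader` would suggest two incarnations of the same class, which points to classloading rather than to bad data.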
|
@Giuliano: any updates? Very curious to figure out what's causing this.
As Fabian said, this is most likely a class loading issue. Judging from the stack trace, you are not running on YARN but on a standalone cluster. Is that correct?
Nothing changed between Flink 1.1 and Flink 1.2 with respect to class loading on standalone clusters. Did you put any JARs into the lib folder of Flink before submitting the job?
– Ufuk

On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin <[hidden email]> wrote:
> 2017-01-11 14:11 GMT+03:00 Fabian Hueske <[hidden email]>:
>> Hi Giuliano,
>>
>> thanks for bringing up this issue.
>> A "ClassCastException: X cannot be cast to X" often points to a classloader issue.
>> So it might actually be a bug in Flink.
>>
>> I assume you submit the same application (same jar file) with the same command, right?
>> Did you cancel the job before resubmitting?
>>
>> Can you create a JIRA issue [1] for this bug (hit the red CREATE button on top) and include the commit hash from which you built Flink?
>> It would be great if you could provide a short example program and instructions on how to reproduce the problem.
>>
>> Thank you very much,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK
|
Hi!
I think Yury pointed out the correct diagnosis. Caching the classes across multiple jobs in the same session can cause these types of issues.
For YARN single-job deployments, Flink 1.2 will not do any dynamic classloading any more, but start with everything in the application classpath. For YARN sessions, Flink 1.2 still uses dynamic loading, to re-use hot containers.
Best,
Stephan
On Mon, Jan 16, 2017 at 11:07 AM, Ufuk Celebi <[hidden email]> wrote:
|
For my case I tracked down the culprit. It's been Avro indeed. I'm providing details below, since I believe the pattern is pretty common for such issues.

In a YARN setup there are several sources where classes are loaded from: the Flink lib directory, the YARN lib directories, and user code. The first two sources are handled by the system classloader, the last one is loaded by FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility. In essence, the job looks like this: Source -> Avro parser (Map) -> Sink. Parallelism is 1. The job operates inside a long-lived YARN session. I have a subclass of SpecificRecord, say its name is MySpecificRecord. From a class loading perspective, Avro library classes, including SpecificRecord, are loaded by the system classloader from the YARN lib dir; such classes are shared across different Flink tasks within a task manager. On the other side, MySpecificRecord is in the job fat jar, so it gets loaded by FlinkUserCodeClassLoader. Upon every job restart, the task gets a new FlinkUserCodeClassLoader instance, so classes from user code are confined to a task instance.

Simply put, the parsing itself looks like this:

    val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and SpecificData get loaded by the system classloader. A new FlinkUserCodeClassLoader is instantiated, let's denote its instance as "A". MySpecificRecord then gets loaded by A.
2. SpecificData gets a singleton SpecificData.INSTANCE that holds a cache that maps some string key derived from the Avro schema to the implementing class. So during parsing I get MySpecificRecord (A) cached there.
3. I stop the job and re-submit it. The JVM process is the same, so all standard Avro classes, including SpecificData, remain loaded. A new task instance is created and gets a new FlinkUserCodeClassLoader instance, let's name it "B". A new MySpecificRecord class incarnation is loaded by B. From the JVM's standpoint MySpecificRecord (B) is different from MySpecificRecord (A), even though their bytecode is identical.
4. The job starts parsing again. SpecificDatumReader consults SpecificData.INSTANCE's cache for any stashed classes and finds MySpecificRecord (A) there.
5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a bean to fill the parsed data into.
6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back to the job.
7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
8. ClassCastException :^(

I fixed the issue by not using the SpecificData.INSTANCE singleton (even though this is considered a common and expected practice). I feed every parser a new instance of SpecificData. This way the class cache is confined to a parser instance and gets recycled along with it.

Hope this helps,
Yury
2017-01-16 14:03 GMT+03:00 Stephan Ewen <[hidden email]>:
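The workaround described above (one SpecificData per parser instead of SpecificData.INSTANCE) might look roughly like the sketch below. It assumes an Avro version in which `SpecificData` has a public constructor taking a `ClassLoader` and `SpecificDatumReader` has a constructor taking a writer schema, a reader schema and a `SpecificData`. `PerParserSpecificData` and `newReader` are hypothetical names, and this is not necessarily the exact code used in the job.

```scala
import org.apache.avro.Schema
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecord}

object PerParserSpecificData {

  /**
   * Builds a reader whose SpecificData, and therefore its class cache, is private
   * to this reader rather than shared through the JVM-wide SpecificData.INSTANCE.
   */
  def newReader[T <: SpecificRecord](recordClass: Class[T]): SpecificDatumReader[T] = {
    // Resolve generated classes through the loader that defined recordClass,
    // i.e. the user-code classloader of the current job submission.
    val data = new SpecificData(recordClass.getClassLoader)
    // Look up the schema of the generated class.
    val schema: Schema = data.getSchema(recordClass)
    // Writer and reader schema are the same here; the third argument replaces the singleton.
    new SpecificDatumReader[T](schema, schema, data)
  }
}
```

A reader would then be created once per parser instance, for example `PerParserSpecificData.newReader(classOf[MySpecificRecord])`, so that the cache is dropped together with the task and its classloader when the job is cancelled.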
|
Hello,
Yury's description of the issue is spot on. We are running our cluster on YARN and using Avro for serialization, exactly as described.
@Ufuk, I'm running my cluster on YARN, 4 Task Managers with 2 slots each, but this particular job has parallelism 1.
@Yury, I'll test your fix as soon as I can and report back.
@Fabian, do you still want me to open the issue?
Cheers,
|
Hi Giuliano,
I think it would be good to document this behavior, though I'm not sure what the best place would be. It would be nice if you could open a JIRA and describe the issue there (basically copy Yury's analysis).
Thank you,
Fabian
2017-01-19 8:35 GMT+01:00 Giuliano Caliari <[hidden email]>:
|
Thank you Giuliano! 2017-01-25 6:54 GMT+01:00 Giuliano Caliari <[hidden email]>: Issue reported: |
Quick update: I've closed the issue after confirming that Yury's workaround fixed it for us.
|