ClassCastException when redeploying Flink job on running cluster

Josh
Hi all,

Currently I have to relaunch my Flink cluster every time I want to upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:

java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType

It's related to MyAvroType, which is a class generated from an Avro schema. The ClassCastException occurs every time I redeploy the job without killing the Flink cluster (even if there have been no changes to the job/jar).

I wrote my own AvroDeserializationSchema in Scala which does something a little strange to get the Avro type information (see below), and I'm wondering if that's causing the problem when the Flink job creates an AvroDeserializationSchema[MyAvroType].

Does anyone have any ideas?

Thanks,
Josh



class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends DeserializationSchema[T] {

  ...

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]] 

  private val typeInformation = TypeExtractor.getForClass(avroType)

  ...

  override def getProducedType: TypeInformation[T] = typeInformation

  ...

}

Re: ClassCastException when redeploying Flink job on running cluster

Till Rohrmann
Hi Josh,

The error message you've posted usually indicates a class loader issue. When you first run your program, the class com.me.avro.MyAvroType is loaded by the user code class loader. I suspect that this class is then cached somewhere (e.g. in the Avro serializer). When you run your program a second time, a new user code class loader loads the same class again, and you end up trying to cast an instance of the first class to the second class. However, these two classes are not identical, since they were loaded by different class loaders.

In order to find the culprit, it would be helpful to see the full stack trace of the ClassCastException and the code of the AvroDeserializationSchema. I suspect that something similar to https://issues.apache.org/jira/browse/FLINK-1390 is happening.

Cheers,
Till
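
To make the class loader point above concrete, here is a minimal, self-contained Scala sketch (not code from this thread). It does not use Flink or Avro: `Payload` and `IsolatingLoader` are hypothetical stand-ins for the generated Avro class and for a per-job user code class loader. It assumes the class file of `Payload` can be read as a resource from the loader that originally loaded it. The sketch loads the same class file through two separate loaders and checks whether the resulting classes are cast-compatible.

```scala
import java.io.ByteArrayOutputStream

object ClassLoaderIdentityDemo {
  // Hypothetical stand-in for a generated class such as com.me.avro.MyAvroType.
  class Payload

  // Hypothetical stand-in for a per-job user code class loader: it defines the
  // class itself from the class file bytes instead of delegating to `source`.
  final class IsolatingLoader(source: ClassLoader) extends ClassLoader(null: ClassLoader) {
    override protected def findClass(name: String): Class[_] = {
      val in = source.getResourceAsStream(name.replace('.', '/') + ".class")
      if (in == null) throw new ClassNotFoundException(name)
      try {
        val out = new ByteArrayOutputStream()
        val buffer = new Array[Byte](4096)
        var read = in.read(buffer)
        while (read != -1) {
          out.write(buffer, 0, read)
          read = in.read(buffer)
        }
        val bytes = out.toByteArray
        defineClass(name, bytes, 0, bytes.length)
      } finally in.close()
    }
  }

  final case class Result(sameName: Boolean, sameClass: Boolean, castFails: Boolean)

  def compare(): Result = {
    val name = classOf[Payload].getName
    val source = classOf[Payload].getClassLoader
    val first = new IsolatingLoader(source).loadClass(name)  // "first deployment"
    val second = new IsolatingLoader(source).loadClass(name) // "redeployment"

    // Create an instance of the first class and try to cast it to the second.
    val instance = first.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    val castFails =
      try { second.cast(instance); false }
      catch { case _: ClassCastException => true }

    Result(first.getName == second.getName, first eq second, castFails)
  }

  def main(args: Array[String]): Unit = println(compare())
}
```

Running `ClassLoaderIdentityDemo.main` should report a matching name, two distinct `Class` objects, and a failing cast, which is the same shape as "MyAvroType cannot be cast to MyAvroType".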

Re: ClassCastException when redeploying Flink job on running cluster

Josh
Hi Till,

Thanks for the reply! I see - yes it does sound very much like FLINK-1390.

Please see my AvroDeserializationSchema implementation here: http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:
val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings to classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this from happening though! Do you have any ideas?

Josh
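
As a rough illustration of why a cache keyed only by class name could cause this, here is a toy model in Scala. It is not Avro's actual code: `NameKeyedCache`, `JobLoader` and `Record` are hypothetical names, and the only assumption carried over from the thread is that the cache maps strings to classes and outlives a single deployment. The sketch compares one shared cache with one cache per deployment.

```scala
import java.io.ByteArrayOutputStream
import java.util.concurrent.ConcurrentHashMap

object NameKeyedCacheDemo {
  // Hypothetical stand-in for a generated Avro class.
  class Record

  // Hypothetical stand-in for the user code class loader created per deployment.
  final class JobLoader(source: ClassLoader) extends ClassLoader(null: ClassLoader) {
    override protected def findClass(name: String): Class[_] = {
      val in = source.getResourceAsStream(name.replace('.', '/') + ".class")
      if (in == null) throw new ClassNotFoundException(name)
      try {
        val out = new ByteArrayOutputStream()
        val buffer = new Array[Byte](4096)
        var read = in.read(buffer)
        while (read != -1) { out.write(buffer, 0, read); read = in.read(buffer) }
        defineClass(name, out.toByteArray, 0, out.size())
      } finally in.close()
    }
  }

  // Toy cache keyed by class name only; the loader is consulted only on a miss.
  final class NameKeyedCache {
    private val classes = new ConcurrentHashMap[String, Class[_]]()
    def resolve(name: String, loader: ClassLoader): Class[_] = {
      val cached = classes.get(name)
      if (cached != null) cached
      else {
        val loaded = loader.loadClass(name)
        classes.put(name, loaded)
        loaded
      }
    }
  }

  // True if the second deployment is handed a class defined by its own loader.
  def secondJobGetsOwnClass(sharedCache: Boolean): Boolean = {
    val name = classOf[Record].getName
    val source = classOf[Record].getClassLoader
    val global = new NameKeyedCache
    def cacheForJob(): NameKeyedCache = if (sharedCache) global else new NameKeyedCache

    val job1 = new JobLoader(source)
    cacheForJob().resolve(name, job1) // first deployment populates the cache

    val job2 = new JobLoader(source)
    val seen = cacheForJob().resolve(name, job2) // redeployment with a new loader
    seen.getClassLoader eq job2
  }
}
```

With `sharedCache = true` the second deployment gets the class defined by the first deployment's loader, so `secondJobGetsOwnClass` returns false; with a cache per deployment it returns true.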



Re: ClassCastException when redeploying Flink job on running cluster

Josh
Sorry - I forgot to include my stack trace too. Here it is:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
    at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
    at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not forward element to next operator
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType
    at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
    at org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    ... 3 more

Re: ClassCastException when redeploying Flink job on running cluster

Till Rohrmann
The only thing I could think of is to not use the SpecificData singleton but instead create a new SpecificData object for each SpecificDatumReader (you can pass it as the third argument to the constructor). This, of course, is not really efficient, but you could try it out to see whether it solves your problem.

Cheers,
Till

Re: ClassCastException when redeploying Flink job on running cluster

Josh
Thanks Till, your suggestion worked!

I actually just created a new SpecificData for each AvroDeserializationSchema instance, so I think it's still just as efficient.

Josh
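
For readers who want to see roughly what this could look like, here is a sketch of a schema that creates one SpecificData per AvroDeserializationSchema instance and passes it to the SpecificDatumReader. This is not the code from the pastebin link, and several parts are assumptions: the `DeserializationSchema` import path (it has lived in different packages across Flink versions), binary-encoded records, the same schema used for reading and writing, and the `@transient lazy val` layout. It needs Flink and Avro on the classpath.

```scala
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
// Assumption: package used by older Flink releases; adjust for your version.
import org.apache.flink.streaming.util.serialization.DeserializationSchema

import scala.reflect.{ClassTag, classTag}

class AvroDeserializationSchema[T <: SpecificRecordBase : ClassTag] extends DeserializationSchema[T] {

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  private val typeInformation = TypeExtractor.getForClass(avroType)

  // One SpecificData per schema instance instead of the SpecificData.get() singleton,
  // bound to the class loader that loaded this job's copy of the Avro class.
  // Marked transient and lazy so it is rebuilt after the schema is deserialized on a task.
  @transient private lazy val specificData = new SpecificData(avroType.getClassLoader)

  // The third constructor argument is the SpecificData the reader uses to resolve classes.
  @transient private lazy val reader: SpecificDatumReader[T] = {
    val schema = specificData.getSchema(avroType)
    new SpecificDatumReader[T](schema, schema, specificData)
  }

  // Reused between calls to avoid allocating a decoder per record.
  @transient private var decoder: BinaryDecoder = _

  override def deserialize(message: Array[Byte]): T = {
    decoder = DecoderFactory.get().binaryDecoder(message, decoder)
    reader.read(null.asInstanceOf[T], decoder)
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = typeInformation
}
```

Because the SpecificData and the reader are created once per schema instance rather than once per record, the extra cost compared with the singleton should be small.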

Re: ClassCastException when redeploying Flink job on running cluster

Till Rohrmann
Great to hear :-)

at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType
at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
at org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more

On Wed, Jun 8, 2016 at 3:19 PM, Josh <[hidden email]> wrote:
Hi Till,

Thanks for the reply! I see - yes it does sound very much like FLINK-1390.

Please see my AvroDeserializationSchema implementation here: http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:
val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings to classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this from happening though! Do you have any ideas?

Josh



On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <[hidden email]> wrote:
Hi Josh,

the error message you've posted usually indicates a class loader issue. When you first run your program, the class com.me.avro.MyAvroType is loaded by the user code class loader. I suspect that this class is then cached somewhere (e.g. in the Avro serializer). When you run your program a second time, a new user code class loader loads the same class again, and an instance of the first class ends up being cast to the second class. The cast fails because the two classes are not identical: they were loaded by different class loaders.
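
The effect described above can be reproduced without Flink or Avro. The following is a rough sketch under stated assumptions: Payload is a hypothetical stand-in for the generated Avro class, ChildFirstLoader is a hypothetical loader that only imitates a fresh user code class loader per submission, and the code is assumed to be compiled with scalac and run from a normal classpath (not a REPL) so that Payload.class can be read as a resource.

```scala
// Hypothetical stand-in for a generated class such as MyAvroType.
class Payload

// Defines `target` itself and delegates every other class to the parent,
// roughly imitating a new user code class loader for each job submission.
class ChildFirstLoader(parent: ClassLoader, target: String) extends ClassLoader(parent) {
  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name != target) super.loadClass(name, resolve)
    else getClassLoadingLock(name).synchronized {
      Option(findLoadedClass(name)).getOrElse {
        // Read the class file bytes through the parent and define the class again here.
        val in = parent.getResourceAsStream(name.replace('.', '/') + ".class")
        val bytes =
          try Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
          finally in.close()
        defineClass(name, bytes, 0, bytes.length)
      }
    }
}

object ClassLoaderDemo {
  def main(args: Array[String]): Unit = {
    val name = classOf[Payload].getName
    val parent = classOf[Payload].getClassLoader

    // Two "submissions", each with its own loader for the same class file.
    val first = new ChildFirstLoader(parent, name).loadClass(name)
    val second = new ChildFirstLoader(parent, name).loadClass(name)

    println(first.getName == second.getName) // the fully qualified names match
    println(first == second)                 // but the Class objects differ

    // An instance created from the first class cannot be cast to the second.
    val instance = first.getDeclaredConstructor().newInstance()
    try second.cast(instance)
    catch { case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}") }
  }
}
```

A cache keyed by class name that still holds the Class object from the first loader would hand out instances of the wrong class in the same way.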

In order to find the culprit, it would be helpful to see the full stack trace of the ClassCastException and the code of the AvroDeserializationSchema. I suspect that something similar to https://issues.apache.org/jira/browse/FLINK-1390 is happening.

Cheers,
Till

On Wed, Jun 8, 2016 at 10:38 AM, Josh <[hidden email]> wrote:
Hi all,

Currently I have to relaunch my Flink cluster every time I want to upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:

java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType

It's related to MyAvroType, which is a class generated from an Avro schema. The ClassCastException occurs every time I redeploy the job without killing the Flink cluster (even if there have been no changes to the job/jar).

I wrote my own AvroDeserializationSchema in Scala which does something a little strange to get the avro type information (see below), and I'm wondering if that's causing the problem when the Flink job creates an AvroDeserializationSchema[MyAvroType].

Does anyone have any ideas?

Thanks,
Josh



class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends DeserializationSchema[T] {

  ...

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]] 

  private val typeInformation = TypeExtractor.getForClass(avroType)

  ...

  override def getProducedType: TypeInformation[T] = typeInformation

  ...

}