GenericData cannot be cast to type scala.Product


GenericData cannot be cast to type scala.Product

Georg Heiler
Hi,

as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now face a class cast exception.

I do not understand (yet) why such a simple example of reading Avro from a Schema Registry and Kafka (using the Scala API) still causes problems.

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

Best,
Georg

Re: GenericData cannot be cast to type scala.Product

Aljoscha Krettek
For anyone following this: the discussion is happening on the Jira
issue: https://issues.apache.org/jira/browse/FLINK-18478

Best,
Aljoscha

On 23.07.20 15:32, Georg Heiler wrote:

> Hi,
>
> as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now
> face a class cast exception.
> The reproducible example is available at
> https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89
>
> I do not understand (yet) why such a simple example of reading Avro from a
> Schema Registry and Kafka (using the Scala API) still causes problems.
>
> [stack trace snipped; identical to the trace in the message above]
>
> Best,
> Georg
>
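Editor's note, for readers landing here from a search: the exception is a type mismatch, not an Avro bug. Flink's Scala API derived a `CaseClassSerializer` from the declared stream type, and that serializer casts every element to `scala.Product`; the deserialization schema, however, was actually emitting Avro `GenericData.Record` values, which do not implement `Product`. The sketch below is a minimal, plain-Scala illustration of that cast failure (no Flink or Avro dependencies; `Click` and the `HashMap` stand-in are hypothetical, chosen only to mimic the shapes involved):

```scala
// Why CaseClassSerializer.copy blows up: it assumes every element is a
// scala.Product (i.e. a case class), and casts unconditionally.
final case class Click(user: String, ts: Long) // hypothetical case class

object CastDemo {
  def main(args: Array[String]): Unit = {
    // A case class IS a Product, so the serializer's cast would succeed:
    val ok: Any = Click("georg", 1L)
    assert(ok.asInstanceOf[Product].productArity == 2)

    // Stand-in for GenericData.Record: any class that is not a Product.
    // This is what the Kafka deserializer was actually handing to Flink.
    val record: Any = new java.util.HashMap[String, Object]()
    val threw =
      try { record.asInstanceOf[Product]; false }
      catch { case _: ClassCastException => true }
    assert(threw, "expected ClassCastException, as in the trace above")
    println("cast mismatch reproduced")
  }
}
```

The fix direction (see FLINK-18478 for the authoritative resolution) is to make the declared stream type match what the deserializer really produces, e.g. consume as `GenericRecord` with `GenericRecordAvroTypeInfo`, or use a deserialization schema that builds the case class itself.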