Hi,

as a follow-up to https://issues.apache.org/jira/browse/FLINK-18478 I now face a class cast exception. The reproducible example is available at https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89

I do not understand (yet) why such a simple example of reading Avro from a Schema Registry and Kafka (in the Scala API) is still causing problems.

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

Best,
Georg

For anyone following this: the discussion is happening on the Jira issue: https://issues.apache.org/jira/browse/FLINK-18478

Best,
Aljoscha

On 23.07.20 15:32, Georg Heiler wrote:
> Hi,
>
> as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now
> face a class cast exception.
> The reproducible example is available at
> https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89
>
> I do not understand (yet) why such a simple example of reading Avro from a
> Schema Registry and Kafka (in the scala API) is still causing problems.
>
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
> cannot be cast to scala.Product
>
> Best,
> Georg