http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Reading-from-AVRO-files-tp35850p35853.html
Thanks Timo,
the stacktrace with the 1.9.2-generated specific class is the following:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
... 7 more
I reckon logical types might have been considered somewhat experimental since... ever. But honestly, I've been using them in the Kafka/Java ecosystem, as well as in Spark, without too many problems.
For my specific use case, the schema is a given. Messages are produced by a third party and we cannot change the schema (especially since it's a perfectly legitimate schema).
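For illustration only (this is not our actual schema, which I can't share): the kind of field that makes Avro 1.9 generate a java.time.Instant, and that matches the resolveUnion call in the trace above, is a timestamp-millis logical type inside a nullable union, something like:

```json
{
  "name": "eventTime",
  "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
  "default": null
}
```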
I am desperately looking for a workaround.
I had a similar issue with a Kafka source and AVRO records containing decimals and timestamps: timestamps worked, but decimals did not.
I was able to work around the problem using GenericRecords.
But Kafka source relies on AvroDeserializationSchema rather than AvroSerializer, and has no problem handling GenericRecords.
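For the Kafka case, the workaround looked roughly like this (a sketch only; the topic name, schema string, and properties are placeholders, not our real setup):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Parse the (fixed, third-party) writer schema.
Schema schema = new Schema.Parser().parse(schemaJsonString); // placeholder

// Deserialize into GenericRecord instead of the generated specific class,
// so no java.time / decimal conversions are involved on the Flink side.
FlinkKafkaConsumer<GenericRecord> consumer =
    new FlinkKafkaConsumer<>(
        "my-topic",                                   // placeholder topic
        AvroDeserializationSchema.forGeneric(schema), // generic, not forSpecific(...)
        kafkaProperties);                             // placeholder Properties
```

The downside, of course, is losing the typed API and doing field access by name on the GenericRecord downstream.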
Honestly, I find it very confusing to have different ways of handling AVRO deserialization inside Flink's core components.
Cheers
Lorenzo