@Override
public T deserialize(byte[] message) throws IOException {
    checkAvroInitialized();
    getInputStream().setBuffer(message);
    // Look up the writer schema for this particular message.
    Schema writerSchema = schemaCoder.readSchema(getInputStream());
    GenericDatumReader<T> datumReader = getDatumReader();
    datumReader.setSchema(writerSchema);
    // Use the writer schema as the reader schema too, instead of a fixed,
    // pre-configured reader schema.
    datumReader.setExpected(writerSchema); // <-- the difference
    return datumReader.read(null, getDecoder());
}
Hi Arvid,
Thanks a lot for your reply. And yes, we do use the Confluent schema registry
extensively. But `ConfluentRegistryAvroDeserializationSchema` expects the
reader schema to be provided; that means it reads each message using the
writer schema and then converts it to the reader schema. This is not always
what I want: if I have messages with different schemas in the same topic, I
cannot apply `ConfluentRegistryAvroDeserializationSchema`, correct? I also
came across this question
<https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without>
. I am doing the same thing in my pipeline, providing a custom deserializer
that uses the Confluent schema registry client. So, as far as I understood,
in this use case there is no way to tell Flink about the
`GenericRecordAvroTypeInfo` of the GenericRecord that comes out of the source
function. Please tell me if my understanding is correct.
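For context, the custom deserializer in my pipeline looks roughly like the sketch below. All class and parameter names here are illustrative, not from any Flink API; it assumes flink-core, flink-avro, and the Confluent kafka-schema-registry-client on the classpath. It reads the Confluent wire format, fetches the writer schema by id from the registry, and deserializes using the writer schema as the reader schema as well. The `getProducedType` implementation is exactly where the problem sits: `GenericRecordAvroTypeInfo` needs one fixed schema up front.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Illustrative class name, not a Flink-provided schema.
public class RegistryGenericRecordSchema implements DeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // Kept as a JSON string because org.apache.avro.Schema is not
    // serializable in older Avro versions.
    private final String producedSchemaJson;
    private transient SchemaRegistryClient client;

    public RegistryGenericRecordSchema(String registryUrl, String producedSchemaJson) {
        this.registryUrl = registryUrl;
        this.producedSchemaJson = producedSchemaJson;
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (client == null) {
            client = new CachedSchemaRegistryClient(registryUrl, 100);
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != 0) { // Confluent wire format starts with magic byte 0
            throw new IOException("Not a Confluent-framed Avro message");
        }
        int schemaId = buffer.getInt(); // 4-byte schema id follows the magic byte
        Schema writerSchema;
        try {
            writerSchema = client.getById(schemaId); // deprecated in newer clients
        } catch (Exception e) {
            throw new IOException("Could not fetch schema " + schemaId, e);
        }
        // Reader schema == writer schema, mirroring the snippet above.
        GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(writerSchema, writerSchema);
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(message, buffer.position(), buffer.remaining(), null);
        return datumReader.read(null, decoder);
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // The sticking point: GenericRecordAvroTypeInfo is pinned to a single
        // schema here, which does not fit a topic carrying several schemas.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(producedSchemaJson));
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }
}
```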
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany