Re: Help needed to increase throughput of simple flink app

Posted by Arvid Heise-3 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Help-needed-to-increase-throughput-of-simple-flink-app-tp39289p39417.html

If you follow best practices, a topic should never carry more than one schema, because you can then no longer enforce schema compatibility. Mixing schemas also leaves you with very limited processing capabilities and clumsy workflows.
If you want to encode different kinds of events, the common approach is to use an envelope schema in which each event type is encoded as an optional field.
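
To make the envelope idea concrete, here is a rough sketch in plain Java rather than Avro. The event types (OrderCreated, OrderCancelled) and the EventEnvelope class are made up purely for illustration; in an actual Avro schema each of the two fields would be a union of null and the respective record type, so the whole topic still has a single schema that can evolve compatibly.

```java
// Hypothetical event types, used only for illustration.
final class OrderCreated {
    final String orderId;

    OrderCreated(String orderId) {
        this.orderId = orderId;
    }
}

final class OrderCancelled {
    final String orderId;
    final String reason;

    OrderCancelled(String orderId, String reason) {
        this.orderId = orderId;
        this.reason = reason;
    }
}

// Envelope: one nullable field per event type.
// In Avro this would be a record whose fields are unions of null and the event record.
final class EventEnvelope {
    final OrderCreated orderCreated;     // null unless this is an OrderCreated event
    final OrderCancelled orderCancelled; // null unless this is an OrderCancelled event

    private EventEnvelope(OrderCreated created, OrderCancelled cancelled) {
        this.orderCreated = created;
        this.orderCancelled = cancelled;
    }

    static EventEnvelope of(OrderCreated event) {
        return new EventEnvelope(event, null);
    }

    static EventEnvelope of(OrderCancelled event) {
        return new EventEnvelope(null, event);
    }

    // Consumers branch on whichever field is populated.
    String describe() {
        if (orderCreated != null) {
            return "created:" + orderCreated.orderId;
        }
        if (orderCancelled != null) {
            return "cancelled:" + orderCancelled.orderId + ":" + orderCancelled.reason;
        }
        return "empty";
    }
}
```

A consumer then reads every message with the same envelope schema and dispatches on the populated field, instead of guessing the schema per message.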

If you want to stick with your custom approach, then you probably want to implement your own AvroDeserializationSchema that reuses the existing CachedSchemaCoderProvider. If you check the code of RegistryAvroDeserializationSchema, you will notice that the actual implementation is rather slim.

@Override
public T deserialize(byte[] message) throws IOException {
    checkAvroInitialized();
    getInputStream().setBuffer(message);
    // The writer schema is resolved from the header of each message.
    Schema writerSchema = schemaCoder.readSchema(getInputStream());

    GenericDatumReader<T> datumReader = getDatumReader();

    datumReader.setSchema(writerSchema);
    datumReader.setExpected(writerSchema); // <-- the difference: the stock class passes the reader schema here

    return datumReader.read(null, getDecoder());
}
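
The reason this works per message is that schemaCoder.readSchema takes the writer schema from a small header in front of the Avro payload. Assuming the Confluent wire format (one magic byte 0, then a 4-byte big-endian schema ID), the header parsing can be sketched in plain Java as below. The class and method names are hypothetical, and the real coder additionally resolves the ID against the schema registry, which is omitted here.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Flink or the Confluent client.
final class ConfluentHeaderSketch {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // Returns the schema ID stored in bytes 1..4 of a Confluent-framed message.
    static int readSchemaId(byte[] message) {
        if (message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a schema header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown data format, magic byte does not match");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header announcing schema ID 42, followed by two dummy payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 1, 2};
        System.out.println(readSchemaId(framed)); // prints 42
    }
}
```

Since every message names its own writer schema, messages with different schemas in one topic can each be decoded, as long as the reader does not force them into a single expected schema.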

On Thu, Nov 12, 2020 at 1:42 PM ashwinkonale <[hidden email]> wrote:
Hi Arvid,
Thanks a lot for your reply. And yes, we do use the Confluent schema registry
extensively. But `ConfluentRegistryAvroDeserializationSchema` expects a
reader schema to be provided. That means it reads the message using the writer
schema and converts it to the reader schema. But this is not always what I want.
If I have messages with different schemas in the same topic, I cannot apply
`ConfluentRegistryAvroDeserializationSchema`, correct? I also came across
this question:
<https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without>
I am doing the same thing in my pipeline by providing a custom
deserializer that uses confluentSchemaRegistryClient. So as far as I understand,
in this use case there is no way to tell Flink about the
`GenericRecordAvroTypeInfo` of the GenericRecord that comes out of the source
function. Please tell me if my understanding is correct.





--

Arvid Heise | Senior Java Developer

