Live updating Serialization Schemas in Flink


Hunter Herman

Hi flink-users! I need advice about how to tackle a programming problem I’m facing. I have a bunch of jobs that look something like this sketch:

Source<GenericRecord> kafkaSource;

kafkaSource
    .map(function that takes generic record)
    .map( ... )
    ...
    .sink(kafka sink that takes in generic records)

The reason we represent data as GenericRecords is that the Avro schemas in play vary at runtime. We write schema descriptions to a separate topic. We're aware of the performance penalty of passing GenericRecords between operators and to/from Kafka, so we wrote our own Kafka serialization schema and a Kryo serializer for GenericRecords. The tricky part is that our custom serializer needs to know the current list of schemas so it can figure out how to ser/de messages as they pass through the graph.
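For concreteness, wiring that Kryo serializer in looks roughly like the snippet below. GenericRecordKryoSerializer is a stand-in name for our own class, not something Flink provides:

import org.apache.avro.generic.GenericData;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Route Avro GenericData.Record instances through our serializer instead of Flink's default Kryo handling.
env.getConfig().addDefaultKryoSerializer(GenericData.Record.class, GenericRecordKryoSerializer.class);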

I can't for the life of me figure out how to pass this info into our serializers in a sane way. The methods I'm aware of are:

1. A static field somewhere that polls an external system for the list of schemas. We already do this, but we think it will cause classloader leaks, since the polling thread is created inside the custom serializer and it's not clear where we should cancel it.

2. Broadcast state. We could try to stream the schemas around our graph using broadcast state, but that makes these jobs much less ergonomic to write: every single operator in the graph has to receive the broadcast state and handle serialization internally instead of relying on our custom serializer (see the sketch after this list).

3. A static field that is populated by the Kafka stream. This avoids the thread leak of (1), but I don't think Flink guarantees the static field gets populated in every task slot; it's hard to control where the Kafka stream will be processed.
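For reference, a minimal sketch of what option (2) would look like. recordStream and schemaStream are placeholders for our main stream and the schema-description topic, and the Tuple2<String, String> pairs (schema name, schema JSON) are just one assumed encoding; the pain point is that every operator touching the records has to become a BroadcastProcessFunction like this one:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, String> schemasDescriptor =
    new MapStateDescriptor<>("schemas", Types.STRING, Types.STRING);

// schemaStream carries (schema name, schema JSON) pairs read from the schema-description topic.
BroadcastStream<Tuple2<String, String>> schemaBroadcast = schemaStream.broadcast(schemasDescriptor);

recordStream
    .connect(schemaBroadcast)
    .process(new BroadcastProcessFunction<GenericRecord, Tuple2<String, String>, GenericRecord>() {
        @Override
        public void processElement(GenericRecord value, ReadOnlyContext ctx,
                                   Collector<GenericRecord> out) throws Exception {
            // Look up whatever schema this record needs from broadcast state before processing it.
            ReadOnlyBroadcastState<String, String> schemas = ctx.getBroadcastState(schemasDescriptor);
            // ... resolve schema, transform record ...
            out.collect(value);
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> schema, Context ctx,
                                            Collector<GenericRecord> out) throws Exception {
            // Update the broadcast copy of the schema list on every parallel instance.
            ctx.getBroadcastState(schemasDescriptor).put(schema.f0, schema.f1);
        }
    });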

I know it's a complicated situation, so I hope it came across clearly. I feel pretty stumped, as none of the solutions I've considered seem adequate. Is there another option I haven't thought of? Is there a better way to manage a dynamic set of Avro schemas without restarting the job? Would love any advice! Thanks!

SO: https://stackoverflow.com/questions/64054753/live-updating-serialization-schemas-in-flink


Re: Live updating Serialization Schemas in Flink

Dawid Wysakowicz

Hi,

Unfortunately I don't have a nice solution for you, and I would generally discourage such a pattern. The usual way to handle multiple/dynamic schemas is with the help of a schema registry. In that case a schema id is serialized along with each record, and you can use it to look up the schema on the read side. You then don't need to poll for a list of schemas, which can easily go out of sync or run into all the problems you described.
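To illustrate, a rough sketch of that lookup on the read side, assuming the Confluent wire format (one magic byte plus a 4-byte schema id in front of the Avro payload). The SchemaRegistryClient interface here is just illustrative, not a concrete library API:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistryAwareDeserializer {

    /** Stand-in for a real schema registry client. */
    public interface SchemaRegistryClient {
        Schema getById(int schemaId);
    }

    private final SchemaRegistryClient registry;
    private final Map<Integer, Schema> cache = new ConcurrentHashMap<>();

    public RegistryAwareDeserializer(SchemaRegistryClient registry) {
        this.registry = registry;
    }

    public GenericRecord deserialize(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != 0x0) {
            throw new IOException("Unknown magic byte");
        }
        int schemaId = buffer.getInt();
        // The id travels with each record, so the writer schema is fetched on demand
        // (and cached) instead of being polled for in a background thread.
        Schema writerSchema = cache.computeIfAbsent(schemaId, registry::getById);

        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(
                buffer.array(), buffer.position(), buffer.remaining(), null));
    }
}

If you use the Confluent schema registry, Flink also ships ConfluentRegistryAvroDeserializationSchema (in the flink-avro-confluent-registry module), which does this lookup for you.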

Best of luck with figuring out your problem.

Best,

Dawid


