FlinkKafkaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

Anil K
Hi,

What is the best way to use Confluent SchemaRegistry with FlinkKafkaProducer?

What I have right now is as follows.
    SerializationSchema<GenericRecord> serializationSchema =
        ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistryUrl);

    FlinkKafkaProducer<GenericRecord> kafkaProducer =
        new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
    outputStream.addSink(kafkaProducer);

Using FlinkKafkaProducer with a SerializationSchema is now deprecated.
I am using Flink 1.9.
How do I use FlinkKafkaProducer with KafkaSerializationSchema and the Confluent Schema Registry?
Is there some reference/documentation I could use?
Thanks, Anil.

Fabian Hueske-2
Hi Anil,

Here's a pointer to Flink's end-to-end test that checks the integration with the schema registry [1].
It was recently updated, so I hope it works the same way in Flink 1.9.

Best,
Fabian


Anil K
Thanks Fabian,

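I ended up using something like the following.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> {

        // Delegate that turns a GenericRecord into schema-registry-aware Avro bytes.
        private final SerializationSchema<GenericRecord> valueSerializer;
        private final String topic;

        public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) {
            this.valueSerializer =
                ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl);
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
            // Only the value is set; the record is written without a key.
            byte[] value = valueSerializer.serialize(element);
            return new ProducerRecord<>(topic, value);
        }
    }
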
I then passed a new GenericSerializer instance to the FlinkKafkaProducer:

    FlinkKafkaProducer<GenericRecord> producer =
        new FlinkKafkaProducer<>(
            topic,
            new GenericSerializer(topic, schema, schemaRegistryUrl),
            kafkaConfig,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

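A wrapper like this can also attach a key and the element timestamp to each record, since the serialize method receives both the element and a timestamp. The sketch below illustrates only that pattern and is not the Flink or Kafka API: ValueSerializer, Record, RecordSerializer, KeyedSerializer, and the keyExtractor parameter are hypothetical stand-ins (for SerializationSchema, ProducerRecord<byte[], byte[]>, and KafkaSerializationSchema) so that the example runs without any dependencies. With the real classes, the same idea would presumably use the ProducerRecord constructor that takes topic, partition, timestamp, key, and value.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class KeyedSerializerSketch {

    /** Hypothetical stand-in for Flink's SerializationSchema<T>: element to value bytes. */
    interface ValueSerializer<T> {
        byte[] serialize(T element);
    }

    /** Hypothetical stand-in for Kafka's ProducerRecord<byte[], byte[]>. */
    static final class Record {
        final String topic;
        final Long timestamp;
        final byte[] key;
        final byte[] value;

        Record(String topic, Long timestamp, byte[] key, byte[] value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for Flink's KafkaSerializationSchema<T>. */
    interface RecordSerializer<T> {
        Record serialize(T element, Long timestamp);
    }

    /** Wraps a value serializer and adds a UTF-8 key plus the element timestamp. */
    static final class KeyedSerializer<T> implements RecordSerializer<T> {
        private final String topic;
        private final ValueSerializer<T> valueSerializer;
        private final Function<T, String> keyExtractor;

        KeyedSerializer(String topic,
                        ValueSerializer<T> valueSerializer,
                        Function<T, String> keyExtractor) {
            this.topic = topic;
            this.valueSerializer = valueSerializer;
            this.keyExtractor = keyExtractor;
        }

        @Override
        public Record serialize(T element, Long timestamp) {
            // A null key is passed through unchanged, leaving the record unkeyed.
            String key = keyExtractor.apply(element);
            byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
            // The value bytes still come from the wrapped serializer.
            return new Record(topic, timestamp, keyBytes, valueSerializer.serialize(element));
        }
    }

    public static void main(String[] args) {
        ValueSerializer<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        KeyedSerializer<String> serializer =
            new KeyedSerializer<String>("orders", utf8, s -> s.substring(0, 1));

        Record r = serializer.serialize("abc", 42L);
        // prints "orders a 3 42"
        System.out.println(r.topic + " "
            + new String(r.key, StandardCharsets.UTF_8) + " "
            + r.value.length + " "
            + r.timestamp);
    }
}
```

Keeping the value serializer as a delegate, as in GenericSerializer, means the schema-registry handling stays in one place while the wrapper decides topic, key, and timestamp.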
Thanks, Anil.

Fabian Hueske-2
Thanks for sharing your solution, Anil!

Cheers, Fabian
