flink kafka producer avro serializer problem


flink kafka producer avro serializer problem

Xin Ma
Hello,

Currently, I am using the Confluent Kafka Avro serializer to write to Kafka and, at the same time, register the schema with the Confluent Schema Registry.

The problem is that our upstream data is deserialized from msgpack and converted to a HashMap<String, ImmutableValue>, which Avro cannot serialize directly. The map holds the properties of an event: the key is the field name and the value is the field value. Since we have many different msgpack upstreams, each representing a different event type, we don't want to serialize the way the official sample code does, because that requires providing an Avro schema file for each upstream, which is hard for us to manage.

So I have two ideas here:

First, since we can use Java reflection to build a Java POJO from a JSON/HashMap, is it also possible to build an Avro GenericRecord that way?

Second, generate the Avro schema by iterating over each incoming HashMap's keys and values. I found an implementation here (a rough sketch of what I mean follows my current code below).

Could anyone provide any recommendations on how to implement this?
package com.wish.log.etl.fn;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.msgpack.value.ImmutableValue;

import java.util.HashMap;
import java.util.Map;

public class KafkaGenericAvroSerializationSchema implements SerializationSchema<Map<String, ImmutableValue>> {

    private final String registryUrl = "";
    private transient KafkaAvroSerializer inner;
    private final String topic;

    public KafkaGenericAvroSerializationSchema(String topic) {
        this.topic = topic;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroSerializer(client, props);
        }
    }

    @Override
    public byte[] serialize(Map<String, ImmutableValue> input) {
        // KafkaAvroSerializer is not serializable, so it is initialized lazily here
        checkInitialized();

        // official sample code: build a GenericRecord from a hard-coded schema
        String userSchema = "{\"type\":\"record\"," +
                "\"name\":\"myrecord\"," +
                "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericData.Record record = new GenericRecordBuilder(schema)
                .set("f1", "kevin")
                .set("f2", 1234)
                .build();

        // how to serialize my input (a Map<String, ImmutableValue>) instead?
        return inner.serialize(topic, input);
    }
}
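
For the second idea, what I have in mind is roughly the following untested sketch. The class name, the namespace, and the msgpack-to-Avro type mapping are only guesses on my side; it covers just a few scalar types and assumes all map keys are valid Avro field names:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.msgpack.value.ImmutableValue;

import java.util.Map;

public class MapToGenericRecord {

    // Build a record schema by iterating over the map's keys and values.
    public static Schema inferSchema(String recordName, Map<String, ImmutableValue> input) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(recordName).namespace("com.wish.log.etl").fields();
        for (Map.Entry<String, ImmutableValue> e : input.entrySet()) {
            ImmutableValue v = e.getValue();
            if (v.isIntegerValue()) {
                fields = fields.name(e.getKey()).type().longType().noDefault();
            } else if (v.isFloatValue()) {
                fields = fields.name(e.getKey()).type().doubleType().noDefault();
            } else if (v.isBooleanValue()) {
                fields = fields.name(e.getKey()).type().booleanType().noDefault();
            } else {
                // fallback: strings, nested maps, arrays, nil all become strings for now
                fields = fields.name(e.getKey()).type().stringType().noDefault();
            }
        }
        return fields.endRecord();
    }

    // Fill a GenericRecord from the same map, mirroring the type mapping above.
    public static GenericData.Record toRecord(Schema schema, Map<String, ImmutableValue> input) {
        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        for (Map.Entry<String, ImmutableValue> e : input.entrySet()) {
            ImmutableValue v = e.getValue();
            Object value;
            if (v.isIntegerValue()) {
                value = v.asIntegerValue().toLong();
            } else if (v.isFloatValue()) {
                value = v.asFloatValue().toDouble();
            } else if (v.isBooleanValue()) {
                value = v.asBooleanValue().getBoolean();
            } else {
                value = v.toString();
            }
            builder.set(e.getKey(), value);
        }
        return builder.build();
    }
}

Then serialize() would become something like inner.serialize(topic, toRecord(inferSchema("Event", input), input)). I am not sure how this would interact with schema registry compatibility checks, though, since every distinct key set would produce a different schema.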

Best,
Kevin

Re: flink kafka producer avro serializer problem

Arvid Heise-4
Hi Kevin,

Could you please expand on why using Avro maps is not an option? [1]
Of course, you wouldn't get some advantages such as safe schema evolution, but I can't see how that is supposed to work in your case anyway. Note that a map is also inefficient, as the keys are duplicated in each record.
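
For illustration only, a single fixed schema with a map field could look roughly like the sketch below; the record name, the field name "properties", and the coercion of all values to strings are just placeholders:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

import java.util.Map;

public class MapSchemaSketch {

    // One fixed schema for all events: the properties travel in an Avro map,
    // so no per-upstream schema file is needed.
    static final Schema EVENT_SCHEMA = SchemaBuilder.record("Event")
            .namespace("org.apache.flink.avro.generated")
            .fields()
            .name("properties").type().map().values().stringType().noDefault()
            .endRecord();

    static GenericData.Record toRecord(Map<String, String> properties) {
        return new GenericRecordBuilder(EVENT_SCHEMA)
                .set("properties", properties)
                .build();
    }
}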

Dynamically creating a schema at runtime is imho not a good idea: usually, in a production environment, you should disallow programmatic schema changes at runtime to avoid accidental changes.
Furthermore, putting different schemas into the same topic is heavily discouraged, as it makes consumption painful and sidesteps schema evolution support.

The best way to get production-ready and efficient code would be to automatically translate all events into an Avro schema. If you want to write them into a common topic, the recommendation is to use a union-type-like record such as this:
 {"namespace": "org.apache.flink.avro.generated",
"type": "record",
"name": "Event",
"fields": [
{"name": "event1", "type": [ "null", "EventType1" ], "default": null},
{"name": "event2", "type": [ "null", "EventType2" ], "default": null}
]
}
Here, EventTypeX is a record schema generated from your msgpack data.
Then you can safely add new events as you go, or change the fields of an existing EventType in a schema-evolution-compatible way.
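
On the producer side, filling such a wrapper could look roughly like the sketch below. The schema and the field name "event1" are taken from the example above; the class name, the wrap helper, and the eventType1Record variable are only illustrative, and how the concrete EventTypeX record is built from msgpack is up to your translation step:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public class EventWrapper {

    // Wrap a concrete event record into the union-style "Event" record.
    // Fields that are not set keep their "null" default, so only the field
    // matching the concrete event type is populated.
    public static GenericData.Record wrap(Schema eventSchema,
                                          String fieldName,
                                          GenericData.Record payload) {
        return new GenericRecordBuilder(eventSchema)
                .set(fieldName, payload)
                .build();
    }
}

For example, wrap(eventSchema, "event1", eventType1Record) for an EventType1 payload.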

Note that you always have some friction when you go from schema-less to schema-based data. If you opt for your solution, implementing consumers will be really tedious. It's up to you to decide where you want to have the pain (producer or consumer); in most cases, it should be the producer.

