Re: Kafka + confluent schema registry Avro parsing

Posted by Stephan Ewen
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kafka-confluent-schema-registry-Avro-parsing-tp3578p3591.html

The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster.

I think you need to "lazily" initialize the decoder, in the first invocation of "deserialize()". That should do it.
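For example, something along these lines (an untested sketch: only the serializable Properties are stored in the schema object, and the non-serializable decoder is marked transient and created on the first call):

public class MyAvroDeserializer implements DeserializationSchema<String> {

    // the decoder is not serializable, so it must not be shipped
    // as part of this DeserializationSchema
    private transient KafkaAvroDecoder decoder;

    // java.util.Properties is serializable, so the configuration
    // can travel to the TaskManagers
    private final Properties props;

    public MyAvroDeserializer(Properties props) {
        this.props = props;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        if (decoder == null) {
            // first invocation on the worker: create the decoder here
            decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        }
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

That way, nothing non-serializable has to be shipped when Flink distributes the program.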

Stephan


On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <[hidden email]> wrote:
Hi Max,

Thanks for the example.

Based on your example, here is what I did:

public class Streamingkafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setParallelism(1);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", true);
        VerifiableProperties vProps = new VerifiableProperties(props);

        DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
        env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                parameterTool.getProperties())).print();

        env.execute();
    }

    public class MyAvroDeserializer implements DeserializationSchema<String> {

        private KafkaAvroDecoder decoder;

        public MyAvroDeserializer(VerifiableProperties vProps) {
            // the decoder is created eagerly here, so it gets serialized
            // together with this DeserializationSchema
            this.decoder = new KafkaAvroDecoder(vProps);
        }

        @Override
        public String deserialize(byte[] message) throws IOException {
            return (String) this.decoder.fromBytes(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return null;
        }
    }
}

Here is the error I am seeing:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
	at test.flink.Streamingkafka.main(Streamingkafka.java:25)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
	... 11 more


Am I doing something wrong in my code?



On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <[hidden email]> wrote:
Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class that implements DeserializationSchema and holds a KafkaAvroDecoder configured with the schema registry.

Like so:

public class MyAvroDeserializer implements DeserializationSchema<MyType> {

    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(...);
        this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws IOException {
        return (MyType) this.decoder.fromBytes(message);
    }

    public boolean isEndOfStream(MyType nextElement) {
        return false;
    }

    public TypeInformation<MyType> getProducedType() {
        return TypeExtractor.getForClass(MyType.class);
    }

}

Then you supply this class when creating the consumer:

DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer<MyType> consumer = new FlinkKafkaConsumer<>("myTopic", decoder, props, offsetStore, fetcherType);



Let me know if that works for you.

Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <[hidden email]> wrote:
Hi,

I am very new to Avro. Currently I am using the Confluent Kafka distribution, and I am able to write Avro messages to Kafka by storing the schema in the schema registry. Now I need to consume those messages with the Flink Kafka consumer, and I am having a hard time deserializing them.

I am looking for an example of how to deserialize an Avro message whose schema is stored in the schema registry.

Any help is appreciated. Thanks in advance.

Thanks,
Madhu