Kafka + confluent schema registry Avro parsing


Madhukar Thota
Hi 

I am very new to Avro. I am currently using the Confluent Kafka distribution and am able to write Avro messages to Kafka, with the schema stored in the Schema Registry. Now I need to consume those messages with the Flink Kafka consumer, and I am having a hard time deserializing them.

I am looking for an example of how to deserialize an Avro message whose schema is stored in the Schema Registry.

Any help is appreciated. Thanks in advance.

Thanks,
Madhu


Re: Kafka + confluent schema registry Avro parsing

Maximilian Michels
Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. Simply create a class that implements DeserializationSchema and wraps a KafkaAvroDecoder backed by the schema registry.

Like so:

public class MyAvroDeserializer implements DeserializationSchema<MyType> {

    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
         SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(...);
         this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws Exception {
         return (MyType) this.decoder.fromBytes(message);
    }

    public boolean isEndOfStream(MyType nextElement) {
         return false;
    }

}

Then you supply this class when creating the consumer:

DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer<MyType> consumer = new FlinkKafkaConsumer<>("myTopic", decoder, props, offsetStore, fetcherType);



Let me know if that works for you.

Best regards,
Max


Re: Kafka + confluent schema registry Avro parsing

Madhukar Thota
Hi Max

Thanks for the example.

Based on your example, here is what I did:

public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setParallelism(1);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", true);
        VerifiableProperties vProps = new VerifiableProperties(props);

        DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
        env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                parameterTool.getProperties())).print();

        env.execute();
    }
}

public class MyAvroDeserializer implements DeserializationSchema<String> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) this.decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}

Here is the error I am seeing:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
	at test.flink.Streamingkafka.main(Streamingkafka.java:25)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
	... 11 more


Am I doing something wrong in my code?




Re: Kafka + confluent schema registry Avro parsing

Stephan Ewen
The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster.

I think you need to "lazily" initialize the decoder in the first invocation of "deserialize()". That should do it.

Stephan
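
A minimal sketch of the lazy initialization Stephan describes, inside MyAvroDeserializer. It assumes the configuration is kept as a java.util.Properties field (which is Serializable), while the non-serializable decoder is marked transient:

    private Properties props; // set in the constructor; ships with the job
    private transient KafkaAvroDecoder decoder; // not serialized; rebuilt on the TaskManager

    @Override
    public String deserialize(byte[] message) throws IOException {
        if (decoder == null) {
            // First invocation on the TaskManager: create the decoder here.
            decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        }
        return (String) decoder.fromBytes(message);
    }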



Re: Kafka + confluent schema registry Avro parsing

Maximilian Michels
Stephan is right, this should do it in deserialize():

    if (decoder == null) {
        decoder = new KafkaAvroDecoder(vProps);
    }

Further, you might have to specify the correct return type for
getProducedType(). You may use

    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

Cheers,
Max


Re: Kafka + confluent schema registry Avro parsing

Madhukar Thota
Hi Max/Ewen,

Thank you for the inputs. I was able to solve the serialization issue. Now I am seeing NullPointerExceptions.

public class MyAvroDeserializer implements DeserializationSchema<String> {

    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        if (this.decoder == null) {
            this.decoder = new KafkaAvroDecoder(vProps);
        }
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

}

11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(1/4) switched to FAILED 
java.lang.Exception
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
	at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)

11/19/2015 07:47:39	Job execution switched to status FAILING.
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELING 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELING 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELING 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELED 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELED 
11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELED 
11/19/2015 07:47:39	Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
	at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)




Re: Kafka + confluent schema registry Avro parsing

Till Rohrmann

The constructor of a Java object is not necessarily called after deserialization. Thus, you should move the check

if (this.decoder == null) {
    this.decoder = new KafkaAvroDecoder(vProps);
}

into the deserialize method of MyAvroDeserializer.

Cheers,
Till



Re: Kafka + confluent schema registry Avro parsing

Maximilian Michels
You need to initialize the decoder in the deserialize method instead of in the constructor.
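
For completeness, here is a minimal sketch of the deserializer with all fixes from this thread applied: the non-serializable decoder is transient and created lazily in deserialize(), and getProducedType() returns a concrete type. This is an untested illustration against the Flink 0.9/0.10-era and Confluent APIs used above; note it stores a plain java.util.Properties (which is Serializable) rather than the VerifiableProperties wrapper, and rebuilds the wrapper on first use:

import java.io.IOException;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class MyAvroDeserializer implements DeserializationSchema<String> {

    // Serializable configuration; ships to the TaskManagers with the job graph.
    private final Properties props;

    // Not serializable, hence transient: created lazily on the TaskManager.
    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(Properties props) {
        this.props = props;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        if (decoder == null) {
            // First invocation after deserialization; the constructor is not
            // re-run on the TaskManager, so the decoder is built here.
            decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        }
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }
}

Note that the (String) cast only works if the decoded payload is actually a string; for Avro records you would cast to your generated SpecificRecord class (or GenericRecord) instead.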
