Hi,

I am very new to Avro. I am currently using the Confluent Kafka distribution and am able to write Avro messages to Kafka, with the schema stored in the Schema Registry. Now I need to consume those messages with the Flink Kafka consumer, and I am having a hard time deserializing them.

I am looking for an example of how to deserialize an Avro message whose schema is stored in the Schema Registry.

Any help is appreciated. Thanks in advance.

Thanks,
Madhu
Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry.

Like so:

public class MyAvroDeserializer implements DeserializationSchema<MyType> {

    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
        SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
        this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws Exception {
        return (MyType) this.decoder.fromBytes(message);
    }

    public boolean isEndOfStream(MyType nextElement) {
        return false;
    }
}

Then you supply this class when creating the consumer:

DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder, props, offsetStore, fetcherType);

Let me know if that works for you.

Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <[hidden email]> wrote:
Hi Max,

Thanks for the example. Based on your example, here is what I did:

public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setParallelism(1);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", true);
        VerifiableProperties vProps = new VerifiableProperties(props);

        DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
        env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                parameterTool.getProperties())).print();

        env.execute();
    }
}

public class MyAvroDeserializer implements DeserializationSchema<String> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) this.decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}

Here is the error I am seeing:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
at test.flink.Streamingkafka.main(Streamingkafka.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
... 11 more

Am I doing something wrong in my code?

On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <[hidden email]> wrote:
The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster.

I think you need to "lazily" initialize the decoder, in the first invocation of "deserialize()". That should do it.

Stephan

On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <[hidden email]> wrote:
Stephan is right, this should do it in deserialize():
if (decoder == null) {
    decoder = new KafkaAvroDecoder(vProps);
}

Further, you might have to specify the correct return type for getProducedType(). You may use

public TypeInformation<String> getProducedType() {
    return TypeExtractor.getForClass(String.class);
}

Cheers,
Max

On Thu, Nov 19, 2015 at 12:18 PM, Stephan Ewen <[hidden email]> wrote:
> The KafkaAvroDecoder is not serializable, and Flink uses serialization to
> distribute the code to the TaskManagers in the cluster.
>
> I think you need to "lazily" initialize the decoder, in the first invocation
> of "deserialize()". That should do it.
>
> Stephan
Hi Max/Ewen,

Thank you for the inputs. I was able to solve the serialization issue. Now I am seeing a NullPointerException.

public class MyAvroDeserializer implements DeserializationSchema<String> {

    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        if (this.decoder == null) {
            this.decoder = new KafkaAvroDecoder(vProps);
        }
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }
}

11/19/2015 07:47:39	Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(1/4) switched to FAILED
java.lang.Exception
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
11/19/2015 07:47:39 Job execution switched to status FAILING.
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELED
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELED
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELED
11/19/2015 07:47:39 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)

On Thu, Nov 19, 2015 at 6:30 AM, Maximilian Michels <[hidden email]> wrote:
The constructor of a Java class is not necessarily called again after deserialization. Thus, you should move the check into the deserialize() method.

Cheers,

On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota <[hidden email]> wrote:
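To make the point concrete, here is a small standalone sketch (plain Java serialization, which is roughly what Flink uses to ship user functions to the TaskManagers; the class and field names are invented for the example): after deserialization the object's own constructor has not run again and transient fields come back as null, which is why a check placed in the constructor never fires on the worker.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo implements Serializable {

    // Stands in for the non-serializable KafkaAvroDecoder.
    private transient Object decoder = new Object();

    public static void main(String[] args) throws Exception {
        TransientFieldDemo original = new TransientFieldDemo();

        // Serialize and deserialize, roughly what happens when the
        // DeserializationSchema is shipped from the client to a TaskManager.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(original);
        out.flush();

        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        TransientFieldDemo copy = (TransientFieldDemo) in.readObject();

        System.out.println(original.decoder); // java.lang.Object@...
        System.out.println(copy.decoder);     // null -> hence the NullPointerException in deserialize()
    }
}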
In reply to this post by Madhukar Thota
You need to initialize the decoder in the deserialize method instead
of in the constructor.

On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota <[hidden email]> wrote:
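Putting the advice in this thread together, a corrected deserializer might look roughly like the following. This is an untested sketch, not code from the thread: it keeps only the serializable Properties as a field, builds the Confluent decoder lazily inside deserialize() (as Stephan and Max suggest), and returns a proper TypeInformation. The import paths assume the Flink 0.9/0.10 and Confluent 1.x versions discussed above and may differ in later releases.

import java.io.IOException;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class MyAvroDeserializer implements DeserializationSchema<String> {

    // Serializable configuration only; safe to ship to the TaskManagers.
    private final Properties props;

    // The decoder is not serializable, so it is transient and created lazily.
    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(Properties props) {
        this.props = props;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        if (decoder == null) {
            // First invocation on the worker: the constructor's assignment is lost
            // after shipping, so the decoder has to be created here.
            decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        }
        // Cast as in the original code; a record type would need its own handling.
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }
}

The constructor deliberately takes a plain Properties rather than a VerifiableProperties: Properties is serializable, whereas the decoder (and possibly the VerifiableProperties wrapper) is not, so everything non-serializable is built on the worker side. Wired into the job, it would be used just as in the Streamingkafka example above, e.g. new FlinkKafkaConsumer082<>(topic, new MyAvroDeserializer(props), kafkaProps).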