After deserialization, the constructor of a Java class is not necessarily called. Thus, you should move the check

if (this.decoder == null) {
    this.decoder = new KafkaAvroDecoder(vProps);
}

into the deserialize() method of MyAvroDeserializer.
Cheers,
Till
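Till's point can be demonstrated with plain Java serialization, independent of Flink or Kafka. The sketch below uses stand-in classes (Decoder, MySchema, roundTrip are hypothetical substitutes for KafkaAvroDecoder, MyAvroDeserializer, and Flink shipping the schema to a TaskManager): after a serialize/deserialize round trip the constructor is not re-run, so the transient field comes back null and must be re-created lazily inside deserialize().

```java
import java.io.*;

public class LazyInitDemo {
    // Stand-in for the non-serializable KafkaAvroDecoder.
    static class Decoder {
        String fromBytes(byte[] b) { return new String(b); }
    }

    // Stand-in for MyAvroDeserializer. Serializable, but the decoder
    // field is transient, so it is NOT shipped with the object.
    static class MySchema implements Serializable {
        private transient Decoder decoder;

        MySchema() {
            // This runs on the client, but NOT again after Java
            // deserialization on the remote side.
            this.decoder = new Decoder();
        }

        String deserialize(byte[] message) {
            if (decoder == null) {       // lazy re-initialization
                decoder = new Decoder();
            }
            return decoder.fromBytes(message);
        }

        boolean decoderIsNull() { return decoder == null; }
    }

    // Mimics Flink serializing the schema and restoring it on a
    // TaskManager: a Java serialization round trip.
    static MySchema roundTrip(MySchema s) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(s);
        oos.flush();
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        return (MySchema) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        MySchema copy = roundTrip(new MySchema());
        // The constructor did not run again: the transient field is null.
        System.out.println(copy.decoderIsNull());                 // true
        // First call to deserialize() rebuilds the decoder lazily.
        System.out.println(copy.deserialize("ok".getBytes()));    // ok
        System.out.println(copy.decoderIsNull());                 // false
    }
}
```

Running the main method shows the field is null right after the round trip and only becomes non-null once deserialize() is first invoked, which is exactly why the null check belongs there rather than in the constructor.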
Hi Max/Ewen,

Thank you for the inputs. I was able to solve the serialization issues. Now I am seeing a NullPointerException.

public class MyAvroDeserializer implements DeserializationSchema<String> {
private transient KafkaAvroDecoder decoder;
public MyAvroDeserializer(VerifiableProperties vProps) {
if (this.decoder == null) {
this.decoder = new KafkaAvroDecoder(vProps);
}
}
@Override
public String deserialize(byte[] message) throws IOException {
return (String) decoder.fromBytes(message);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeExtractor.getForClass(String.class);
}
}

11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(1/4) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
    at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
11/19/2015 07:47:39 Job execution switched to status FAILING.
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELING
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELED
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELED
11/19/2015 07:47:39 Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELED
11/19/2015 07:47:39 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
    at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
On Thu, Nov 19, 2015 at 6:30 AM, Maximilian Michels <[hidden email]> wrote:

Stephan is right, this should do it in deserialize():
if (decoder == null) {
decoder = new KafkaAvroDecoder(vProps);
}
Further, you might have to specify the correct return type for getProducedType(). You may use
public TypeInformation<String> getProducedType() {
return TypeExtractor.getForClass(String.class);
}
Cheers,
Max
On Thu, Nov 19, 2015 at 12:18 PM, Stephan Ewen <[hidden email]> wrote:
> The KafkaAvroDecoder is not serializable, and Flink uses serialization to
> distribute the code to the TaskManagers in the cluster.
>
> I think you need to "lazily" initialize the decoder, in the first invocation
> of "deserialize()". That should do it.
>
> Stephan
>
>
> On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <[hidden email]>
> wrote:
>>
>> Hi Max
>>
>> Thanks for the example.
>>
>> Based on your example here is what i did:
>>
>> public class Streamingkafka {
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(500);
>> env.setParallelism(1);
>>
>> ParameterTool parameterTool = ParameterTool.fromArgs(args);
>> Properties props = new Properties();
>> props.put("schema.registry.url", "http://localhost:8081");
>> props.put("specific.avro.reader", true);
>> VerifiableProperties vProps = new VerifiableProperties(props);
>>
>> DeserializationSchema<String> decoder = new
>> MyAvroDeserializer(vProps);
>> env.addSource(new
>> FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
>> parameterTool.getProperties())).print();
>>
>> env.execute();
>> }
>>
>> public class MyAvroDeserializer implements DeserializationSchema<String> {
>> private KafkaAvroDecoder decoder;
>>
>> public MyAvroDeserializer(VerifiableProperties vProps) {
>> this.decoder = new KafkaAvroDecoder(vProps);
>> }
>>
>> @Override
>> public String deserialize(byte[] message) throws IOException {
>> return (String) this.decoder.fromBytes(message);
>> }
>>
>> @Override
>> public boolean isEndOfStream(String nextElement) {
>> return false;
>> }
>>
>> @Override
>> public TypeInformation<String> getProducedType() {
>> return null;
>> }
>> }
>>
>>
>> Here is the error i am seeing...
>>
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: Object
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e
>> not serializable
>> at
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
>> at test.flink.Streamingkafka.main(Streamingkafka.java:25)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.io.NotSerializableException:
>> io.confluent.kafka.serializers.KafkaAvroDecoder
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>> at
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>> ... 11 more
>>
>>
>>
>> Am i doing some thing wrong in my code?
>>
>>
>>
>> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <[hidden email]>
>> wrote:
>>>
>>> Hi Madhukar,
>>>
>>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
>>> you supply a DeserializationSchema in the constructor. You simply create a
>>> class which implements DeserializationSchema and contains the
>>> KafkaAvroDecoder with the schema registry.
>>>
>>> Like so:
>>>
>>> public class MyAvroDeserializer implements DeserializationSchema<MyType>
>>> {
>>>
>>> private KafkaAvroDecoder decoder;
>>>
>>> public MyAvroDeserializer() {
>>> SchemaRegistryClient schemaRegistry = new
>>> SchemaRegistryClient(...);
>>> this.decoder = new KafkaAvroDecoder(schemaRegistry);
>>> }
>>>
>>> public MyType deserialize(byte[] message) throws Exception {
>>>         return (MyType) this.decoder.fromBytes(message);
>>> }
>>>
>>> public boolean isEndOfStream(MyType nextElement) {
>>> return false;
>>> }
>>>
>>> }
>>>
>>> Then you supply this class when creating the consumer:
>>>
>>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
>>> Properties props = new Properties();
>>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
>>> FetcherType fetcherType =
>>> FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>>>
>>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder,
>>> props, offsetStore, fetcherType);
>>>
>>>
>>> Let me know if that works for you.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota
>>> <[hidden email]> wrote:
>>>>
>>>> Hi
>>>>
>>>> I am very new to Avro. Currently I am using confluent Kafka version and
>>>> I am able to write an Avro message to Kafka by storing schema in schema
>>>> registry. Now I need to consume those messages using Flink Kafka Consumer
>>>> and having a hard time to deserialize the messages.
>>>>
>>>> I am looking for an example on how to deserialize Avro message where
>>>> schema is stored in schema registry.
>>>>
>>>> Any help is appreciated. Thanks in Advance.
>>>>
>>>> Thanks,
>>>> Madhu
>>>>
>>>
>>
>