Hi Max,

Thanks for the example. Based on your example, here is what I did:

public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setParallelism(1);
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", true);
        VerifiableProperties vProps = new VerifiableProperties(props);
        DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
        env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                parameterTool.getProperties())).print();
        env.execute();
    }
}

public class MyAvroDeserializer implements DeserializationSchema<String> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) this.decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}

Here is the error I am seeing:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
    at test.flink.Streamingkafka.main(Streamingkafka.java:25)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
    ... 11 more
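
A note on the trace above: the "Caused by" line names io.confluent.kafka.serializers.KafkaAvroDecoder as the object that cannot be serialized. The ClosureCleaner.ensureSerializable frames show that Flink serializes the consumer, together with the DeserializationSchema it holds, when addSource is called. A common way around this kind of error is to mark the non-serializable field transient and create it lazily on first use. The following is a minimal sketch of that pattern using only the JDK. FakeDecoder, LazySchema and roundTrip are hypothetical stand-ins invented for illustration, not Flink or Confluent classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientDecoderSketch {

    // Hypothetical stand-in for a decoder that does NOT implement Serializable,
    // playing the role KafkaAvroDecoder plays in the stack trace.
    static class FakeDecoder {
        private final String registryUrl;

        FakeDecoder(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        Object fromBytes(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for a deserialization schema that must be Serializable.
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // Plain, serializable configuration travels with the object.
        private final String registryUrl;

        // transient: Java serialization skips this field, so it is null after a round trip.
        private transient FakeDecoder decoder;

        LazySchema(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        String deserialize(byte[] message) {
            if (decoder == null) {
                // Built on first use, after the object has been shipped.
                decoder = new FakeDecoder(registryUrl);
            }
            return String.valueOf(decoder.fromBytes(message));
        }
    }

    // Serializes and deserializes the schema, imitating what happens when it is shipped.
    static LazySchema roundTrip(LazySchema schema) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (LazySchema) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazySchema copy = roundTrip(new LazySchema("http://localhost:8081"));
        System.out.println(copy.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Applied to MyAvroDeserializer, this would presumably mean keeping only serializable settings (such as the registry URL) as regular fields, declaring the decoder field transient, and constructing the KafkaAvroDecoder inside deserialize on the first call. That adaptation is an untested assumption here. Separately, returning null from getProducedType() is likely to cause a later failure once the serialization error is resolved.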

Am I doing something wrong in my code?

On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <[hidden email]> wrote:

Hi Madhukar,

Let me know if that works for you.
Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry.
Like so:
public class MyAvroDeserializer implements DeserializationSchema<MyType> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
        SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
        this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws Exception {
        return (MyType) this.decoder.fromBytes(message);
    }

    public boolean isEndOfStream(MyType nextElement) {
        return false;
    }
}
Then you supply this class when creating the consumer:
DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder, props, offsetStore, fetcherType);
Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <[hidden email]> wrote:

Hi,

I am very new to Avro. Currently I am using the Confluent Kafka distribution, and I am able to write an Avro message to Kafka by storing the schema in the schema registry. Now I need to consume those messages using the Flink Kafka consumer, and I am having a hard time deserializing them.

I am looking for an example of how to deserialize an Avro message whose schema is stored in the schema registry.

Any help is appreciated. Thanks in advance.

Thanks,
Madhu