Kafka Avro RecordSchema serializing / deserializing


Kafka Avro RecordSchema serializing / deserializing

turif
Hi,

I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka connector, but I ran into a problem:

The following exception is thrown at program startup:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know RecordSchema is not serializable, so the exception is understandable, but how can I add a serializer for RecordSchema?

My Flink initialization:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.addSource(new KafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print();

The deserializer:

public class MyDeserializer implements DeserializationSchema<String>,
        SerializationSchema<String, byte[]> {
    private static final long serialVersionUID = -8314881700393464119L;
    private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();   
    private Schema _schema;

    public MyDeserializer(){
        System.out.println("Creating MyDeserializer");
        Schema.Parser parser = new Schema.Parser();
        try {
            InputStream is = getClass().getResourceAsStream("/avro_schema.json");
            if (is != null){
                _schema = parser.parse(is);
            }else{
                System.out.println("Unable to load schema file!");
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
   
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

    public String deserialize(byte[] message) {
        String data = null;
        try {
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            GenericRecord result = reader.read(null, decoder);
            AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"), (Integer) result.get("random"), String.valueOf(result.get("data")));
            data = ad.toString();
            System.out.println("Read kafka data: " + data);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }

    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    public byte[] serialize(String element) {
        System.out.println("Serializing element = " + element);
        byte[] data = null;
        try {
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(_schema);
           
            ByteArrayOutputStream stream = new ByteArrayOutputStream();

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);

            GenericRecord r = reader.read(null, decoder);
           
            BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
           
            writer.write(r, binaryEncoder);
            binaryEncoder.flush();
            IOUtils.closeStream(stream);
           
            data = stream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }

}

Unfortunately, as far as I can see, only the constructor of MyDeserializer is called.

Could somebody suggest something?

Thanks,

Ferenc



--
Kind Regards,

Ferenc



Re: Kafka Avro RecordSchema serializing / deserializing

rmetzger0
Hi,
yes, the Avro Schema is not serializable.

Can you make the "_schema" field "transient" and then lazily initialize the field when serialize()/deserialize() is called?
That way, you initialize the schema on the cluster, so there is no need to transfer it over the network.
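
A rough, untested sketch of that approach (imports as in the original class; deserialize() is shortened here to just return the record's toString(), the AvroKafkaData handling would stay the same):

public class MyDeserializer implements DeserializationSchema<String> {
    private static final long serialVersionUID = -8314881700393464119L;

    // transient: the schema is not shipped with the serialized function,
    // so the NotSerializableException for Schema$RecordSchema goes away
    private transient Schema _schema;

    // parses the schema lazily on the task manager instead of in the
    // constructor on the client
    private Schema getSchema() {
        if (_schema == null) {
            try (InputStream is = getClass().getResourceAsStream("/avro_schema.json")) {
                if (is == null) {
                    throw new RuntimeException("Unable to load schema file!");
                }
                _schema = new Schema.Parser().parse(is);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return _schema;
    }

    public String deserialize(byte[] message) {
        try {
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            GenericRecord result = reader.read(null, decoder);
            return result.toString();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

    // if the SerializationSchema side is kept, serialize() should call
    // getSchema() as well instead of reading _schema directly
}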


I think Flink's own serialization stack should also be able to handle Avro types with Kafka. I'm trying to get the required tooling into Flink 0.10-SNAPSHOT.

Let me know if you need more help.


Best,
Robert








Re: Kafka Avro RecordSchema serializing / deserializing

Stephan Ewen
The functions/sources have an open() method that is intended exactly for this kind of initialization (constructing the Avro schema).

You can try subclassing the Kafka source and overriding the open() method to initialize the schema there. Make sure you call super.open().
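
For example, a rough, untested sketch (signatures are approximate for the 0.9-era API; the three-argument KafkaSource constructor is the one used earlier in this thread, and initSchema() is a hypothetical helper on MyDeserializer that parses the schema; the schema field itself still needs to be transient so that the function serializes):

public class AvroSchemaKafkaSource extends KafkaSource<String> {
    private static final long serialVersionUID = 1L;

    private final MyDeserializer deserializer;

    public AvroSchemaKafkaSource(String zkAddress, String topic, MyDeserializer deserializer) {
        super(zkAddress, topic, deserializer);
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // keep the source's own initialization
        super.open(parameters);
        // hypothetical helper on MyDeserializer that parses /avro_schema.json
        // on the task manager, before any records are read
        deserializer.initSchema();
    }
}

It would then be used in place of the plain KafkaSource:

env.addSource(new AvroSchemaKafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print();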


Greetings,
Stephan

