Not able to force Avro serialization

Not able to force Avro serialization

cslotterback

Hey guys,

 

I have been trying to get Avro deserialization to work, but I’ve run into an issue where Flink (1.10) tries to serialize the Avro classes with Kryo:

 

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
        […]
Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_265]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
        ... 29 more

 

I am forcing Avro serialization on the execution environment (running in a local cluster right now):

if (localCluster) {
  env = StreamExecutionEnvironment.createLocalEnvironment(parallelism);
} else {
  env = StreamExecutionEnvironment.getExecutionEnvironment();
}

env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();

 

and here is where I have the Avro schema defined in my AvroDeserializationSchema class:

private Schema schema;

public AvroDeserializationSchema(String schema) {
  this.schema = new Schema.Parser().parse(schema);
}

 

I have the flink-avro dependency added to the pom.

 

Any ideas why Kryo is still trying to serialize the Avro GenericData class?

 

Thanks

Chris


Re: Not able to force Avro serialization

cslotterback

And here is the deserialization block where the Schema is used to generate a GenericRecord:

@Override
public Map<String, Object> deserialize(byte[] bytes) throws IOException {
  DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
  GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));

  // Copies only the top-level fields; nested records are stored as-is.
  Map<String, Object> map = new HashMap<>();
  record.getSchema().getFields().forEach(field -> map.put(field.name(), record.get(field.name())));
  return map;
}

 

Chris

 

From: "Slotterback, Chris" <[hidden email]>
Date: Sunday, August 23, 2020 at 2:07 AM
To: user <[hidden email]>
Subject: Not able to force Avro serialization

 


Re: Not able to force Avro serialization

cslotterback

I figured it out, for those interested: I actually had an embedded record in my Avro schema, so my loop was incorrectly building a single-level map that still held a GenericRecord as a value, which was throwing off the map’s serialization. After recursing into the embedded GenericRecords to build the fully nested map, Kryo stopped choking.

 

Chris
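
For anyone landing here with the same trace, the recursive conversion described above can be sketched roughly as follows. This is a minimal, illustrative sketch and not the poster's actual code: RecordLike, recordOf, toMap and convert are hypothetical names, and RecordLike is only a stand-in for Avro's GenericRecord so that the example runs with nothing but the JDK. With Avro on the classpath, the loop would instead iterate over record.getSchema().getFields() and call record.get(field.name()), as in the deserialize method earlier in the thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedRecordToMap {

    /** Hypothetical stand-in for Avro's GenericRecord (assumption, not the Avro API). */
    public interface RecordLike {
        List<String> fieldNames();
        Object get(String fieldName);
    }

    /** Demo helper: a trivial RecordLike backed by a map. */
    public static RecordLike recordOf(final Map<String, Object> fields) {
        return new RecordLike() {
            @Override public List<String> fieldNames() { return new ArrayList<>(fields.keySet()); }
            @Override public Object get(String fieldName) { return fields.get(fieldName); }
        };
    }

    /** Converts a record to a map, recursing so that no record survives as a value. */
    public static Map<String, Object> toMap(RecordLike record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String name : record.fieldNames()) {
            out.put(name, convert(record.get(name)));
        }
        return out;
    }

    /** Recursively replaces records (also inside lists and maps) with plain maps. */
    public static Object convert(Object value) {
        if (value instanceof RecordLike) {
            return toMap((RecordLike) value);
        }
        if (value instanceof List) {
            List<Object> converted = new ArrayList<>();
            for (Object element : (List<?>) value) {
                converted.add(convert(element));
            }
            return converted;
        }
        if (value instanceof Map) {
            Map<String, Object> converted = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                converted.put(String.valueOf(e.getKey()), convert(e.getValue()));
            }
            return converted;
        }
        return value; // primitives, strings, etc. pass through unchanged
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("city", "Berlin");
        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("id", 7);
        outer.put("address", recordOf(inner)); // embedded record
        System.out.println(toMap(recordOf(outer))); // prints {id=7, address={city=Berlin}}
    }
}
```

The point of the recursion is that no record object, and therefore no Avro Schema, is left as a map value for Kryo to serialize.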

 

From: "Slotterback, Chris" <[hidden email]>
Date: Sunday, August 23, 2020 at 2:17 AM
To: user <[hidden email]>
Subject: Re: Not able to force Avro serialization

 
