Hi all, sorry if this seems like a silly question, but I can't figure it out.
I'm using an AvroInputFormat to read an Avro file like this:

    val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord])

This works fine in local mode, but when the job is submitted to a Flink cluster I get serialisation errors that look like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
	... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
	at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
	at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
	at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
	at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
	... 37 more

I realise this issue is mentioned in the documentation, but since it looks like the problem is some class inside the AvroInputFormat that has trouble being serialised, I'm not sure what the best solution is.

This works fine if I specify a concrete (non-generic) class, i.e.

    val textInputFormat = new AvroInputFormat[Example](infile, classOf[Example])

However, I can't get this to run in local mode with a case class `Example` that is nested, which I need because the Avro files have deeply nested fields.
Hi Padarn,
Usually people use the AvroInputFormat with the Avro class generated from an Avro schema. But after looking into the implementation, it should also be possible to use the GenericRecord class as a parameter. So if it works locally but not in a distributed setup, your exception looks like a bug. What do you mean by "this is an issue that is mentioned in the documentation"? Where is this issue documented?

Regards,
Timo

On 14.05.18 at 18:53, Padarn Wilson wrote:
Flink should not have problems with heavily nested schemas, so this might be another bug worth investigating. Can you share an example that reproduces your issues? Which Flink version are you using?

Contributors are always welcome :) Otherwise, I will also take a look at the serialization issue.

Regards,
Timo

On 15.05.18 at 17:33, Padarn Wilson wrote:
In reply to this post by Padarn Wilson-2
Hi,
I am facing a similar issue today with Flink 1.6.0 and AvroOutputFormat:

    AvroOutputFormat<GenericRecord> tuple2AvroOutputFormat = new AvroOutputFormat<>(
            new Path("<path>"), GenericRecord.class);

    testDataSet
            .map(new GenerateGenericRecord())
            .returns(AvroTypeInfo.of(GenericRecord.class))
            .output(tuple2AvroOutputFormat);

Here is the exception (I have enabled the forceAvro config, so I'm not sure why it still goes to the Kryo serializer):

com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Please let me know if there is a fix for this issue, as I have not faced this problem with DataStreams.

Regards,
Vinay Patil
Hi,

Changing the classloader config to parent-first solved the issue.

Regards,
Vinay Patil

On Wed, Nov 7, 2018 at 7:25 AM Vinay Patil <[hidden email]> wrote:
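Vinay doesn't say exactly which setting he changed. As a sketch, assuming he meant Flink's `classloader.resolve-order` option (whose default is `child-first`) and that it is set cluster-wide in `flink-conf.yaml`, the change would look like this:

```yaml
# flink-conf.yaml
# Assumption: this is the setting referred to above as "the classloader config".
# parent-first: classes are resolved from the Flink/system classpath before
# the user job jar; the default, child-first, checks the job jar first.
classloader.resolve-order: parent-first
```

The cluster processes typically need a restart to pick this up. Note the trade-off: with parent-first, any class that is present on the Flink classpath (including jars in `lib/`) is loaded from there rather than from the job jar, so the job runs against whatever library versions the cluster provides.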