Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema with an array field, and when I remove that field the error is no longer thrown. Is there any way around this without using a specific type?

Avro Schema:

    {
      "type": "record",
      "name": "Recieved",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}}
      ]
    }

Deserializer:

    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}

Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 20 more
01/03/2018 15:19:59 Job execution switched to status FAILING.
|
Hi,
I think you might be able to use AvroTypeInfo, which becomes available once you include the flink-avro dependency. Is that an option for you?

Best,
Aljoscha
|
I can add that dependency. So I would replace

    TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
    }

with something like:

    override def getProducedType: TypeInformation[T] = {
      new AvroTypeInfo(classOf[T])
    }

On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <[hidden email]> wrote:
|
Yes, that should do the trick.
|
So I just added the dependency but didn't change the getProducedType method, and it worked fine. Would you expect that to be the case?

On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <[hidden email]> wrote:
|
Yes, there is some magic in the KryoSerializer and other serialisers that detects whether the flink-avro dependency is present and, if so, uses the special TypeSerializers from it.
(Specifically, this is AvroUtils, which has a default implementation that doesn't do much, and a special implementation called AvroKryoSerializerUtils that lives in flink-avro and is loaded dynamically.)
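
For anyone wondering how "detect the dependency and load the special implementation" can work in general, here is a minimal plain-Java sketch of that pattern. It is not Flink's actual code: AvroSupport, DefaultAvroSupport, FullAvroSupport and AvroSupportLoader are hypothetical names made up for illustration, and the serializer name returned by FullAvroSupport is only a sample value. The idea is simply to look a class up by name at runtime and fall back to a do-nothing default when it is not on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook that serializer setup would call (stand-in for the role AvroUtils plays).
interface AvroSupport {
    // Names of the types that should get a dedicated serializer.
    List<String> specialSerializers();
}

// Fallback used when the optional module is missing: registers nothing.
class DefaultAvroSupport implements AvroSupport {
    public List<String> specialSerializers() {
        return new ArrayList<>();
    }
}

// Stand-in for the implementation that would ship in the optional module.
class FullAvroSupport implements AvroSupport {
    public List<String> specialSerializers() {
        List<String> names = new ArrayList<>();
        // Illustrative value only.
        names.add("org.apache.avro.generic.GenericData$Array");
        return names;
    }
}

final class AvroSupportLoader {
    // Tries to load the named implementation reflectively; if the class is not
    // on the classpath, silently falls back to the default implementation.
    static AvroSupport load(String implClassName) {
        try {
            Class<?> clazz = Class.forName(implClassName);
            return (AvroSupport) clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            // Optional module not present: use the no-op default.
            return new DefaultAvroSupport();
        } catch (ReflectiveOperationException e) {
            // Class exists but could not be instantiated: surface the problem.
            throw new IllegalStateException("Could not instantiate " + implClassName, e);
        }
    }
}
```

With this shape, calling AvroSupportLoader.load with a class name that is not on the classpath yields the default, while adding the jar that contains the implementation changes behaviour without any change to user code. That matches what was observed above, where adding the dependency alone was enough.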
|