Can't call getProducedType on Avro messages with array types

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Can't call getProducedType on Avro messages with array types

Kyle Hamlin
Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.
Reply | Threaded
Open this post in threaded view
|

Re: Can't call getProducedType on Avro messages with array types

Aljoscha Krettek
Hi,

I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?

Best,
Aljoscha

On 3. Jan 2018, at 21:34, Kyle Hamlin <[hidden email]> wrote:

Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[<a href="akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259" class="">akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.

Reply | Threaded
Open this post in threaded view
|

Re: Can't call getProducedType on Avro messages with array types

Kyle Hamlin
I can add that dependency. So I would replace

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

with something like:

override def getProducedType: TypeInformation[T] = {
    new AvroTypeInfo(classOf[T])
}

On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?

Best,
Aljoscha


On 3. Jan 2018, at 21:34, Kyle Hamlin <[hidden email]> wrote:

Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.

Reply | Threaded
Open this post in threaded view
|

Re: Can't call getProducedType on Avro messages with array types

Aljoscha Krettek
Yes, that should do the trick.

On 5. Jan 2018, at 18:37, Kyle Hamlin <[hidden email]> wrote:

I can add that dependency. So I would replace

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

with something like:

override def getProducedType: TypeInformation[T] = {
    new AvroTypeInfo(classOf[T])
}

On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?

Best,
Aljoscha


On 3. Jan 2018, at 21:34, Kyle Hamlin <[hidden email]> wrote:

Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.


Reply | Threaded
Open this post in threaded view
|

Re: Can't call getProducedType on Avro messages with array types

Kyle Hamlin
So I just added the dependency but didn't change the getProducedType method and it worked fine. Would you expect that to be the case?

On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <[hidden email]> wrote:
Yes, that should do the trick.


On 5. Jan 2018, at 18:37, Kyle Hamlin <[hidden email]> wrote:

I can add that dependency. So I would replace

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

with something like:

override def getProducedType: TypeInformation[T] = {
    new AvroTypeInfo(classOf[T])
}

On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?

Best,
Aljoscha


On 3. Jan 2018, at 21:34, Kyle Hamlin <[hidden email]> wrote:

Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.


Reply | Threaded
Open this post in threaded view
|

Re: Can't call getProducedType on Avro messages with array types

Aljoscha Krettek
Yes, there is some magic in the KryoSerializer and other serialisers that detect whether the flink-avro dependency is there and then use special TypeSerializers from there.

(Specifically, this is AvroUtils which has a default implementation that doesn't do much and a special implementation called AvroKryoSerializerUtils that is available in flink-avro and which is dynamically loaded.)

On 5. Jan 2018, at 18:53, Kyle Hamlin <[hidden email]> wrote:

So I just added the dependency but didn't change the getProducedType method and it worked fine. Would you expect that to be the case?

On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <[hidden email]> wrote:
Yes, that should do the trick.


On 5. Jan 2018, at 18:37, Kyle Hamlin <[hidden email]> wrote:

I can add that dependency. So I would replace

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

with something like:

override def getProducedType: TypeInformation[T] = {
    new AvroTypeInfo(classOf[T])
}

On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?

Best,
Aljoscha


On 3. Jan 2018, at 21:34, Kyle Hamlin <[hidden email]> wrote:

Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?

Avro Schema:
{
    "type": "record",
    "name": "Recieved",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "time", "type": "long"},
        {"name": "urls", "type": {"type": "array", "items": "string"}},
    ]
}

Deserializer:
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

@transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
true)
deserializer
}

// Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
@transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
// other schema-registry configuration parameters can be passed, see the configure() code
// for details (among other things, schema cache size)
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
deserializer
}

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
}

override def isEndOfStream(nextElement: T): Boolean = false

override def getProducedType: TypeInformation[T] = {
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}

}
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 20 more

01/03/2018 15:19:59 Job execution switched to status FAILING.