pojo warning when using auto generated protobuf class

Prashant Deva
I am seeing this warning message when using a custom protobuf de/serializer with a Kafka source and an auto-generated Java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.xx.APITrace cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Here is my serializer. What am I doing wrong?

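import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

// Kafka (de)serialization schema for the generated protobuf class Trace.APITrace.
class ApiTraceSchema: DeserializationSchema<Trace.APITrace>, SerializationSchema<Trace.APITrace> {

    // Type information Flink uses to choose a serializer for the stream elements.
    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    // Parse the raw Kafka record bytes into the protobuf message.
    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    // The Kafka topic is unbounded, so no element ends the stream.
    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    // Encode the protobuf message to bytes for writing back to Kafka.
    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}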

Re: pojo warning when using auto generated protobuf class

Yun Gao
Hi Prashant,

I think the warning is emitted when calling

return TypeInformation.of(Trace.APITrace::class.java)

Flink does not yet have native support for protobuf
types [1], so it falls back to a generic serializer
created by Kryo.

This should not affect the correctness of the program,
only its performance. One possible solution is to register
a custom serializer for the protobuf classes with the Kryo
serializer framework, as in the example in [2].

Best,
Yun


------------------Original Mail ------------------
Sender:Prashant Deva <[hidden email]>
Send Date:Sat Apr 24 11:00:17 2021
Recipients:User <[hidden email]>
Subject:pojo warning when using auto generated protobuf class

Re: pojo warning when using auto generated protobuf class

Prashant Deva
So I did register the type with Kryo and the ProtobufSerializer. However, I am still seeing the warnings. Is this a bug in Flink?

env.config.registerTypeWithKryoSerializer(Trace.APITrace::class.java, ProtobufSerializer::class.java)

 val stream: DataStreamSource<Trace.APITrace> = env.addSource(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props))


On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao <[hidden email]> wrote:

Re: Re: pojo warning when using auto generated protobuf class

Yun Gao
Hi Prashant,

Flink always emits this warning whenever the deduced type
is GenericType, regardless of whether it uses the default Kryo
serializer or the registered one. So if you have registered the
type, you can simply ignore the warning. To confirm that it works,
you can find the TaskManager on which the source tasks reside and
use jmap to check whether a ProtobufSerializer has been created.

Best,
Yun
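
The jmap check described above could look roughly like the sketch below. It is an illustration under assumptions, not a verified procedure for any particular deployment: the helper name find_protobuf_serializer is hypothetical, the filter string assumes the registered serializer's class name contains "ProtobufSerializer", and the commented-out PID lookup assumes a standalone cluster where the TaskManager's main class name contains "TaskManagerRunner".

```shell
#!/bin/sh
# Hypothetical helper: reads a `jmap -histo` class histogram on stdin and
# prints the rows whose class name contains "ProtobufSerializer".
# The exit status is non-zero when no such row exists.
find_protobuf_serializer() {
  grep 'ProtobufSerializer'
}

# Example invocation on the TaskManager host, left commented out because it
# needs a running TaskManager JVM (the main class name is an assumption):
# TM_PID=$(jps -l | awk '/TaskManagerRunner/ { print $1 }')
# jmap -histo:live "$TM_PID" | find_protobuf_serializer
```

If the filter prints at least one row, an instance of the registered serializer exists in that JVM. jmap generally has to be run as the same user that owns the TaskManager process.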



------------------Original Mail ------------------
Sender:Prashant Deva <[hidden email]>
Send Date:Sun Apr 25 01:18:41 2021
Recipients:Yun Gao <[hidden email]>
CC:User <[hidden email]>
Subject:Re: pojo warning when using auto generated protobuf class