Is the provided Serializer/TypeInformation checked "too late"?

Is the provided Serializer/TypeInformation checked "too late"?

Niels Basjes
Hi,

Context:
I'm looking into making the Google (BigQuery-compatible) HyperLogLog++
implementation available in Flink, because it is simply an
Apache-licensed open-source library:
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch

While doing this I noticed that even though I provided an explicit
Kryo Serializer for the core class

i.e. I did senv.getConfig().registerTypeWithKryoSerializer(HyperLogLogPlusPlus.class,
HLLSerializer.class);

I still see messages like this when registering a new
UserDefinedFunction (AggregateFunction / ScalarFunction) that has this
class as either input or output:

13:59:57,316 [INFO ] TypeExtractor                           : 1815:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor                           : 1818:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor                           : 1857:
Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used
as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance.

So it is complaining about the serialization performance of a
mechanism different from the one that was explicitly configured.
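For reference, the HLLSerializer registered above is essentially a thin Kryo Serializer around zetasketch's own wire format. A sketch of that shape follows; the zetasketch method names serializeToByteArray() and forProto() are assumptions for illustration, not verified against the actual library API:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.zetasketch.HyperLogLogPlusPlus;

// Sketch: delegate (de)serialization to zetasketch's own byte format.
// serializeToByteArray()/forProto(byte[]) are assumed method names.
public class HLLSerializer extends Serializer<HyperLogLogPlusPlus<?>> {

    @Override
    public void write(Kryo kryo, Output output, HyperLogLogPlusPlus<?> hll) {
        byte[] bytes = hll.serializeToByteArray();
        // Length prefix so read() knows how many bytes to consume.
        output.writeInt(bytes.length);
        output.writeBytes(bytes);
    }

    @Override
    public HyperLogLogPlusPlus<?> read(Kryo kryo, Input input,
                                       Class<HyperLogLogPlusPlus<?>> type) {
        int length = input.readInt();
        byte[] bytes = input.readBytes(length);
        return HyperLogLogPlusPlus.forProto(bytes);
    }
}
```

Registered via registerTypeWithKryoSerializer as shown earlier, such a serializer is picked up by the Kryo (GenericType) path at runtime.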

Then I noticed that I see similar messages in other situations too.

In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165

I see
13:59:58,478 [INFO ] TypeExtractor                           : 1815:
class org.apache.flink.types.Row does not contain a getter for field
fields
13:59:58,478 [INFO ] TypeExtractor                           : 1818:
class org.apache.flink.types.Row does not contain a setter for field
fields
13:59:58,479 [INFO ] TypeExtractor                           : 1857:
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

even though a full TypeInformation instance for that type was provided

TypeInformation<Row> tupleType = new RowTypeInfo(SQL_TIMESTAMP,
STRING, STRING, STRING, STRING, LONG);
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

I checked with my debugger, and in both examples above the code IS
using the correct serialization classes at runtime.

So what is happening here?
Did I forget a required call?
Or is this a bug?
Is the provided serialization via TypeInformation 'skipped' during
startup and only used at runtime?

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Re: Is the provided Serializer/TypeInformation checked "too late"?

Timo Walther
Hi Niels,

the type handling has evolved over the years and is a bit messed up
across the different layers. You are almost right with your last
assumption, "Is the provided serialization via TypeInformation 'skipped'
during startup and only used during runtime?": the type extraction
returns a Kryo type, and that Kryo type uses the configured default
serializers at runtime. That is why the log entry is only an INFO and
not a WARNING. You did everything correctly.

Btw, there is also the possibility to insert a custom type into the
type extraction by using Type Factories [0].
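The Type Factory mechanism from [0] can be sketched roughly as follows. HLLWrapper and the choice of returned TypeInformation are hypothetical placeholders; only the @TypeInfo annotation and the TypeInfoFactory.createTypeInfo signature come from the documented Flink API:

```java
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// Hypothetical wrapper type; the annotation points type extraction
// at the factory below instead of the default POJO analysis.
@TypeInfo(HLLTypeInfoFactory.class)
class HLLWrapper {
    // would wrap com.google.zetasketch.HyperLogLogPlusPlus
}

class HLLTypeInfoFactory extends TypeInfoFactory<HLLWrapper> {
    @Override
    public TypeInformation<HLLWrapper> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Return whatever TypeInformation should be used for this type;
        // a fully custom TypeInformation subclass with its own
        // TypeSerializer would go here instead of GenericTypeInfo.
        return new GenericTypeInfo<>(HLLWrapper.class);
    }
}
```

With this in place, the TypeExtractor consults the factory whenever it encounters HLLWrapper, so the INFO messages about failed POJO analysis disappear for that type.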

Maybe as a side comment: We are aware of these confusions and the Table
& SQL API will hopefully not use the TypeExtractor anymore in 1.10. This
is what I am working on at the moment.

Regards,
Timo

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory

On 08.07.19 at 14:17, Niels Basjes wrote:


Re: Is the provided Serializer/TypeInformation checked "too late"?

Niels Basjes-2
Hi Timo,

Thanks for the clarification. 
It's reassuring to hear that my code does the right thing.
I'll just ignore these messages for now.

Niels


On Mon, 8 Jul 2019, 15:09 Timo Walther, <[hidden email]> wrote: