Hi,
Context: I'm looking into making the Google (BigQuery-compatible) HyperLogLog++ implementation available in Flink, because it is simply an Apache-licensed open-source library:
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch

While doing this I noticed that even though I provided an explicit Kryo serializer for the core class, i.e. I did

    senv.getConfig().registerTypeWithKryoSerializer(HyperLogLogPlusPlus.class, HLLSerializer.class);

I still see messages like this when registering a new UserDefinedFunction (AggregateFunction / ScalarFunction) that has this class as either input or output:

13:59:57,316 [INFO ] TypeExtractor : 1815: class com.google.zetasketch.HyperLogLogPlusPlus does not contain a getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor : 1818: class com.google.zetasketch.HyperLogLogPlusPlus does not contain a setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor : 1857: Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

So it is complaining about the serialization performance even though serialization was explicitly configured to be done in a different way.

Then I noticed similar messages in other situations too. In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165
I see

13:59:58,478 [INFO ] TypeExtractor : 1815: class org.apache.flink.types.Row does not contain a getter for field fields
13:59:58,478 [INFO ] TypeExtractor : 1818: class org.apache.flink.types.Row does not contain a setter for field fields
13:59:58,479 [INFO ] TypeExtractor : 1857: Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

even though a full TypeInformation instance for that type was provided:

    TypeInformation<Row> tupleType = new RowTypeInfo(SQL_TIMESTAMP, STRING, STRING, STRING, STRING, LONG);
    DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

I checked with my debugger, and in both of the examples above the code IS using the correct serialization classes at runtime.

So what is happening here?
Did I forget a required call?
Is this a bug?
Is the serialization provided via TypeInformation 'skipped' during startup and only used during runtime?

--
Best regards / Met vriendelijke groeten,

Niels Basjes
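(For readers who want to try the same thing: below is a minimal sketch of what such an HLLSerializer could look like. It assumes zetasketch's serializeToByteArray() / HyperLogLogPlusPlus.forProto(byte[]) round-trip; the method bodies are illustrative, not the actual implementation referenced in the message above.)

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.zetasketch.HyperLogLogPlusPlus;

    public class HLLSerializer extends Serializer<HyperLogLogPlusPlus> {

        @Override
        public void write(Kryo kryo, Output output, HyperLogLogPlusPlus hll) {
            // Dump the whole sketch as its serialized proto representation.
            byte[] bytes = hll.serializeToByteArray();
            output.writeInt(bytes.length, true);
            output.writeBytes(bytes);
        }

        @Override
        public HyperLogLogPlusPlus read(Kryo kryo, Input input, Class<HyperLogLogPlusPlus> type) {
            // Read the length-prefixed proto bytes back and rebuild the sketch.
            int length = input.readInt(true);
            byte[] bytes = input.readBytes(length);
            return HyperLogLogPlusPlus.forProto(bytes);
        }
    }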
Hi Niels,
the type handling has evolved over the years and is a bit messy across the different layers. Your last guess — "Is the serialization provided via TypeInformation 'skipped' during startup and only used during runtime?" — is almost right. The type extraction returns a Kryo type, and that Kryo type uses the configured default serializers at runtime. That is why the log entry is just an INFO and not a WARNING. You did everything correctly.

By the way, there is also the possibility to insert a custom type into the type extraction by using type factories [0].

Maybe as a side comment: we are aware of these confusions, and the Table & SQL API will hopefully no longer use the TypeExtractor in 1.10. This is what I am working on at the moment.

Regards,
Timo

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
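(A minimal sketch of the type-factory approach from [0]: the @TypeInfo annotation has to be placed on a class you control, so for a third-party type like HyperLogLogPlusPlus this typically means introducing a wrapper. The names Sketch and SketchTypeFactory below are hypothetical.)

    import java.lang.reflect.Type;
    import java.util.Map;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;

    // Hypothetical wrapper around the third-party sketch class.
    @TypeInfo(SketchTypeFactory.class)
    public class Sketch {
        // ... holds a com.google.zetasketch.HyperLogLogPlusPlus ...
    }

    class SketchTypeFactory extends TypeInfoFactory<Sketch> {
        @Override
        public TypeInformation<Sketch> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Returning a GenericTypeInfo still serializes via Kryo (and thus via any
            // registered custom serializers), but the TypeExtractor no longer analyzes
            // the class itself, so the POJO INFO messages should go away.
            return new GenericTypeInfo<>(Sketch.class);
        }
    }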
Hi Timo,
Thanks for the clarification. It's reassuring to hear that my code does the right thing. I'll just ignore these messages for now.

Niels