Hi,
I'm trying to implement a custom deserialiser to deserialise data from a Kafka sink. So I'm implementing a KeyedDeserializationSchema<Tuple6<String, String, Date, String, String, Double>>, which asks me to override the method:

    @Override
    public TypeInformation<Tuple6<String, String, Date, String, String, Double>> getProducedType() {
        // to do
    }

Honestly, I looked at the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html) but I didn't understand what this method is for or how to implement it.
From ResultTypeQueryable:

    /**
     * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
     *
     * @return The data type produced by this function or input format.
     */
    TypeInformation<T> getProducedType();

You can look at classes which implement this method as examples, for instance:

    flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java

On Sun, Aug 13, 2017 at 7:31 AM, AndreaKinn <[hidden email]> wrote:
> Hi,
But I'm only using basic Java types such as String and Double, plus Date. Doesn't Flink know how to handle them?
Please take a look at the following in flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java:

    public TypeInformation<Tuple2<K, V>> getProducedType() {
        return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
    }

FYI

On Sun, Aug 13, 2017 at 8:23 AM, AndreaKinn <[hidden email]> wrote:
> But I'm using Java primitive type like String, Double plus Date types. Flink
In reply to this post by AndreaKinn
You should be able to implement this using a TypeHint (see the "Creating a TypeInformation or TypeSerializer" section of the linked page, https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html):

    return TypeInformation.of(new TypeHint<Tuple6<String, String, Date, String, String, Double>>(){});
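For anyone wondering why the TypeHint has to be created with the trailing {}: an anonymous subclass keeps its generic superclass in the class file, so the full type argument can be read back by reflection even though generics are erased at runtime. The sketch below illustrates that idea in plain Java only. The Hint class and its getCapturedType() method are hypothetical stand-ins written for this example, not Flink's actual TypeHint, and Map<String, Date> is just a sample type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.Map;

public class TypeTokenDemo {

    /** Hypothetical stand-in for a type hint class; NOT Flink's TypeHint. */
    public abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // An anonymous subclass such as new Hint<Map<String, Date>>(){} records
            // "Hint<Map<String, Date>>" as its generic superclass, which survives erasure.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create the hint as an anonymous subclass: new Hint<X>(){}");
            }
            // Hint has exactly one type parameter, so index 0 is the hinted type.
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        /** Returns the full generic type that was written between the angle brackets. */
        public Type getCapturedType() {
            return captured;
        }
    }

    public static void main(String[] args) {
        // The {} creates the anonymous subclass that carries the generic signature.
        Hint<Map<String, Date>> hint = new Hint<Map<String, Date>>() {};
        System.out.println(hint.getCapturedType().getTypeName());
    }
}
```

Without the {} there would be no subclass to inspect (and an abstract class could not be instantiated anyway), which is presumably why the one-liner above is written the way it is.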
Thank you, I solved it this way.