TypeInformation in Custom Deserializer

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

TypeInformation in Custom Deserializer

AndreaKinn
Hi,
I'm trying to implement a custom deserialiser to deserialise data from a kafka sink.
So I'm implementing a KeyedDeserializedSchema<Tuple6<String, String, Date, String, String, Double>>
which ask me to override the method:

@Override
public TypeInformation<Tuple6<String, String, Date, String, String, Double>> getProducedType() {
        //to do
}

Honestly I investigated in link but I didn't understand what is this method and how to implement it.
Reply | Threaded
Open this post in threaded view
|

Re: TypeInformation in Custom Deserializer

Ted Yu
From ResultTypeQueryable :

   * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
   *
   * @return The data type produced by this function or input format.
   */
  TypeInformation<T> getProducedType();

You can look at classes which implement this method as examples.

flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java


On Sun, Aug 13, 2017 at 7:31 AM, AndreaKinn <[hidden email]> wrote:
Hi,
I'm trying to implement a custom deserialiser to deserialise data from a
kafka sink.
So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
Date, String, String, Double>>
which ask me to override the method:

@Override
public TypeInformation<Tuple6&lt;String, String, Date, String, String,
Double>> getProducedType() {
        //to do
}

Honestly I investigated in  link
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html>
but I didn't understand what is this method and how to implement it.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: TypeInformation in Custom Deserializer

AndreaKinn
But I'm using Java primitive type like String, Double plus Date types. Flink doesn't know how to handle them?
Reply | Threaded
Open this post in threaded view
|

Re: TypeInformation in Custom Deserializer

Ted Yu
Please take a look at the following in flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java

  public TypeInformation<Tuple2<K, V>> getProducedType() {
    return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));

FYI

On Sun, Aug 13, 2017 at 8:23 AM, AndreaKinn <[hidden email]> wrote:
But I'm using Java primitive type like String, Double plus Date types. Flink
doesn't know how to handle them?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861p14863.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: TypeInformation in Custom Deserializer

Greg Hogan
In reply to this post by AndreaKinn
You should be able to implement this using a TypeHint (see the Creating a TypeInformation or TypeSerializer section from the linked page):

        return TypeInformation.of(new TypeHint<Tuple6<String, String, Date, String, String, Double>>(){});


> On Aug 13, 2017, at 10:31 AM, AndreaKinn <[hidden email]> wrote:
>
> Hi,
> I'm trying to implement a custom deserialiser to deserialise data from a
> kafka sink.
> So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
> Date, String, String, Double>>
> which ask me to override the method:
>
> @Override
> public TypeInformation<Tuple6&lt;String, String, Date, String, String,
> Double>> getProducedType() {
> //to do
> }
>
> Honestly I investigated in  link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html>  
> but I didn't understand what is this method and how to implement it.
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: TypeInformation in Custom Deserializer

AndreaKinn
Thank you, I solved in this way

Greg Hogan wrote
You should be able to implement this using a TypeHint (see the Creating a TypeInformation or TypeSerializer section from the linked page):

        return TypeInformation.of(new TypeHint<Tuple6<String, String, Date, String, String, Double>>(){});


> On Aug 13, 2017, at 10:31 AM, AndreaKinn <[hidden email]> wrote:
>
> Hi,
> I'm trying to implement a custom deserialiser to deserialise data from a
> kafka sink.
> So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
> Date, String, String, Double>>
> which ask me to override the method:
>
> @Override
> public TypeInformation<Tuple6&lt;String, String, Date, String, String,
> Double>> getProducedType() {
> //to do
> }
>
> Honestly I investigated in  link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html> 
> but I didn't understand what is this method and how to implement it.
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.