Scala case class TypeInformation and Serializer

Scala case class TypeInformation and Serializer

sanmutongzi
Hi, I'm writing a Flink streaming job with the Scala API. How can I find out which classes are serialized by Flink's own type serializers and which fall back to the generic Kryo serializer?
And if a class does fall back to Kryo, how can I extend Flink's TypeInfo classes, or apply some other customization, to improve performance?

Below is the error I get when I set disableGenericTypes, so I know that something falls back to Kryo:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type scala.Tuple2 is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
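The same disableGenericTypes check can also be run per type, without building the whole job: create the serializer under an ExecutionConfig with generic types disabled and catch the UnsupportedOperationException that GenericTypeInfo.createSerializer throws in the trace above. A minimal sketch, assuming the Flink 1.9-era Scala API (flink-scala) is on the classpath; fallsBackToKryo, Opaque, Clean and Mixed are hypothetical names for illustration, not part of Flink:

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

object KryoFallbackCheck {
  // Hypothetical types for illustration only.
  class Opaque(val s: String)                 // not a case class, no no-arg constructor
  case class Clean(id: Long, name: String)    // every field is understood by the Scala macro
  case class Mixed(id: Long, payload: Opaque) // the payload field becomes a generic type

  // Hypothetical helper: true if building the serializer for T, including all
  // nested field serializers, requires a generic (Kryo-backed) serializer.
  def fallsBackToKryo[T](implicit info: TypeInformation[T]): Boolean = {
    val config = new ExecutionConfig
    config.disableGenericTypes()
    try {
      info.createSerializer(config)
      false
    } catch {
      case _: UnsupportedOperationException => true
    }
  }
}
```

Calling it from a unit test on each record type used in the job narrows down which type contains the generic field, instead of only learning that some operator failed.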

Re: Scala case class TypeInformation and Serializer

Yun Tang

Hi

Could you please share the related code? I think it might be due to insufficient type information hints.
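One common way such hints go missing is a generic helper method. The Scala API derives TypeInformation with a macro at compile time, and inside a method over a type parameter T the macro can only build TypeInformation for types containing T if a TypeInformation[T] is passed in implicitly. A minimal sketch, assuming the Scala API import; pairInfo is a hypothetical helper:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

object TypeHints {
  // The `T: TypeInformation` context bound is the type hint: the caller's
  // TypeInformation[T] is passed in implicitly, so the macro can build
  // TypeInformation[(T, Int)] from it. Without the bound there is no implicit
  // evidence for T at this point and compilation fails.
  def pairInfo[T: TypeInformation]: TypeInformation[(T, Int)] =
    createTypeInformation[(T, Int)]
}
```

The same context bound is needed on any generic method that calls map, flatMap, etc. on a DataStream[T] and produces a new element type.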

Best

Yun Tang

(In reply to 杨光 <[hidden email]>, Wednesday, December 11, 2019 at 7:20 PM.)

Re: Scala case class TypeInformation and Serializer

sanmutongzi
Actually, the original source code uses too many third-party classes to be simplified easily. What I really want to ask is whether it is possible to find out which serializer class serializes/deserializes each type, so that we can tune it or write a custom serializer to improve performance.
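Since serializers are created from TypeInformation, one direct way to see which serializer class Flink would pick for a type is to ask its TypeInformation for one. A minimal sketch, assuming a default ExecutionConfig (a job's own config, for example with registered Kryo serializers, can change what happens inside the serializer); serializerOf and Opaque are hypothetical names:

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

object WhichSerializer {
  // Hypothetical class that Flink cannot analyze field by field.
  class Opaque(val s: String)

  // Fully qualified class name of the outermost serializer Flink would use for T.
  // getName is used rather than getSimpleName because generated serializers
  // may be anonymous classes.
  def serializerOf[T](implicit info: TypeInformation[T]): String =
    info.createSerializer(new ExecutionConfig).getClass.getName
}
```

This only reports the outermost serializer; for a case class, the serializers of nested fields have to be inspected separately through its field types.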

(In reply to Yun Tang <[hidden email]>, Thursday, December 12, 2019 at 12:45 AM.)

Re: Scala case class TypeInformation and Serializer

Timo Walther
Hi,

The serializers are created from TypeInformation, so you can simply
inspect the type information, e.g. by using this in the Scala API:

import org.apache.flink.api.scala._
val typeInfo = createTypeInformation[MyClassToAnalyze]

Then go through the resulting object using a debugger.
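Instead of stepping through the object by hand, the TypeInformation tree can also be walked programmatically to list every field that ends up as a GenericTypeInfo, i.e. is serialized with Kryo. A minimal sketch; genericPaths and the example types are hypothetical, and it only descends into CompositeType nodes (case classes, tuples, POJOs), so element types of collections or Options would need extra cases:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._

object GenericTypeFinder {
  // Hypothetical types for illustration only.
  class Opaque(val s: String)
  case class Inner(tag: String, blob: Opaque)
  case class Outer(id: Long, inner: Inner, pair: (String, Int))

  // Returns "path -> class" for every node that is a GenericTypeInfo.
  def genericPaths(info: TypeInformation[_], path: String = "root"): Seq[String] =
    info match {
      case g: GenericTypeInfo[_] =>
        Seq(s"$path -> ${g.getTypeClass.getName}")
      case c: CompositeType[_] =>
        c.getFieldNames().toSeq.zipWithIndex.flatMap { case (name, i) =>
          genericPaths(c.getTypeAt(i), s"$path.$name")
        }
      case _ =>
        Seq.empty // basic, array, collection, ... types are not descended into here
    }
}
```

For example, genericPaths(createTypeInformation[GenericTypeFinder.Outer]) reports only the inner.blob field; the (String, Int) tuple stays on the Scala side because the macro analyzes it.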

Actually, I don't understand why scala.Tuple2 is treated as a generic
type. The problem might be that an outer class is treated as a Java POJO,
so you leave the Scala type analyzer stack and switch to the Java
analyzer stack for its fields (currently there is no way back). To
improve performance, you have three options:

1. Make sure every type is a proper Scala type (all case classes, no POJOs).

2. Use the @TypeInfo annotation to specify a type information factory. This has
the highest precedence in all APIs.

3. Register a Kryo serializer in the execution config. This might be the
easiest way.
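The analyzer switch described above can be observed directly for scala.Tuple2: the Scala macro (createTypeInformation) analyzes it as a case class, while Flink's reflection-based Java TypeExtractor cannot treat it as a POJO and falls back to GenericTypeInfo, which matches the type named in the original error. A minimal sketch, assuming flink-scala is on the classpath; AnalyzerComparison is a hypothetical name:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

object AnalyzerComparison {
  // Scala analyzer stack: the macro knows scala.Tuple2 and analyzes its fields.
  val viaScala: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]

  // Java analyzer stack: reflection-based extraction. scala.Tuple2 is not a
  // valid POJO, so it becomes a generic type that is serialized with Kryo.
  val viaJava: TypeInformation[(String, Int)] =
    TypeExtractor.createTypeInfo(classOf[(String, Int)])

  def isGeneric(info: TypeInformation[_]): Boolean = info.isInstanceOf[GenericTypeInfo[_]]
  def isCaseClass(info: TypeInformation[_]): Boolean = info.isInstanceOf[CaseClassTypeInfo[_]]
}
```

This is why option 1 helps: as long as every enclosing type is a case class, tuples and other Scala types inside it keep being analyzed by the macro.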

I hope this helps.

Regards,
Timo

(In reply to 杨光, 12.12.19 10:38.)

> Actually the original source code have too many third part classes which
> is hard to simplify , the question I want to ask is there any
> possible for me to find out which is ser/dser by which Serializer
> class,then we can tuning or and customer Serializer to improve performance.
>
> Yun Tang <[hidden email] <mailto:[hidden email]>> 于2019年12月12日周
> 四 上午12:45写道:
>
>     Hi____
>
>     __ __
>
>     Would you please give related code? I think it might due to
>     insufficient hint to type information.____
>
>     __ __
>
>     Best____
>
>     Yun Tang____
>
>     __ __
>
>     __ __
>
>     __ __
>
>     *From: *杨光<[hidden email] <mailto:[hidden email]>>
>     *Date: *Wednesday, December 11, 2019 at 7:20 PM
>     *To: *user <[hidden email] <mailto:[hidden email]>>
>     *Subject: *Scala case class TypeInformation and Serializer____
>
>     __ __
>
>     Hi, I'm working on write a flink stream job with scala api , how
>     should I find out which class is serialied by flink type serializer
>     and which is falled back to generic Kryo serializer.
>     And if one class falls back to Kryo serializer, how can I make some
>       extend the TypeInfo classes of Flink or some other customisations
>     to improve performance.____
>
>     __ __
>
>     below is some errors I got when I set disableGenericTypes,so I know
>     if will fall back to Kryo____
>
>     __ __
>
>     Exception in thread "main" java.lang.UnsupportedOperationException:
>     Generic types have been disabled in the ExecutionConfig and type
>     scala.Tuple2 is treated as a generic type.
>     at
>     org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>     at
>     org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
>     at
>     org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)____
>

Reply | Threaded
Open this post in threaded view
|

Re: Scala case class TypeInformation and Serializer

sanmutongzi
I did some tests using createTypeInformation[MyClassToAnalyze]. It works fine with simple case classes, but for our third-party classes it fails with errors such as "could not find implicit value" or "constructor _UrlType in class _UrlType cannot be accessed in <$anon: org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer", so these classes may be too complicated for Flink's type utilities to analyze. I think we will take your third suggestion and register a custom serializer.
Thanks a lot😀
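A minimal sketch of that third option: a hand-written Kryo serializer registered in the ExecutionConfig. It assumes the Kryo 2.x Serializer API bundled with Flink 1.x (the read signature differs in later Kryo versions); LegacyUrl is a hypothetical stand-in for a third-party class such as _UrlType, and LegacyUrlKryoSerializer and KryoRegistration are likewise hypothetical. In a job the registration would go on env.getConfig instead of a standalone config. The type is still serialized through Kryo, so it will still be rejected once disableGenericTypes() is set.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}

// Hypothetical stand-in for a third-party class that cannot be changed.
class LegacyUrl(val scheme: String, val host: String, val port: Int)

// Hand-written Kryo serializer: writes the three fields in a fixed order
// instead of relying on Kryo's reflective default serializer.
class LegacyUrlKryoSerializer extends Serializer[LegacyUrl] {
  override def write(kryo: Kryo, output: Output, url: LegacyUrl): Unit = {
    output.writeString(url.scheme)
    output.writeString(url.host)
    output.writeInt(url.port, true) // variable-length encoding, optimized for non-negative ints
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[LegacyUrl]): LegacyUrl =
    new LegacyUrl(input.readString(), input.readString(), input.readInt(true))
}

object KryoRegistration {
  // In a job: env.getConfig.registerTypeWithKryoSerializer(...)
  val config = new ExecutionConfig
  config.registerTypeWithKryoSerializer(classOf[LegacyUrl], classOf[LegacyUrlKryoSerializer])

  // Round trip through Flink's KryoSerializer, which picks up the registration.
  def roundTrip(url: LegacyUrl): LegacyUrl = {
    val serializer = new KryoSerializer[LegacyUrl](classOf[LegacyUrl], config)
    val out = new DataOutputSerializer(64)
    serializer.serialize(url, out)
    serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer))
  }
}
```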

(In reply to Timo Walther <[hidden email]>, Thursday, December 12, 2019 at 7:52 PM.)