Hi,

I'm writing a Flink streaming job with the Scala API. How can I find out which classes are serialized by Flink's own type serializers and which fall back to the generic Kryo serializer? And if a class falls back to the Kryo serializer, how can I extend Flink's TypeInfo classes, or make some other customization, to improve performance?

Below is the error I got after setting disableGenericTypes, which is how I know the type would fall back to Kryo:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type scala.Tuple2 is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
|
Hi,

Would you please share the related code? I think it might be due to insufficient type information hints.

Best,
Yun Tang

From: 杨光 <[hidden email]>
> Hi, I'm writing a Flink streaming job with the Scala API. [...]
|
Actually, the original source code has too many third-party classes, which makes it hard to simplify. What I want to ask is whether there is any way to find out which class is serialized and deserialized by which Serializer class, so that we can tune it or add a custom serializer to improve performance.

Yun Tang <[hidden email]> wrote on Thu, Dec 12, 2019 at 12:45 AM:
|
Hi,
The serializers are created from TypeInformation, so you can simply inspect the type information, e.g. by using this in the Scala API:

    val typeInfo = createTypeInformation[MyClassToAnalyze]

and then going through the object using a debugger.

Actually, I don't understand why scala.Tuple2 is treated as a generic type. The problem might be that an outer class is treated as a Java POJO, so you are leaving the Scala type analyzer stack and switching to the Java analyzer stack for its fields (currently there is no way back).

For improving the performance, you have 3 options:

1. Make sure every type is a proper Scala type (all case classes, no POJOs).
2. Use the @TypeInfo annotation for specifying a factory. This has the highest precedence in all APIs.
3. Register a Kryo serializer in the execution config. This might be the easiest way.

I hope this helps.

Regards,
Timo

On 12.12.19 10:38, 杨光 wrote:
> Actually, the original source code has too many third-party classes,
> which makes it hard to simplify. [...]
>
> Yun Tang <[hidden email]> wrote on Thu, Dec 12, 2019 at 12:45 AM:
>
>> Would you please share the related code? [...]
>>
>> From: 杨光 <[hidden email]>
>> Date: Wednesday, December 11, 2019 at 7:20 PM
>> To: user <[hidden email]>
>> Subject: Scala case class TypeInformation and Serializer
>>
>> Hi, I'm writing a Flink streaming job with the Scala API. [...]
|
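As a rough illustration of the inspection Timo describes, the sketch below walks a TypeInformation tree and prints, for each field, the derived type information and serializer, flagging anything that ends up as a GenericTypeInfo (the Kryo fallback). It is a sketch under assumptions rather than a definitive tool: MyClassToAnalyze, InspectTypeInfo and walk are hypothetical names, and createSerializer(ExecutionConfig) is assumed to be available as in the Flink 1.x releases current at the time of this thread.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._ // brings the createTypeInformation macro into scope

// Hypothetical stand-in for the class you want to analyze.
case class MyClassToAnalyze(id: Long, tags: (String, Int))

object InspectTypeInfo {

  // Recursively print the type information and serializer of a type and its fields.
  def walk(info: TypeInformation[_], path: String): Unit = {
    // A fresh ExecutionConfig has generic types enabled, so this does not throw.
    val serializer = info.createSerializer(new ExecutionConfig)
    val kind =
      if (info.isInstanceOf[GenericTypeInfo[_]]) "GENERIC, falls back to Kryo"
      else "analyzed by Flink"
    println(s"$path: $info -> ${serializer.getClass.getName} ($kind)")

    info match {
      // Case classes, tuples and POJOs are composite types with named fields.
      case composite: CompositeType[_] =>
        composite.getFieldNames.zipWithIndex.foreach { case (name, index) =>
          walk(composite.getTypeAt[Any](index), s"$path.$name")
        }
      case _ => ()
    }
  }

  def main(args: Array[String]): Unit =
    walk(createTypeInformation[MyClassToAnalyze], "MyClassToAnalyze")
}
```

Running this against the outer classes of a job should show at which field the analysis drops to a generic type, which is usually quicker than stepping through the object in a debugger.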
I did some tests using createTypeInformation[MyClassToAnalyze]. It works fine with some simple case classes, but for our third-party classes it fails with errors such as "could not find implicit value" or "constructor _UrlType in class _UrlType cannot be accessed in <$anon: org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer", so maybe they are a bit too complicated for the Flink type utilities to analyze. I think we should take your third suggestion and register a custom serializer.

Thanks a lot 😀

Timo Walther <[hidden email]> wrote on Thu, Dec 12, 2019 at 7:52 PM:
> Hi, [...]
|
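For reference, the third option (registering a Kryo serializer in the execution config) might look roughly like the sketch below. UrlType and UrlTypeSerializer are hypothetical stand-ins, not the third-party class from this thread, and the read signature assumes the Kryo 2.x version bundled with Flink 1.x.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for a third-party class that Flink cannot analyze.
class UrlType(val value: String)

// Hand-written Kryo serializer that writes only the single string field.
class UrlTypeSerializer extends Serializer[UrlType] {
  override def write(kryo: Kryo, output: Output, obj: UrlType): Unit =
    output.writeString(obj.value)

  override def read(kryo: Kryo, input: Input, cls: Class[UrlType]): UrlType =
    new UrlType(input.readString())
}

object RegisterKryoSerializer {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Tell Flink's Kryo fallback to use the custom serializer for UrlType.
    env.getConfig.registerTypeWithKryoSerializer(classOf[UrlType], classOf[UrlTypeSerializer])

    // Build and execute the rest of the job here.
  }
}
```

As far as I know, a type registered this way is still treated as a generic type, so disableGenericTypes would presumably have to stay off for this approach to work.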