Hi folks,
I have two questions about types in Flink when using Scala:
1. Scala case class:

This is my case class definition:

    case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

In the documentation, Scala case classes are listed as supported:
`Scala case classes (including Scala tuples): null fields not supported`
But the log shows this INFO message:
10:26:08,489 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class io.github.streamingwithflink.util.SensorReading is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2. Scala list:

This is my case class definition:

    case class SensorReading(var id: String, var timestamp: Long, var temperature: Double, var list: List[String] = List[String]())

The log shows:
No fields were detected for class scala.collection.immutable.List so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
Does this mean that a Scala List can still be serialized, so that I can ignore this message if I don't care about performance?
Or should I use a Java ArrayList instead of a Scala List, or create a custom serializer for the SensorReading case class?
Thanks!
Best regards
Utopia
Hi,
Reg. 1: Scala case classes are supported in the Scala-specific version of the DataStream API. If you use case classes in the Java API, you will get the INFO message below, because the Java API uses pure reflection extraction for analyzing POJOs. The Scala API tries to analyze Scala classes first; only if this is not possible does it fall back to Java reflection extraction. So in your case the INFO message should not appear, because it is a pure Scala case class. Is it used within a non-case class?

Reg. 2: Most classes can be serialized by Flink. That's why the log lines are only of level INFO: they indicate that performance might be affected slightly. If you are performance-sensitive, I would recommend primitive types, arrays, and case classes.

Regards,
Timo
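To make the Scala path concrete, here is a minimal sketch of a job that relies on the implicit TypeInformation macro brought into scope by the wildcard import (the object name, sample values, and job name are illustrative, not from the thread):

    import org.apache.flink.streaming.api.scala._

    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object CaseClassTypeExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // The wildcard import above puts the createTypeInformation macro
        // in implicit scope, so fromElements() and map() analyze the
        // case class as a case-class type rather than a GenericType.
        env
          .fromElements(SensorReading("sensor_1", 1547718199L, 35.8))
          .map(r => r.copy(temperature = (r.temperature - 32) * (5.0 / 9.0)))
          .print()

        env.execute("case class type example")
      }
    }

If that import is missing, the implicit is not found and the compiler (or a runtime fallback) ends up on the Java reflection path, which produces exactly the kind of INFO message quoted above.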
Hi Timo,
This is my case class:

    case class Box[T](meta: Metadata, value: T) {
      def function1: A => B = {...}
      def method2(...): A = {...}
    }

However, I still get that warning: "Class class data.package$Box cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance."

I imported org.apache.flink.streaming.api.scala._ . Is this enough to tell Flink that I am using the Scala API?

Thanks and regards,
Averell
Hi Utopia,
Have you fixed this problem? I also ran into it. I converted the case class to a Java POJO, and that fixed it.
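For anyone reading along, here is a sketch of what such a POJO-style class can look like when written in Scala. This is an assumption about the approach, not Xavier's actual code; Flink's POJO rules require a public class, a public no-argument constructor, and fields that are public or have getters/setters:

    import scala.beans.BeanProperty

    // @BeanProperty generates the getX/setX pairs that Flink's POJO
    // analyzer looks for; Scala's plain `var` accessors do not follow
    // the JavaBean naming convention.
    class SensorReadingPojo(
        @BeanProperty var id: String,
        @BeanProperty var timestamp: Long,
        @BeanProperty var temperature: Double) {

      // The public no-argument constructor that the TypeExtractor
      // complained about in the original log message.
      def this() = this(null, 0L, 0.0)
    }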
Hi Xavier,
the Scala API has special implicits in methods such as `DataStream.map()` or `DataStream.keyBy()` to support Scala specifics like case classes. For Scala, one needs to use the macro `createTypeInformation[CaseClass]`; for Java, we use reflection via `TypeInformation.of()`. The Scala and Java analyses are completely different, so you cannot use a case class in the Java API. Scala will fall back to Java, though.

I hope this helps.

Regards,
Timo
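A small sketch of the difference Timo describes, putting the two extraction paths side by side (the comments describe the expected outcome; exact printed output may differ by Flink version):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala._

    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object TypeAnalysisComparison {
      def main(args: Array[String]): Unit = {
        // Scala path: compile-time macro, produces a case-class type
        // with all three fields recognized.
        val scalaInfo: TypeInformation[SensorReading] =
          createTypeInformation[SensorReading]

        // Java path: runtime reflection. A case class has no default
        // constructor, so the extractor falls back to a generic type
        // that is serialized with Kryo.
        val javaInfo: TypeInformation[SensorReading] =
          TypeInformation.of(classOf[SensorReading])

        println(scalaInfo) // a case-class type with named fields
        println(javaInfo)  // a GenericType
      }
    }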
Hi Timo,

Thank you for your clarification, it is very useful to me. I am also working on my map function implementation, trying to do an implicit conversion of the case class so that I can restore state from the filesystem.

Xavier
Dealing with types is not always easy in Flink. If you have further issues, it might make sense to just pass the type information explicitly. We list all types in:

org.apache.flink.api.common.typeinfo.Types
org.apache.flink.api.scala.typeutils.Types

Regards,
Timo
Thanks for your suggestions!

Best regards,
Xavier