Question about Scala Case Class and List in Flink

Utopia
Hi folks,

I have two questions about types in Flink when using Scala:

1. Scala case class

This is my case class definition:
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

According to the documentation, Scala case classes are supported:
`Scala case classes (including Scala tuples): null fields not supported`

But the log info shows:
10:26:08,489 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class io.github.streamingwithflink.util.SensorReading is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.


2. Scala List

This is my case class definition:

case class SensorReading(var id: String, var timestamp: Long, var temperature: Double, var list: List[String] = List[String]())

The log shows:
No fields were detected for class scala.collection.immutable.List so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Does this mean that a Scala List can still be serialized, so that I can ignore this message if I don't care about performance?
Should I use a Java ArrayList instead of a Scala List, or create a custom serializer for the SensorReading case class?

Thanks!


Best regards
Utopia

Re: Question about Scala Case Class and List in Flink

Timo Walther
Hi,

Reg. 1:

Scala case classes are supported in the Scala specific version of the
DataStream API. If you are using case classes in the Java API you will
get the INFO below because the Java API uses pure reflection extraction
for analyzing POJOs.

The Scala API tries to analyze Scala classes first. If this is not
possible, it will fall back to Java reflection extraction. So in your case
the INFO should not be present, because it is a pure Scala case class. Is
it used within a non-case class?
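
A minimal sketch of what "using the Scala API" could look like, assuming Flink 1.x with flink-streaming-scala (plus a runtime such as flink-clients) on the classpath. The object name, the sample values, and the `toCelsius` helper are made up for illustration. The important part is the wildcard import, which brings the implicit type-analysis macro into scope for `fromElements` and `map`:

```scala
// Assumption: Flink 1.x, flink-streaming-scala on the classpath.
import org.apache.flink.streaming.api.scala._

// Top-level case class, so the Scala type analysis can see all of its fields.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ScalaApiJob {
  // Hypothetical helper for illustration: Fahrenheit to Celsius.
  def toCelsius(r: SensorReading): SensorReading =
    r.copy(temperature = (r.temperature - 32) * 5.0 / 9.0)

  def main(args: Array[String]): Unit = {
    // This is the Scala StreamExecutionEnvironment, not the Java one.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements and map both take an implicit TypeInformation,
    // which the imported macro derives for the case class.
    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1000L, 68.0),
      SensorReading("sensor_2", 2000L, 212.0))

    readings.map(r => toCelsius(r)).print()
    env.execute("scala-api-case-class")
  }
}
```

If the environment or the streams are instead created through the Java classes (`org.apache.flink.streaming.api.environment.StreamExecutionEnvironment`), the case class goes through reflection-based extraction, which would match the INFO message quoted above.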

Reg 2:

Most classes can be serialized by Flink. The log lines are only of type
INFO because falling back to a generic type might affect performance
slightly. If you are performance sensitive, I would recommend primitive
types, arrays, and case classes.
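
As a rough illustration of that recommendation, here is a sketch of the case class with an array field in place of the `List[String]`, assuming Flink 1.x with flink-scala on the classpath. The class name `SensorReadingWithTags` and the field name `tags` are hypothetical. Printing the derived type information is a cheap way to see how Flink will treat each field before running a job:

```scala
// Assumption: Flink 1.x, flink-scala on the classpath.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// Hypothetical variant of SensorReading that uses an array instead of a List.
case class SensorReadingWithTags(
    id: String,
    timestamp: Long,
    temperature: Double,
    tags: Array[String] = Array.empty[String])

object FieldTypes {
  // Derived at compile time by the Scala API macro.
  val info: TypeInformation[SensorReadingWithTags] =
    createTypeInformation[SensorReadingWithTags]

  def main(args: Array[String]): Unit = {
    println(info)          // textual description of the derived type
    println(info.getArity) // number of top-level fields
  }
}
```

Note that a case class holding an array compares that field by reference in `equals`, so this trade-off is about serialization only.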

Regards,
Timo



Re: Question about Scala Case Class and List in Flink

Averell
Hi Timo,

This is my case class:
case class Box[T](meta: Metadata, value: T) {
      def function1: A => B = {...}
      def method2(...): A = {...}
}

However, I still get this warning: "Class class data.package$Box cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance."

I imported org.apache.flink.streaming.api.scala._. Is this enough to
tell Flink that I am using the Scala API?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Question about Scala Case Class and List in Flink

Xavier
In reply to this post by Utopia
Hi Utopia,
   Have you fixed this problem? I ran into it as well. After I converted the
case class to a Java POJO, the problem went away.




Re: Question about Scala Case Class and List in Flink

Timo Walther
Hi Xavier,

The Scala API has special implicits in methods such as `DataStream.map()`
or `DataStream.keyBy()` to support Scala specifics like case classes. For
Scala, one needs to use the macro `createTypeInformation[CaseClass]`; for
Java, we use reflection via `TypeInformation.of()`. Scala and Java type
analysis are completely different, so you cannot use a case class in the
Java API. Scala will fall back to Java though.
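
The two code paths can be compared side by side. This is only a sketch, assuming Flink 1.x with flink-scala on the classpath; the object name is made up. I would expect the macro to yield a `CaseClassTypeInfo` and the reflection path to yield a `GenericTypeInfo`, which is what the INFO message in the original question describes:

```scala
// Assumption: Flink 1.x, flink-scala on the classpath.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TypeInfoComparison {
  // Scala path: compile-time macro that understands case classes.
  val scalaInfo: TypeInformation[SensorReading] =
    createTypeInformation[SensorReading]

  // Java path: runtime reflection that looks for a POJO
  // (default constructor, accessible fields or getters/setters).
  val javaInfo: TypeInformation[SensorReading] =
    TypeInformation.of(classOf[SensorReading])

  def main(args: Array[String]): Unit = {
    println(s"macro gives case class info: ${scalaInfo.isInstanceOf[CaseClassTypeInfo[_]]}")
    println(s"reflection gives generic info: ${javaInfo.isInstanceOf[GenericTypeInfo[_]]}")
  }
}
```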

I hope this helps.

Regards,
Timo



Re: Question about Scala Case Class and List in Flink

Xavier
Hi Timo,
    Thank you for your clarification; it is very useful to me. I am also looking at how the map function is implemented and trying to do an implicit conversion of the case class, so that I can restore state from the file system (FS).

--

Best Regards,
Xavier

Re: Question about Scala Case Class and List in Flink

Timo Walther
Dealing with types is not always easy in Flink. If you have further
issues, it might make sense to just pass them explicitly. We list all
types in:

org.apache.flink.api.common.typeinfo.Types

org.apache.flink.api.scala.typeutils.Types
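
A sketch of passing the type explicitly with the Scala `Types` helper, assuming Flink 1.x with flink-streaming-scala on the classpath. The job name, the sample element, and the `+ 1.0` transformation are placeholders. Only one of the two Scala package objects is imported with a wildcard here, because importing both would make `createTypeInformation` ambiguous:

```scala
// Assumption: Flink 1.x, flink-streaming-scala on the classpath.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ExplicitTypesJob {
  // Built once by hand instead of being resolved implicitly at each call site.
  val readingInfo: TypeInformation[SensorReading] =
    Types.CASE_CLASS[SensorReading]

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The second parameter list is the TypeInformation that is
    // normally filled in implicitly.
    val readings = env.fromElements(
      SensorReading("sensor_1", 1000L, 20.0))(readingInfo)

    readings
      .map((r: SensorReading) => r.copy(temperature = r.temperature + 1.0))(readingInfo)
      .print()

    env.execute("explicit-type-information")
  }
}
```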

Regards,
Timo


Re: Question about Scala Case Class and List in Flink

Xavier
Thanks for your suggestions!

--

Best Regards,
Xavier