Scala ListBuffer cannot be used as a POJO type in Flink


Utopia
Hello everyone,

When I run the code below, the log prints:

> class scala.collection.mutable.ListBuffer does not contain a setter for field scala$collection$mutable$ListBuffer$$start

> Class class scala.collection.mutable.ListBuffer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType.  
 
Code:
    
    private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])
    
    
    context.globalState.getListState(schoolDescriptor).update(ListBuffer(new School))

Class define:

    class School {
       var classes: ListBuffer[Class] = ListBuffer()
    }
    
    class Class {
       var students: ListBuffer[Student] = ListBuffer()
    }
    
    class Student {
       var name = ""
    }

What should I do if a POJO has a field of type ListBuffer, and the elements of that ListBuffer also have fields of type ListBuffer?



Best regards
Utopia

Re: Scala ListBuffer cannot be used as a POJO type in Flink

Andrey Zagrebin-4
Hi Utopia,

There were already a couple of hints about immutability in the comments on your Stack Overflow questions.
In general, I would also recommend it, because when you work with Flink state the general API contract is
that if you modify your state object (the one registered under schoolDescriptor), you have to call state#update with it.
Skipping the update call might work for heap state (the API does not guarantee it) but will not work, e.g., for the RocksDB state backend.
Serialisation is also much easier if you use pure POJOs [1].
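
To illustrate why the update call matters, here is a minimal toy model in plain Scala. It is not Flink code: `SerializingState` and `UpdateContractDemo` are hypothetical names, and Java serialisation merely stands in for whatever a serialising backend such as RocksDB does. The assumption it sketches is that such a backend stores bytes, so every read returns a fresh copy and in-place mutations are lost unless the value is written back.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a serialising state backend (not a Flink class):
// values are kept as bytes, so get() always hands back a fresh copy.
final class SerializingState[T <: java.io.Serializable] {
  private var bytes: Option[Array[Byte]] = None

  // Serialise the value and replace whatever was stored before.
  def update(value: T): Unit = {
    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    oos.writeObject(value)
    oos.close()
    bytes = Some(out.toByteArray)
  }

  // Deserialise a new object on every read.
  def get(): Option[T] = bytes.map { b =>
    val ois = new ObjectInputStream(new ByteArrayInputStream(b))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}

object UpdateContractDemo {
  // Returns the stored list size (without update, with update).
  def run(): (Int, Int) = {
    val state = new SerializingState[ListBuffer[String]]
    state.update(ListBuffer("a"))

    // Mutating the object returned by get() changes only a local copy.
    state.get().foreach(_ += "b")
    val withoutUpdate = state.get().map(_.size).getOrElse(0)

    // Writing the modified value back is what makes the change visible.
    val copy = state.get().get
    copy += "c"
    state.update(copy)
    val withUpdate = state.get().map(_.size).getOrElse(0)

    (withoutUpdate, withUpdate)
  }
}
```

A heap-based backend that returns the stored object itself would behave differently here, which is why code that forgets the update call can appear to work with one backend and silently lose data with another.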

In your non-POJO case, the general approach is either to implement your own org.apache.flink.api.common.typeutils.TypeSerializer, or register a custom serialiser [2],
and use the other state descriptor constructor, ListStateDescriptor(String name, TypeSerializer<T> typeSerializer),
or to refactor your classes to support out-of-the-box serialisation [3].
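
As a rough sketch of the refactoring route combined with the serializer-based constructor, the classes could become immutable case classes and the serializer could be derived by the Flink Scala API. This assumes a Flink version from around the time of this thread, with the Scala API on the classpath and `createSerializer(ExecutionConfig)` still available. The name `SchoolClass` (renamed from `Class` to avoid confusion with `java.lang.Class`) and the `SchoolState` holder object are illustrative choices, not part of the original code.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala._

// Immutable replacements for the original classes; List replaces ListBuffer.
case class Student(name: String = "")
case class SchoolClass(students: List[Student] = Nil)
case class School(classes: List[SchoolClass] = Nil)

object SchoolState {
  // createTypeInformation is the Scala API macro that derives type
  // information for case classes and Scala collections at compile time.
  val schoolSerializer: TypeSerializer[School] =
    createTypeInformation[School].createSerializer(new ExecutionConfig)

  // The descriptor constructor that takes a TypeSerializer instead of a Class.
  val schoolDescriptor: ListStateDescriptor[School] =
    new ListStateDescriptor[School]("schoolDescriptor", schoolSerializer)
}
```

With immutable values, a change means building a new `School` (for example with `copy`) and writing it back through the state's update call, which fits the contract described above.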

Best,
Andrey


On 16 Dec 2019, at 08:42, Utopia <[hidden email]> wrote:
