Hello everyone,
When I run the code below, the log prints:
> class scala.collection.mutable.ListBuffer does not contain a setter for field scala$collection$mutable$ListBuffer$$start
> Class class scala.collection.mutable.ListBuffer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType.
Code:
private lazy val schoolDescriptor = new ListStateDescriptor[School]("schoolDescriptor", classOf[School])
context.globalState.getListState(schoolDescriptor).update(ListBuffer(new School))
Class definitions:
class School {
var classes: ListBuffer[Class] = ListBuffer()
}
class Class {
var students: ListBuffer[Student] = ListBuffer()
}
class Student {
var name = ""
}
What should I do if a POJO has a field of type ListBuffer, and the elements of that ListBuffer also have a field of type ListBuffer?
Best regards
Utopia
|
Hi Utopia,
There were already a couple of hints about immutability in the comments on your Stack Overflow questions. In general, I would also recommend it: when you work with Flink state, the general API contract is that if you update your state object (schoolDescriptor), you have to call state#update with it. For heap state this might work without calling update (the API does not always guarantee it), but it will not work with, e.g., the RocksDB state backend. Serialisation is also much easier if you use pure POJOs [1].
In your non-POJO case, the general approach is either to implement your own org.apache.flink.api.common.typeutils.TypeSerializer or register a custom serialiser [2] so that you can use another state descriptor constructor, ListStateDescriptor(String name, TypeSerializer<T> typeSerializer), or to refactor your classes to support out-of-the-box serialisation [3].
Best,
Andrey
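To illustrate the immutability advice above, here is a minimal sketch in plain Scala, without any Flink dependency. It assumes the model can be rewritten as case classes with immutable List fields. The names SchoolClass (a rename of the original Class, to avoid confusion with java.lang.Class) and SchoolUpdates.addStudent are hypothetical and not part of the original code. Whether this alone removes the GenericType warning depends on how the type information is derived. With the Flink Scala API, case classes are usually analysed through org.apache.flink.api.scala.createTypeInformation rather than classOf.

```scala
// Immutable model: case classes with immutable List fields instead of
// mutable classes that hold ListBuffer fields.
// SchoolClass is a hypothetical rename of the original `Class`.
final case class Student(name: String = "")
final case class SchoolClass(students: List[Student] = Nil)
final case class School(classes: List[SchoolClass] = Nil)

object SchoolUpdates {
  // Returns a NEW School with `student` appended to the class at
  // `classIndex`. The input value is never modified. An invalid index
  // throws IndexOutOfBoundsException, as with any List lookup.
  def addStudent(school: School, classIndex: Int, student: Student): School = {
    val target  = school.classes(classIndex)
    val changed = target.copy(students = target.students :+ student)
    school.copy(classes = school.classes.updated(classIndex, changed))
  }
}
```

Because every change produces a new value, the state is never modified behind the backend's back. The new value would then be written back explicitly with state#update, which in Flink's ListState takes a java.util.List.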