valueState.value throwing null pointer exception

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

valueState.value throwing null pointer exception

Colletta, Edward

Using Flink 1.9.2, Java, FsStateBackend.

 

I was getting com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException on a value() operation on a ValueState variable in a KeyedProcessFunction.

The object stored in state contained 2 PriorityQueue fields and the error message indicated these were the culprits. 

I assumed I did not need the concurrent version (PriorityBlockingQueue) because it was keyed state so only one task could operate on the variable at a time.

And I assumed that checkpointing would not access the variable while I was updating it because the checkpointing would not see what I was doing between the value() and update()

Operations.  Changing to PriorityBlockingQueue fixed the problem.

 

Given that, could it be that Kryo just had an easier time with the PriorityBlockingQueue underlying fields, or should do we always need to use concurrent versions of objects that are stored in state?

 

 

Reply | Threaded
Open this post in threaded view
|

Re: valueState.value throwing null pointer exception

Arvid Heise-3
Hi Edward,

you are right to assume that the non-blocking version is the better fit. You are also correct to assume that kryo just can't handle the underlying fields.

I'd just go a different way to solve it: add your custom serializer for PriorityQueue.

There is one [1] for the upcoming(?) Kryo version that you can just copy and then register in your code [2].


On Fri, Oct 2, 2020 at 11:15 AM Colletta, Edward <[hidden email]> wrote:

Using Flink 1.9.2, Java, FsStateBackend.

 

I was getting com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException on a value() operation on a ValueState variable in a KeyedProcessFunction.

The object stored in state contained 2 PriorityQueue fields and the error message indicated these were the culprits. 

I assumed I did not need the concurrent version (PriorityBlockingQueue) because it was keyed state so only one task could operate on the variable at a time.

And I assumed that checkpointing would not access the variable while I was updating it because the checkpointing would not see what I was doing between the value() and update()

Operations.  Changing to PriorityBlockingQueue fixed the problem.

 

Given that, could it be that Kryo just had an easier time with the PriorityBlockingQueue underlying fields, or should do we always need to use concurrent versions of objects that are stored in state?

 

 



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng