Using Flink 1.9.2, Java, FsStateBackend. I was getting com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException on a value() operation on a ValueState variable in a KeyedProcessFunction. The object stored in state contained two PriorityQueue fields, and the error message indicated these were the culprits.

I assumed I did not need the concurrent version (PriorityBlockingQueue) because it was keyed state, so only one task could operate on the variable at a time. I also assumed that checkpointing would not access the variable while I was updating it, because checkpointing would not see what I was doing between the value() and update() operations. Changing to PriorityBlockingQueue fixed the problem. Given that, could it be that Kryo just had an easier time with the underlying fields of PriorityBlockingQueue, or do we always need to use concurrent versions of objects that are stored in state?
Hi Edward,

you are right to assume that the non-blocking version is the better fit. You are also correct to assume that Kryo just can't handle the underlying fields of PriorityQueue. I'd go a different way to solve it: add a custom serializer for PriorityQueue. There is one [1] for the upcoming(?) Kryo version that you can copy and then register in your code [2].

On Fri, Oct 2, 2020 at 11:15 AM Colletta, Edward <[hidden email]> wrote:
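As a rough illustration of what a custom PriorityQueue serializer has to do, here is a minimal sketch that uses only java.io rather than the Kryo API. It is not the serializer referenced in [1]. The class name PriorityQueueCodec and its write/read methods are hypothetical, and the sketch assumes the comparator and the elements are java.io.Serializable. The idea is to store the comparator, the size and the elements explicitly, then rebuild the queue through its public constructor instead of letting a generic serializer touch its internal fields. In a Kryo setup the same steps would presumably go into the write and read methods of a Serializer subclass, which is then registered on Flink's ExecutionConfig (for example with registerTypeWithKryoSerializer).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical helper, for illustration only: it shows the logic of a
// PriorityQueue serializer without depending on Kryo or Flink.
public class PriorityQueueCodec {

    // Writes the comparator, the element count, and then each element.
    public static <T> byte[] write(PriorityQueue<T> queue) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // A null comparator means natural ordering.
            // Assumption: a non-null comparator is Serializable.
            out.writeObject(queue.comparator());
            out.writeInt(queue.size());
            // Iteration follows the internal heap order, not sorted order;
            // re-adding the elements on read rebuilds a valid heap.
            for (T element : queue) {
                out.writeObject(element);
            }
        }
        return bytes.toByteArray();
    }

    // Rebuilds the queue through its public constructor and add().
    @SuppressWarnings("unchecked")
    public static <T> PriorityQueue<T> read(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(data))) {
            Comparator<? super T> comparator =
                (Comparator<? super T>) in.readObject();
            int size = in.readInt();
            // PriorityQueue rejects an initial capacity below 1.
            PriorityQueue<T> queue =
                new PriorityQueue<>(Math.max(size, 1), comparator);
            for (int i = 0; i < size; i++) {
                queue.add((T) in.readObject());
            }
            return queue;
        }
    }
}
```

Going through the constructor keeps the comparator and the backing array consistent, which is the part a field-by-field generic serializer may get wrong.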
-- Arvid Heise | Senior Java Developer | Ververica GmbH