flink 1.4.2 NPE after job startup when using managed ValueState


Diomedes Tydeus
Dear Flink Users,
I have two environments, one old and one new, and I'm trying to migrate my Flink job between them. The exact same application code that runs in one environment fails in the other with a very confusing NPE (stack trace below). I can guarantee the application code only calls ValueState.get() inside of a keyed context (and it works perfectly in my other environment). I get the same error when I try with RocksDB, except the stack trace changes slightly. I jumped into the source code, but I lose the thread when InternalKeyedContext is passed via the constructor (I'm not sure where to go from there).

Other than the obvious (not being in a keyed context), why else might I get this error? (Application code follows the exception.)

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:30)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:12)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)


Application code---

    override fun open(config: Configuration) {
        val descriptor = ValueStateDescriptor<HashMap<Long, ArrayList<Long>>>(
            dataName,
            TypeInformation.of(object : TypeHint<HashMap<Long, ArrayList<Long>>>() {})
        )
        descriptor.setQueryable(dataName)
        coverageData = runtimeContext.getState(descriptor)
    }

    override fun flatMap(value: CoverageWithPartitionInfo, out: Collector<String>) {

        // This is the line that blows up in one environment but not the other:
        var currentCoverage = coverageData.value()
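
For comparison, the usual trigger for this exact "No key set" NPE is wiring the operator onto a non-keyed stream, since ValueState is scoped to the key of the current record. Below is a minimal sketch of the two wirings; it assumes Flink on the classpath, and `records`, `mySource`, and `partitionKey` are hypothetical placeholder names, not from the original job:

```kotlin
// Sketch only: assumes the Flink streaming API is on the classpath.
// `mySource` and `partitionKey` are hypothetical placeholders.
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val records: DataStream<CoverageWithPartitionInfo> = env.addSource(mySource)

// Non-keyed wiring: flatMap runs, but any ValueState access inside it throws
// "No key set. This method should not be called outside of a keyed context."
// records.flatMap(WriteDataCoverage())

// Keyed wiring: keyBy establishes the keyed context that ValueState requires,
// so coverageData is resolved per key and value() succeeds.
records
    .keyBy { it.partitionKey }
    .flatMap(WriteDataCoverage())
```

If both environments use the same topology, it may still be worth diffing the job graphs: a keyBy dropped or reordered during the migration would produce exactly this difference in behavior.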