Dear Flink Users,

I have two environments, one old and one new, and I'm trying to migrate my Flink job. The exact same application code that runs in one environment fails in the other with a very confusing NPE (listed below). I can guarantee the application code only calls ValueState.value() inside of a keyed context (and it works perfectly in my other environment). I get the same error when I try with RocksDB, except that the stack trace changes a little.

I jumped into the source code, but I lose the thread where InternalKeyedContext is passed in via the constructor (I'm not sure where to jump to from there).

Other than the obvious (keyed context), why else might I get this error? (Application code follows the exception.)

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:30)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:12)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

Application code---

    override fun open(config: Configuration) {
        val descriptor = ValueStateDescriptor<HashMap<Long, ArrayList<Long>>>(
            dataName,
            TypeInformation.of(object : TypeHint<HashMap<Long, ArrayList<Long>>>() {})
        )
        descriptor.setQueryable(dataName)
        coverageData = runtimeContext.getState(descriptor)
    }

    override fun flatMap(value: CoverageWithPartitionInfo, out: Collector<String>) {
        var currentCoverage = coverageData.value()
        // ^^ this is the line that blows up in one environment but not the others
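
To make the failing check concrete: the trace shows the state table verifying that a key is set (checkKeyNamespacePreconditions calling Preconditions.checkNotNull) before it does the lookup. Below is a toy Kotlin model of that behaviour as I read it from the trace. It is only a sketch under assumptions: ToyKeyedContext and ToyValueState are hypothetical names, not Flink classes, and the real backend does considerably more (namespaces, key groups, serialization).

```kotlin
// Toy model only: NOT Flink's implementation. All names here are hypothetical.
class ToyKeyedContext<K : Any> {
    // In this model, the runtime is expected to set the key before each element is processed.
    var currentKey: K? = null
}

class ToyValueState<K : Any, V>(private val context: ToyKeyedContext<K>) {
    // One value per key, as with keyed ValueState.
    private val table = HashMap<K, V>()

    // Mirrors the shape of the failing check: without a current key there is no lookup.
    private fun requireKey(): K =
        context.currentKey
            ?: throw NullPointerException(
                "No key set. This method should not be called outside of a keyed context."
            )

    fun value(): V? = table[requireKey()]

    fun update(newValue: V) {
        table[requireKey()] = newValue
    }
}

fun main() {
    val context = ToyKeyedContext<String>()
    val state = ToyValueState<String, Long>(context)

    // Keyed path: a key is set before the state is touched, so the read succeeds.
    context.currentKey = "some-key"
    state.update(42L)
    println(state.value())

    // Non-keyed path: nothing set the key, so the same call fails.
    context.currentKey = null
    try {
        state.value()
    } catch (e: NullPointerException) {
        println(e.message)
    }
}
```

In this model the user function never sets the key itself, so the same value() call succeeds or fails depending purely on whether something upstream set the key first. That is why I suspect the difference between my environments is in how the operator gets wired or restored rather than in flatMap itself, but that is a guess.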