Getting NPE in collector.collect() call in flatMap

Ranjit
I am getting a NullPointerException in the collector.collect(Map) method call.

I tried to debug this issue. Internally, the call goes to the collect method of the TimestampedCollector class,
which contains the line:   output.collect(reuse.replace(record))

When I print the value of reuse, it looks like this: Record @ (undef): null

After that, I get the NPE with the stack trace below:

Caused by: java.lang.RuntimeException
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.mypackage.QuoteRequest39Converter.decode(QuoteRequest39Converter.java:71)
        at com.mypackage.flatmap.myFlatMap.flatMap(myFlatMap.java:142)
        at com.mypackage.flatmap.myFlatMap.flatMap(myFlatMap.java:1)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
        ... 10 more
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.runtime.partitioner.HashPartitioner.selectChannels(HashPartitioner.java:52)
        at org.apache.flink.streaming.runtime.partitioner.HashPartitioner.selectChannels(HashPartitioner.java:32)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:79)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
        ... 17 more


I also verified that there are no null fields in the data. I am using Flink 1.1.1.
Re: Getting NPE in collector.collect() call in flatMap

Ranjit
Hi,
I found the solution. The cause is the operator chaining that Apache Flink applies after the flatMap.
The flatMap is followed by a groupBy operator, and the field we group by is null in the data.

The NPE was actually occurring in the groupBy, but because of chaining it showed up in the flatMap's collect() call.
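
For anyone hitting the same trace, here is a minimal sketch of the failure and one possible guard, written in plain Java without any Flink dependency. It assumes the records are Maps (as in collector.collect(Map) above). The class NullKeyGuard, the methods selectChannel and emitWithNonNullKey, and the field name "symbol" are all hypothetical names made up for illustration; selectChannel is only a simplified stand-in for the hash partitioning step, not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class NullKeyGuard {

    // Simplified stand-in for a hash partitioner (hypothetical, not Flink's code):
    // calling hashCode() on a null key throws a NullPointerException.
    static int selectChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Passes a record to the collector only if its key field is non-null.
    // Returns the number of records that were skipped because the key was null.
    static int emitWithNonNullKey(List<Map<String, Object>> records,
                                  String keyField,
                                  Consumer<Map<String, Object>> out) {
        int skipped = 0;
        for (Map<String, Object> record : records) {
            if (record.get(keyField) == null) {
                skipped++;          // would have caused the NPE downstream
            } else {
                out.accept(record); // safe to hand to the partitioner
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Map<String, Object> good = new HashMap<>();
        good.put("symbol", "ABC");
        Map<String, Object> bad = new HashMap<>();
        bad.put("symbol", null);    // the grouping field is null

        List<Map<String, Object>> records = new ArrayList<>();
        records.add(good);
        records.add(bad);

        List<Map<String, Object>> emitted = new ArrayList<>();
        int skipped = emitWithNonNullKey(records, "symbol", emitted::add);

        // Only records with a non-null key reach the partitioning step.
        for (Map<String, Object> record : emitted) {
            System.out.println("channel " + selectChannel(record.get("symbol"), 4));
        }
        System.out.println("skipped " + skipped + " record(s) with a null key");
    }
}
```

In a real job the same check would sit inside flatMap just before collector.collect(...), or in a filter placed before the grouping. Whether to drop such records or substitute a default key depends on the data.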