Hi,
I encountered a weird NPE when trying to aggregate on a fixed window. If I set a small parallelism, so that the whole job uses only one TaskManager, the NPE does not happen. But when the job scales to two TaskManagers, the TaskManager crashes at the Create stage. The Flink version I use is 1.11.1. The NPE stack trace is:

    2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
    Caused by: java.lang.NullPointerException
        at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        ... 13 more

My aggregate code is:

    public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> { ... }

Does anyone know anything about this NPE? Thanks!

--
Best regards
Sili Liu
Hi,

Could you check that your grouping key has a stable hashCode and equals? It is very likely caused by an unstable hashCode, so that a record with an incorrect key ends up on the wrong TaskManager.

Best,
Dawid

On 13/04/2021 08:47, Si-li Liu wrote:
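Dawid's explanation can be illustrated with a simplified model, a sketch rather than Flink's actual code. In this model, a keyed stream maps each key's hashCode() to a key group and then maps each key group to a subtask. The constants and the two hash values below are assumptions, and Flink also scrambles the hash with a murmur hash before the modulo. If the sending task and the window task compute different hash codes for the same logical key, the record reaches a subtask that does not own the key group the receiver computes. That would plausibly explain a failed state lookup such as the NPE in StateTable.transform.

```java
// Simplified model of how a keyed stream picks the subtask that owns a key.
public class KeyRoutingSketch {
    // Assumed max parallelism (Flink's default lower bound is 128) and the
    // parallelism of 10 suggested by the "(7/10)" task name in the log.
    static final int MAX_PARALLELISM = 128;
    static final int PARALLELISM = 10;

    // Simplification: Flink applies a murmur hash to key.hashCode() before
    // the modulo; this sketch uses the raw hash code directly.
    static int keyGroupFor(int keyHash) {
        return Math.floorMod(keyHash, MAX_PARALLELISM);
    }

    // Key group -> subtask index: keyGroup * parallelism / maxParallelism.
    static int subtaskFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        // Hypothetical values: the same logical key hashes differently on the
        // sending JVM and on the receiving JVM.
        int hashOnSender = 5;
        int hashOnReceiver = 100;

        int target = subtaskFor(keyGroupFor(hashOnSender));
        int receiverView = subtaskFor(keyGroupFor(hashOnReceiver));

        // Prints 0: the subtask the record is actually sent to.
        System.out.println("record routed to subtask " + target);
        // Prints 7: the subtask the receiver believes owns this key.
        System.out.println("receiver thinks the key belongs to subtask " + receiverView);
    }
}
```

With a stable hashCode(), both sides compute the same key group, so the two numbers always agree.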
Hi Dawid,

Thanks for your help. I use com.google.common.base.Objects.hashCode, passing all fields to it to generate the hash code, and the equals method also compares all the fields.

On Tue, Apr 13, 2021 at 8:10 PM Dawid Wysakowicz <[hidden email]> wrote:
Best regards
Sili Liu
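The pattern described above, with equals and hashCode over all fields, might look like the sketch below. DataKeySketch and its fields metric and bucket are hypothetical, because the real DataKey is not shown in the thread. The sketch uses java.util.Objects.hash, which combines the fields with Arrays.hashCode in the same way as Guava's Objects.hashCode(Object...). The combined value is only as stable as each field's own hashCode(). String and Long hash by content, so they are safe here.

```java
import java.util.Objects;

// Hypothetical stand-in for the DataKey class from the thread; the real fields are unknown.
public final class DataKeySketch {
    private final String metric;
    private final long bucket;

    public DataKeySketch(String metric, long bucket) {
        this.metric = metric;
        this.bucket = bucket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataKeySketch)) return false;
        DataKeySketch other = (DataKeySketch) o;
        // Compare every field that also feeds into hashCode().
        return bucket == other.bucket && Objects.equals(metric, other.metric);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode over the (boxed) fields, like Guava's Objects.hashCode(...).
        return Objects.hash(metric, bucket);
    }
}
```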
To second Dawid's question: are all fields final, or is it possible that their values change?

On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu <[hidden email]> wrote:
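Arvid's question matters because a key whose hashCode() depends on a non-final field can stop matching the slot it was stored under. A minimal plain-Java sketch shows the effect. It uses java.util.HashMap rather than Flink's state backend, and MutableKey is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MutableKeySketch {
    // Hypothetical mutable key: hashCode() depends on the non-final field "name".
    static final class MutableKey {
        String name;

        MutableKey(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && Objects.equals(name, ((MutableKey) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(name);
        }
    }

    // Stores a key, mutates it, then looks the same object up again.
    static boolean lookupAfterMutation() {
        Map<MutableKey, Integer> state = new HashMap<>();
        MutableKey key = new MutableKey("a");
        state.put(key, 1);   // placed according to hash("a")
        key.name = "b";      // the key's hash changes after placement
        return state.containsKey(key); // searched according to hash("b")
    }

    public static void main(String[] args) {
        System.out.println(lookupAfterMutation()); // false
    }
}
```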
Thanks for your help. After I replaced com.google.common.base.Objects.hashCode with toString().hashCode(), the NPE problem was solved.

On Tue, Apr 13, 2021 at 11:40 PM Arvid Heise <[hidden email]> wrote:
Best regards
Sili Liu
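One possible explanation for why toString().hashCode() helped is an assumption, since the thread never shows DataKey's fields. Suppose a field passed to Objects.hashCode is an enum, an array, or a class that does not override hashCode(). That field's hash code is then identity-based. It stays consistent inside one JVM but can differ between the JVMs of two TaskManagers, which would match the observation that only the two-TaskManager setup failed. A String's hashCode() is computed from its characters, and an enum's default toString() is its name, so toString().hashCode() gives the same value everywhere. Region is a hypothetical enum.

```java
import java.util.Arrays;

public class IdentityHashSketch {
    // Hypothetical enum field of a key class.
    enum Region { EU, US }

    // java.lang.Enum.hashCode() is final and identity-based, so its value
    // depends on the JVM instance rather than on the constant's name.
    static boolean enumHashIsIdentityBased() {
        return Region.EU.hashCode() == System.identityHashCode(Region.EU);
    }

    // String.hashCode() is defined by the characters, so it matches on every JVM.
    static boolean nameHashIsContentBased() {
        return Region.EU.toString().hashCode() == "EU".hashCode();
    }

    // An array's own hashCode() is identity-based; Arrays.hashCode(int[]) uses the contents.
    static boolean arrayContentHashMatches() {
        int[] a = {1, 2};
        int[] b = {1, 2};
        return Arrays.hashCode(a) == Arrays.hashCode(b);
    }

    public static void main(String[] args) {
        System.out.println(enumHashIsIdentityBased()); // true
        System.out.println(nameHashIsContentBased());  // true
        System.out.println(arrayContentHashMatches()); // true
    }
}
```

If this is the cause, hashing such fields by content (for example name().hashCode() for an enum, or Arrays.hashCode for an array) is a more targeted fix than hashing the whole toString() output.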
Can someone provide a bit more explanation of why replacing com.google.common.base.Objects.hashCode with toString().hashCode() makes it work?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,

I'm assuming it's just a workaround for changing fields: the string representation happens to be stable while the underlying values change. It's best practice to use completely immutable types. If you have similar issues, double-check that nothing in your data type can be changed, or make defensive copies before changing anything.

On Wed, May 5, 2021 at 4:02 PM tuk <[hidden email]> wrote:
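The immutable-key advice above could look like the sketch below. ImmutableKeySketch and its fields are hypothetical. Every field is final, and the constructor copies the caller's list so that later changes to that list cannot alter the key's hashCode(). It sticks to Java 8 APIs because the job in this thread runs on Java 1.8.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical immutable key: final fields plus a defensive copy of mutable input.
public final class ImmutableKeySketch {
    private final String name;
    private final List<String> tags;

    public ImmutableKeySketch(String name, List<String> tags) {
        this.name = name;
        // Copy, then wrap as unmodifiable, so neither the caller nor
        // users of getTags() can change the list this key hashes over.
        this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
    }

    public List<String> getTags() {
        return tags;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ImmutableKeySketch)) return false;
        ImmutableKeySketch other = (ImmutableKeySketch) o;
        return Objects.equals(name, other.name) && tags.equals(other.tags);
    }

    @Override
    public int hashCode() {
        // List.hashCode() is content-based, so equal keys hash equally on any JVM.
        return Objects.hash(name, tags);
    }
}
```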
In reply to this post by Dawid Wysakowicz-2
Hi,
I ran into the same exception and found your suggestion here. I'm confused about the term 'grouping key': does it refer to the key of the accumulating hash map, or to the key that partitions the stream?
The key used in the keyBy function.

On Sat, Jun 12, 2021 at 11:29 PM HaochengWang <[hidden email]> wrote:

Best regards
Sili Liu