NPE when aggregate window.


Si-li Liu
Hi, 

I encounter a strange NPE when aggregating over a fixed window. If I set a small parallelism so that the whole job uses only one TaskManager, the NPE does not happen. But when the job scales to two TaskManagers, the TaskManager crashes at the Create stage. The Flink version I use is 1.11.1.

The NPE exception stack is:

2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more

My aggregate code is

public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {

    @Override
    public Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
        accumulator.merge(value.f0, value.f1, DataIndex::add);
        return accumulator;
    }

    @Override
    public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
        return accumulator;
    }

    @Override
    public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
        a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
        return b;
    }
}
Does anyone know what could cause this NPE? Thanks!
--
Best regards

Sili Liu

Re: NPE when aggregate window.

Dawid Wysakowicz-2

Hi,

Could you check that your grouping key has stable hashCode and equals implementations? This is very likely caused by an unstable hashCode, so that a record with a given key ends up on the wrong task manager.

Best,

Dawid
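
To illustrate the suggestion above, here is a minimal sketch of why an unstable hash code matters only once records travel between subtasks. It assumes a simplified routing rule (hash modulo parallelism); Flink's actual key-group assignment is more involved, and the names subtaskFor and receiverOwnsKey are hypothetical. The idea is that the sender picks the target subtask from the key's hash, and the receiver looks up state using the hash it computes itself, so the two must agree.

```java
public class KeyOwnershipSketch {

    // Simplified stand-in for hash partitioning (assumption: not Flink's real formula).
    static int subtaskFor(int keyHash, int parallelism) {
        return Math.floorMod(keyHash, parallelism);
    }

    // True if the subtask chosen by the sender is also the one that the
    // receiver would pick when it recomputes the hash of the same key.
    static boolean receiverOwnsKey(int hashAtSender, int hashAtReceiver, int parallelism) {
        return subtaskFor(hashAtSender, parallelism) == subtaskFor(hashAtReceiver, parallelism);
    }

    public static void main(String[] args) {
        // Stable hash: sender and receiver agree on the owner.
        System.out.println(receiverOwnsKey(42, 42, 10)); // true
        // Unstable hash: the record lands on a subtask that does not own the key.
        System.out.println(receiverOwnsKey(42, 47, 10)); // false
        // With a single subtask the mismatch stays hidden.
        System.out.println(receiverOwnsKey(42, 47, 1));  // true
    }
}
```

In this sketch the mismatch is only visible with more than one subtask, which is consistent with the job failing only after scaling out, although the thread does not confirm the exact mechanism.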


Re: NPE when aggregate window.

Si-li Liu
Hi Dawid,

Thanks for your help. I use com.google.common.base.Objects.hashCode, passing all fields to it to generate the hash code, and the equals method also compares all the fields.

--
Best regards

Sili Liu

Re: NPE when aggregate window.

Arvid Heise-4
To second Dawid's question: are all fields final, or is it possible that their values are changing?


Re: NPE when aggregate window.

Si-li Liu
Thanks for your help.

After I replaced com.google.common.base.Objects.hashCode with toString().hashCode(), the NPE no longer occurs.

--
Best regards

Sili Liu
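
One way a field-based hash can differ between JVMs while a string-based hash stays the same is an enum field: Enum.hashCode() is identity-based, whereas String.hashCode() is computed from the characters. Whether DataKey contains an enum is not stated in this thread, so the sketch below is only an illustration with hypothetical names (Region, RegionKey). It uses java.util.Objects.hash, which, like Guava's Objects.hashCode, delegates to Arrays.hashCode over the given fields.

```java
import java.util.Objects;

public class EnumHashSketch {

    // Hypothetical enum; its constants inherit an identity-based hashCode().
    enum Region { EU, US }

    // Hypothetical key type, not the DataKey from the post.
    static final class RegionKey {
        private final Region region;
        private final String id;

        RegionKey(Region region, String id) {
            this.region = region;
            this.id = id;
        }

        // May differ from one JVM to another because Region.hashCode() is identity-based.
        int identityBasedHash() {
            return Objects.hash(region, id);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RegionKey)) {
                return false;
            }
            RegionKey other = (RegionKey) o;
            return region == other.region && id.equals(other.id);
        }

        // Deterministic: built only from strings.
        @Override
        public int hashCode() {
            return Objects.hash(region.name(), id);
        }

        @Override
        public String toString() {
            return region + "/" + id;
        }
    }

    public static void main(String[] args) {
        RegionKey key = new RegionKey(Region.EU, "a");
        System.out.println("name-based hash:     " + key.hashCode());
        System.out.println("toString-based hash: " + key.toString().hashCode());
        System.out.println("identity-based hash: " + key.identityBasedHash());
    }
}
```

Hashing the enum's name() keeps the hash tied to the field contents without routing it through toString().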

Re: NPE when aggregate window.

tuk
Can someone explain in a bit more detail why replacing
com.google.common.base.Objects.hashCode with toString().hashCode()
makes it work?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: NPE when aggregate window.

Arvid Heise-4
Hi,

I'm assuming it's just a workaround for changing fields. The string representation happens to be stable while the underlying values change.

It's best practice to use completely immutable types. If you have similar issues, you should double-check that nothing in your data type can be changed, or make defensive copies before changing anything.
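
As a rough illustration of that advice, the following sketch shows a key type whose hash cannot drift after construction. ImmutableKey and its fields are hypothetical and not the DataKey from this thread; the point is only that final fields plus a defensive copy keep hashCode() fixed even if the caller later mutates the collection it passed in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ImmutableKeySketch {

    // Hypothetical immutable key: all fields are final and never exposed for mutation.
    static final class ImmutableKey {
        private final String id;
        private final List<String> tags;

        ImmutableKey(String id, List<String> tags) {
            this.id = Objects.requireNonNull(id);
            // Defensive copy: later changes to the caller's list do not reach this key.
            this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ImmutableKey)) {
                return false;
            }
            ImmutableKey other = (ImmutableKey) o;
            return id.equals(other.id) && tags.equals(other.tags);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, tags);
        }
    }

    public static void main(String[] args) {
        List<String> tags = new ArrayList<>();
        tags.add("a");
        ImmutableKey key = new ImmutableKey("k", tags);

        int before = key.hashCode();
        tags.add("b"); // mutate the caller's list after the key was created
        int after = key.hashCode();

        System.out.println(before == after); // true
    }
}
```

Without the copy in the constructor, the second hashCode() call would reflect the added element and the key would effectively change identity mid-stream.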


Re: NPE when aggregate window.

HaochengWang
In reply to this post by Dawid Wysakowicz-2
Hi,
I hit the same exception and found your suggestion here. I'm confused about
the term 'grouping key': does it refer to the key of the accumulating hash
map, or to the key that partitions the stream?




Re: NPE when aggregate window.

Si-li Liu
The key used in the keyBy function.

--
Best regards

Sili Liu