Hi,
We are currently running a Flink job that has 3 operators:
Source ---> Filter ---> Sink
As soon as the job is started, it tries to recover from the latest checkpoint:
[05-Mar-2019 13:09:55.365 UTC] INFO <CheckpointCoordinator> Restoring from latest valid checkpoint: Checkpoint 60 @ 1551788864502 for fd697c91437216e773bb862cbae56e0f.
Then, during operator initialization, specifically of the Source operator (which reads from Kafka topics using a regex pattern), the job starts to fail with the following exception:
[05-Mar-2019 13:10:11.756 UTC] INFO <ExecutionGraph> Job Data Lake Ingestion (fd697c91437216e773bb862cbae56e0f) switched from state RUNNING to FAILING.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:126)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:123)
at java.util.TreeMap.put(Unknown Source)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:724)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
I was wondering if anyone has seen this before?
My assumption will be
We are currently running with the following settings:
Cheers,
Laura
Hi Laura
From the exception stack, there are two possible causes of this NPE: either the KafkaTopicPartition itself is null, or the topic field of a KafkaTopicPartition from the union state is null. In either case, the problem may lie in the KryoSerializer used to de/serialize the KafkaTopicPartition class. Gordon (in CC), who is an expert on serialization, might offer more insights.
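To make the two cases above concrete, here is a minimal, self-contained Java sketch using only the JDK. It assumes, based on the stack trace (TreeMap.put called from FlinkKafkaConsumerBase.initializeState into KafkaTopicPartition$Comparator), that restored offsets are inserted into a TreeMap ordered by topic and then partition. TopicPartition, BY_TOPIC_THEN_PARTITION and restore are hypothetical stand-ins for illustration, not Flink's actual classes.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class Main {

    // Hypothetical stand-in for Flink's KafkaTopicPartition: a topic name plus a partition id.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Orders by topic, then by partition. Like the comparator in the stack trace,
    // it dereferences both arguments and their topic fields without null checks.
    static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION = (p1, p2) -> {
        if (!p1.topic.equals(p2.topic)) {
            return p1.topic.compareTo(p2.topic);
        }
        return Integer.compare(p1.partition, p2.partition);
    };

    // Rebuilds a sorted partition -> offset map from restored entries via TreeMap.put.
    // Returns "ok" if every entry was inserted, or "NullPointerException" if the
    // comparator hit a null (caught here only so the demo can report it).
    static String restore(TopicPartition[] partitions, long[] offsets) {
        Map<TopicPartition, Long> restored = new TreeMap<>(BY_TOPIC_THEN_PARTITION);
        try {
            for (int i = 0; i < partitions.length; i++) {
                restored.put(partitions[i], offsets[i]);
            }
            return "ok";
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    public static void main(String[] args) {
        TopicPartition good = new TopicPartition("ingest-a", 0);

        // Well-formed restored state: both entries are inserted.
        System.out.println(restore(
                new TopicPartition[] {good, new TopicPartition("ingest-a", 1)}, new long[] {10L, 20L}));
        // Case 1: the restored partition object itself is null.
        System.out.println(restore(
                new TopicPartition[] {good, null}, new long[] {10L, 20L}));
        // Case 2: the partition object exists, but its topic field is null.
        System.out.println(restore(
                new TopicPartition[] {good, new TopicPartition(null, 3)}, new long[] {10L, 20L}));
    }
}
```

Running main prints "ok" followed by "NullPointerException" twice: both a null entry and a null topic field fail inside the comparator during TreeMap.put, matching the top frames of the reported trace. The sketch only reproduces the symptom; it says nothing about how such an entry could come out of the Kryo-deserialized state.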
Before discussing this further, could you please provide more information:
Best
Yun Tang
From: Laura Uzcátegui <[hidden email]>
Sent: Wednesday, March 6, 2019 21:35
To: user
Subject: Job continuously failing after Checkpoint Restore