Job continuously failing after Checkpoint Restore

Laura Uzcátegui
Hi, 

We are currently running a Flink Job that has 3 operators. 

Source ---> Filter ---> Sink 

As soon as the job starts, it tries to recover from the latest checkpoint:

[05-Mar-2019 13:09:55.365 UTC] INFO <CheckpointCoordinator> Restoring from latest valid checkpoint: Checkpoint 60 @ 1551788864502 for fd697c91437216e773bb862cbae56e0f.


Then, during operator initialization, specifically in the Source operator (which reads from Kafka topics using a regex pattern), the job starts to fail with the following exception:


[05-Mar-2019 13:10:11.756 UTC] INFO <ExecutionGraph> Job Data Lake Ingestion (fd697c91437216e773bb862cbae56e0f) switched from state RUNNING to FAILING.
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:126)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:123)
    at java.util.TreeMap.put(Unknown Source)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:724)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Unknown Source)


I was wondering if anyone has seen this before?


My assumption will be


We are currently running with the following settings:


  • Flink version: 1.4.2
  • Docker image with the job embedded
  • Job parallelism: 8

Cheers,

Laura 

Re: Job continuously failing after Checkpoint Restore

Yun Tang
Hi Laura

From the exception stack trace, there are two possible causes of this NPE: either the KafkaTopicPartition itself is null, or the topic field of that KafkaTopicPartition, restored from the union state, is null. In either case, the problem might lie in the KryoSerializer, which is used to serialize and deserialize the KafkaTopicPartition class. Gordon (in CC), who is an expert on serialization, might offer more insights.
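
To illustrate the two cases above, here is a minimal, self-contained sketch. It does not use Flink: TopicPartition and the comparator are hypothetical stand-ins that only assume the real comparator dereferences the key and its topic without null checks, as the stack trace suggests. It shows that TreeMap.put fails with an NPE in both situations.

```java
import java.util.Comparator;
import java.util.TreeMap;

public class RestoreNpeSketch {

    // Hypothetical stand-in for KafkaTopicPartition; NOT the real Flink class.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Assumed ordering: by topic, then by partition, with no null checks.
    static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION = (a, b) -> {
        int byTopic = a.topic.compareTo(b.topic);
        return byTopic != 0 ? byTopic : Integer.compare(a.partition, b.partition);
    };

    // Returns true if putting the restored key into a comparator-backed
    // TreeMap throws a NullPointerException.
    static boolean putThrowsNpe(TopicPartition restoredKey) {
        TreeMap<TopicPartition, Long> offsets = new TreeMap<>(BY_TOPIC_THEN_PARTITION);
        offsets.put(new TopicPartition("events", 0), 42L);
        try {
            offsets.put(restoredKey, 0L);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A well-formed key is inserted without error.
        System.out.println(putThrowsNpe(new TopicPartition("events", 1)));
        // Case 1: the restored key itself is null.
        System.out.println(putThrowsNpe(null));
        // Case 2: the key exists but its topic field is null.
        System.out.println(putThrowsNpe(new TopicPartition(null, 1)));
    }
}
```

Both failure cases surface inside the comparator called from TreeMap.put, so the stack trace alone cannot tell them apart. Inspecting the restored state entries would be needed for that.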

Before further discussion, could you please provide more information:
  1. Which version of Kafka are you using?
  2. Have you ever encountered this problem before?
  3. Did you change anything before resuming your job?
  4. If you try to restore from checkpoint 60 again by submitting another job, do you hit this NPE every time?

Best,
Yun Tang

From: Laura Uzcátegui <[hidden email]>
Sent: Wednesday, March 6, 2019 21:35
To: user
Subject: Job continuously failing after Checkpoint Restore
 