Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

15 messages

Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
I had a sequence of events that created this issue.

* I started a job with state.checkpoints.num-retained: 5 (see the configuration sketch below).

As expected, the 5 latest checkpoints are retained in my HDFS backend.

* The JM dies (K8s limit etc.) without cleaning the HDFS directory. The K8s job restores from the latest checkpoint (I think), but as it creates new checkpoints it does not delete the older ones. In the end there are 10 checkpoints: 5 from the old run, which remain static, and the 5 latest representing the ongoing pipe.

* The JM dies again and restarts from the latest of the 5 old checkpoints.

This looks like a bug in the Job Cluster implementation of Flink. It appears to take the 5th checkpoint from the beginning based on the num-retained value. Note that the job has the same job id and does not scope checkpoints to a new directory.
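
For context, a minimal sketch of the relevant flink-conf.yaml settings (the paths and host below are placeholders, not the actual cluster values):

    # Keep the 5 most recent completed checkpoints (the setting referenced above).
    state.checkpoints.num-retained: 5

    # Where externalized checkpoint metadata is written (placeholder path).
    state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

    # Default target directory for savepoints (placeholder path).
    state.savepoints.dir: hdfs://namenode:8020/flink/savepoints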


Please tell me if this does not make sense.

Vishal






Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
The above is Flink 1.8.







Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
Anyone?







Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Chesnay Schepler
Can you provide us the jobmanager logs?

After the first restart, the JM should have started deleting older checkpoints as new ones were created.
After the second restart, the JM should have recovered all 10 checkpoints, started from the latest, and started pruning old ones as new ones were created.

So you're running into 2 separate issues here, which is a bit odd.
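
For reference, recovering completed checkpoints across JM restarts relies on the ZooKeeper-backed HA setup, which looks roughly like this in flink-conf.yaml (a sketch; the values are placeholders, not the actual ones):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /my-job-cluster
    high-availability.storageDir: hdfs://namenode:8020/flink/ha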








Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
Ok, I will do that. 








Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
OK, this happened again, and it is bizarre (and definitely not what I think should happen).

The job failed and I see these logs (in essence it is keeping the last 5 externalized checkpoints) but deleting the ZK checkpoints directory:

06.28.2019 20:33:13.738    2019-06-29 00:33:13,736 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5654 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5654' not discarded.
    06.28.2019 20:33:13.788    2019-06-29 00:33:13,786 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5655 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5655' not discarded.
    06.28.2019 20:33:13.838    2019-06-29 00:33:13,836 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5656 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5656' not discarded.
    06.28.2019 20:33:13.888    2019-06-29 00:33:13,886 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5657 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5657' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5658 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore  - Removing /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005 from ZooKeeper


The job restarts, and this is bizarre. It does not find the ZK checkpoint directory, but instead of going to state.checkpoints.dir to get its last checkpoint, it restarts from the savepoint that we started this job with about 15 days ago (resetting the checkpoint id):


    06.28.2019 20:33:20.047    2019-06-29 00:33:20,045 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.054    2019-06-29 00:33:20,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 00000000000000000000000000000005 from savepoint hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f ()
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 00000000000000000000000000000005 to 4203.
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 4202.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,548 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.




This just does not make sense....
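
For what it's worth, this is how I understand the paths in the logs above relate to the configuration (a sketch, not verified against our exact config):

    # Externalized checkpoint data:  <state.checkpoints.dir>/<job id>/chk-<checkpoint id>
    #   e.g. .../analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658
    #
    # ZooKeeper handle that was removed:  <path.root>/<cluster-id>/checkpoints/<job id>
    #   e.g. /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005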















Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Timothy Victor
Hi Vishal, can this be reproduced on a bare metal instance as well? Also, is this a job or a session cluster?

Thanks 

Tim








Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
I have not tried it on bare metal. We have no option but K8s.

And this is a job cluster.








Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
Another point: the JM had terminated. The policy on K8s for the Job Cluster is:

    spec:
      restartPolicy: OnFailure

2019-06-29 00:33:14,308 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.










Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
So there are 2 scenarios:

1. If the JM goes down (exits) and K8s relaunches the Job Cluster (the JM), it uses the exact command the JM was brought up with in the first place.

2. If the pipe is restarted for any other reason by the JM (the JM has not exited, but Job Kafka-to-HDFS (00000000000000000000000000000005) switched from state RESTARTING to CREATED), it does the right thing.

Now, what is the right way to handle 1, apart from

    spec:
      restartPolicy: Never

and manually restarting?
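
For reference, a rough sketch of what that option would look like on the K8s Job that runs the cluster entrypoint (the name and image are placeholders, and backoffLimit: 0 is an assumption about wanting no automatic relaunch at all):

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: kafka-to-hdfs-jobcluster        # placeholder
    spec:
      backoffLimit: 0                       # also stop the Job controller from creating replacement pods
      template:
        spec:
          restartPolicy: Never              # a failed JM pod is not restarted; relaunch becomes a manual step
          containers:
            - name: jobmanager
              image: my-org/flink-job:1.8   # placeholder
              # command/args elided; note they are replayed verbatim on every launch,
              # including any savepoint argument the job was first submitted with.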









Why did JM fail on K8s (see original thread below)

Timothy Victor
In reply to this post by Vishal Santoshi
This is slightly off topic, so I'm changing the subject so as not to conflate it with the original issue you brought up. But do we know why the JM crashed in the first place?

We are also thinking of moving to K8s, but to be honest we had tons of stability issues in our first rodeo.  That could just be our lack of a deeper understanding of K8s (a lot had to do with DNS).  But cases like this make me wonder what happened to cause the crash in the first place.

Thanks

Tim









Re: Why did JM fail on K8s (see original thread below)

Vishal Santoshi
We are investigating that. But is the above theory plausible (Flink gurus)? Even though this, as in forcing restartPolicy: Never, pretty much nullifies HA on the JM if it is a Job cluster (at least on K8s).

As for the reason, we are investigating that.

One thing we are looking at is the QoS ( https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/ ), which was Burstable, as a limit was not set on the job. The below, for example, would have a QoS of Guaranteed. So one reason could be this:

    resources:
      requests:
        cpu: "8.0"
        memory: "16Gi"
      limits:
        cpu: "8.0"
        memory: "16Gi"











Re: Why did JM fail on K8s (see original thread below)

Vishal Santoshi
This is strange: the retry strategy was 20 times with a 4-minute delay. This job tried once (we had a Hadoop NameNode hiccup), but I think it could not even get to the NN and gave up (as in, it did not retry the next 19 times)...

2019-06-29 00:33:13,680 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Kafka-to-HDFS (00000000000000000000000000000005) because the restart strategy prevented it.
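
For reference, the restart strategy in question corresponds roughly to this in flink-conf.yaml (a sketch; 240000 ms is the 4-minute delay):

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 20
    restart-strategy.fixed-delay.delay: 240000 ms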

On Sat, Jun 29, 2019 at 10:03 AM Vishal Santoshi <[hidden email]> wrote:
We are investigating that.   But is the above theory plausible ( flink gurus ) even though this, as in forcing restartPolicy: Never pretty much nullifies HA on JM is it is a Job cluster ( at leats on k8s ) 


As for the reason we are investigating that. 

One thing we looking as the QOS ( https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/ ) ,  which was Burstable, a limit was not set on the job. The below for example would have a QOS of guaranteed. So one reason could be this. 

resources:
requests:
cpu: "8.0"
memory: "16Gi"
limits:
cpu: "8.0"
memory: "16Gi"




On Sat, Jun 29, 2019 at 9:35 AM Timothy Victor <[hidden email]> wrote:
This is slightly off topic, so I'm changing the subject to not conflate the original issue you brought up.   But do we know why JM crashed in the first place?

We are also thinking of moving to K8s, but to be honest we had tons of stability issues in our first rodeo.  That could just be our lack of a deeper understanding of K8s (a lot had to do with DNS).  But cases like this make me wonder what happened to cause the crash in the first place.

Thanks

Tim


On Sat, Jun 29, 2019, 8:25 AM Vishal Santoshi <[hidden email]> wrote:
Another point the JM had terminated. The policy on K8s for Job Cluster is

spec:
restartPolicy: OnFailure

2019-06-29 00:33:14,308 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.



On Sat, Jun 29, 2019 at 9:04 AM Vishal Santoshi <[hidden email]> wrote:
I have not tried on bare metal. We have no option but k8s.

And this is a job cluster.

On Sat, Jun 29, 2019 at 9:01 AM Timothy Victor <[hidden email]> wrote:
Hi Vishal, can this be reproduced on a bare metal instance as well?   Also is this a job or a session cluster?

Thanks 

Tim

On Sat, Jun 29, 2019, 7:50 AM Vishal Santoshi <[hidden email]> wrote:
OK this happened again and it is bizarre ( and is definitely not what I think should happen ) 




The job failed and I see these logs  ( In essence it is keeping the last 5 externalized checkpoints )  but deleting the zk checkpoints directory

06.28.2019 20:33:13.738    2019-06-29 00:33:13,736 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5654 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5654' not discarded.
    06.28.2019 20:33:13.788    2019-06-29 00:33:13,786 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5655 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5655' not discarded.
    06.28.2019 20:33:13.838    2019-06-29 00:33:13,836 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5656 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5656' not discarded.
    06.28.2019 20:33:13.888    2019-06-29 00:33:13,886 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5657 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5657' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5658 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore  - Removing /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005 from ZooKeeper


The job restarts and this is bizzare. It does not find the ZK checkpoint directory but instead of going to the state.checkpoints.dir to get it's last checkpoint, it restarts from a save point that we started this job with ( resetting the checkpoint id )  like about 15 days ago 


    06.28.2019 20:33:20.047    2019-06-29 00:33:20,045 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.054    2019-06-29 00:33:20,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 00000000000000000000000000000005 from savepoint hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f ()
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 00000000000000000000000000000005 to 4203.
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 4202.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,548 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.




This just does not make sense....
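
For anyone trying to reproduce this, the ZooKeeper node that gets removed above and the chk-* paths come from our HA / checkpoint settings in flink-conf.yaml. A sketch, with the path values guessed from the logs above and the quorum / HA storage dir as placeholders:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181   # placeholder
high-availability.zookeeper.path.root: /kafka-to-hdfs-v2            # guessed from the ZK path above
high-availability.cluster-id: kafka-to-hdfs-v2/k8s                  # guessed from the ZK path above
high-availability.storageDir: hdfs:///flink/ha/                     # placeholder
state.checkpoints.dir: hdfs:///analytics_eng/kafka-to-hdfs-states   # matches the chk-* paths above
state.checkpoints.num-retained: 5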

On Wed, Jun 5, 2019 at 9:29 AM Vishal Santoshi <[hidden email]> wrote:
Ok, I will do that. 

On Wed, Jun 5, 2019, 8:25 AM Chesnay Schepler <[hidden email]> wrote:
Can you provide us the jobmanager logs?

After the first restart the JM should have started deleting older checkpoints as new ones were created.
After the second restart the JM should have recovered all 10 checkpoints, started from the latest, and started pruning old ones as new ones were created.

So you're running into 2 separate issues here, which is a bit odd.

On 05/06/2019 13:44, Vishal Santoshi wrote:
Any one?

On Tue, Jun 4, 2019, 2:41 PM Vishal Santoshi <[hidden email]> wrote:
The above is flink 1.8

On Tue, Jun 4, 2019 at 12:32 PM Vishal Santoshi <[hidden email]> wrote:
I had a sequence of events that created this issue.

* I started a job and I had the state.checkpoints.num-retained: 5

As expected I have 5 latest checkpoints retained in my hdfs backend.
    

* JM dies ( K8s limit etc ) without cleaning the hdfs directory.  The k8s  job restores from the latest checkpoint ( I think ) but as it creates new checkpoints it does not delete the older chk point. At the end there are now 10 chkpoints,  5 from the old run which remain static and 5 latest representing the on going pipe.

* The JM dies again and restart  from the latest from the 5 old checkpoints.

This looks a bug in the Job Cluster implementation of flink. It looks like it is taking the 5th checkpoint from the beginning based on num-retained value, Note that it has the same job id and does not scope to a new directory. 


Please tell me if this does not make sense.

Vishal







Re: Why did JM fail on K8s (see original thread below)

Vishal Santoshi
even though:
Max. number of execution retries: Restart with fixed delay (240000 ms). #20 restart attempts.

On Sat, Jun 29, 2019 at 10:44 AM Vishal Santoshi <[hidden email]> wrote:
This is strange; the retry strategy was 20 times with a 4 minute delay. This job tried once (we had a Hadoop NameNode hiccup) but I think it could not even get to the NN and gave up (as in, did not retry the next 19 times)....

2019-06-29 00:33:13,680 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Kafka-to-HDFS (00000000000000000000000000000005) because the restart strategy prevented it.
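
For what it's worth, that strategy corresponds to a fixed-delay configuration along these lines in flink-conf.yaml (a sketch: the values are the ones quoted above, the key names are the documented ones; if the strategy is instead set programmatically in the job, that would override this):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 240000 ms   # i.e. 4 minutes between attempts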

On Sat, Jun 29, 2019 at 10:03 AM Vishal Santoshi <[hidden email]> wrote:
We are investigating that. But is the above theory plausible (Flink gurus)? Even though this, as in forcing restartPolicy: Never, pretty much nullifies HA on the JM if it is a Job Cluster (at least on K8s).


As for the reason, we are investigating that.

One thing we are looking at is the QoS (https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/), which was Burstable because a limit was not set on the job. The below, for example, would have a QoS of Guaranteed, so one reason could be this.

resources:
  requests:
    cpu: "8.0"
    memory: "16Gi"
  limits:
    cpu: "8.0"
    memory: "16Gi"
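
For the Guaranteed class, requests and limits have to match for both cpu and memory on every container in the pod. A sketch of how that block would sit in the jobmanager container spec (container name and image are illustrative):

containers:
  - name: flink-jobmanager       # illustrative
    image: my-flink-job:1.8      # illustrative
    resources:
      requests:
        cpu: "8.0"
        memory: "16Gi"
      limits:
        cpu: "8.0"
        memory: "16Gi"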




On Sat, Jun 29, 2019 at 9:35 AM Timothy Victor <[hidden email]> wrote:
This is slightly off topic, so I'm changing the subject to not conflate the original issue you brought up.   But do we know why JM crashed in the first place?

We are also thinking of moving to K8s, but to be honest we had tons of stability issues in our first rodeo.  That could just be our lack of a deeper understanding of K8s (a lot had to do with DNS).  But cases like this make me wonder what happened to cause the crash in the first place.

Thanks

Tim


On Sat, Jun 29, 2019, 8:25 AM Vishal Santoshi <[hidden email]> wrote:
Another point: the JM had terminated. The policy on K8s for a Job Cluster is

spec:
  restartPolicy: OnFailure

2019-06-29 00:33:14,308 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.



On Sat, Jun 29, 2019 at 9:04 AM Vishal Santoshi <[hidden email]> wrote:
I have not tried on bare metal. We have no option but k8s.

And this is a job cluster.

On Sat, Jun 29, 2019 at 9:01 AM Timothy Victor <[hidden email]> wrote:
Hi Vishal, can this be reproduced on a bare metal instance as well?   Also is this a job or a session cluster?

Thanks 

Tim

On Sat, Jun 29, 2019, 7:50 AM Vishal Santoshi <[hidden email]> wrote:
OK this happened again and it is bizarre ( and is definitely not what I think should happen ) 




The job failed and I see these logs (in essence it is keeping the last 5 externalized checkpoints) but deleting the ZK checkpoints directory:

06.28.2019 20:33:13.738    2019-06-29 00:33:13,736 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5654 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5654' not discarded.
    06.28.2019 20:33:13.788    2019-06-29 00:33:13,786 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5655 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5655' not discarded.
    06.28.2019 20:33:13.838    2019-06-29 00:33:13,836 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5656 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5656' not discarded.
    06.28.2019 20:33:13.888    2019-06-29 00:33:13,886 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5657 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5657' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5658 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore  - Removing /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005 from ZooKeeper


The job restarts and this is bizarre. It does not find the ZK checkpoint directory, but instead of going to the state.checkpoints.dir to get its last checkpoint, it restarts from a savepoint that we started this job with (resetting the checkpoint ID) from about 15 days ago:


    06.28.2019 20:33:20.047    2019-06-29 00:33:20,045 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.054    2019-06-29 00:33:20,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 00000000000000000000000000000005 from savepoint hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f ()
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 00000000000000000000000000000005 to 4203.
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 4202.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,548 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.




This just does not make sense....

On Wed, Jun 5, 2019 at 9:29 AM Vishal Santoshi <[hidden email]> wrote:
Ok, I will do that. 

On Wed, Jun 5, 2019, 8:25 AM Chesnay Schepler <[hidden email]> wrote:
Can you provide us the jobmanager logs?

After the first restart the JM should have started deleting older checkpoints as new ones were created.
After the second restart the JM should have recovered all 10 checkpoints, started from the latest, and started pruning old ones as new ones were created.

So you're running into 2 separate issues here, which is a bit odd.

On 05/06/2019 13:44, Vishal Santoshi wrote:
Any one?

On Tue, Jun 4, 2019, 2:41 PM Vishal Santoshi <[hidden email]> wrote:
The above is flink 1.8

On Tue, Jun 4, 2019 at 12:32 PM Vishal Santoshi <[hidden email]> wrote:
I had a sequence of events that created this issue.

* I started a job and I had the state.checkpoints.num-retained: 5

As expected I have 5 latest checkpoints retained in my hdfs backend.
    

* JM dies ( K8s limit etc ) without cleaning the hdfs directory.  The k8s  job restores from the latest checkpoint ( I think ) but as it creates new checkpoints it does not delete the older chk point. At the end there are now 10 chkpoints,  5 from the old run which remain static and 5 latest representing the on going pipe.

* The JM dies again and restart  from the latest from the 5 old checkpoints.

This looks a bug in the Job Cluster implementation of flink. It looks like it is taking the 5th checkpoint from the beginning based on num-retained value, Note that it has the same job id and does not scope to a new directory. 


Please tell me if this does not make sense.

Vishal







Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

Vishal Santoshi
In reply to this post by Vishal Santoshi
I guess using a session cluster rather than a job cluster will decouple the job from the container and may be the only option as of today?

On Sat, Jun 29, 2019, 9:34 AM Vishal Santoshi <[hidden email]> wrote:
So there are 2 scenarios:

1. If the JM goes down (exits) and K8s relaunches the Job Cluster (the JM), it reuses the exact command the JM was brought up with in the first place (see the sketch below).

2. If the pipe is restarted for any other reason by the JM (the JM has not exited, but the job Kafka-to-HDFS (00000000000000000000000000000005) switched from state RESTARTING to CREATED), it does the right thing.
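
For scenario 1, here is a minimal sketch of what I mean by the baked-in command. The image and job class are illustrative, and the flag spellings are as I recall them from the flink-container job-cluster template, so treat the exact args as an assumption; only the savepoint path is the one from the logs above.

spec:
  template:
    spec:
      containers:
        - name: flink-jobmanager
          image: my-flink-job:1.8                 # illustrative image
          args:
            - job-cluster                         # StandaloneJobClusterEntryPoint
            - --job-classname
            - com.example.KafkaToHdfsJob          # illustrative class
            - --fromSavepoint
            - hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f

Since those args live in the pod spec, every relaunch by K8s passes the same --fromSavepoint; if the relaunched JM then finds nothing in ZooKeeper (as in the logs above), it falls back to that old savepoint instead of the latest retained checkpoint.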

Now what is the right way to handle 1, apart from
spec:
  restartPolicy: Never
and manually restart...


On Sat, Jun 29, 2019 at 9:25 AM Vishal Santoshi <[hidden email]> wrote:
Another point: the JM had terminated. The policy on K8s for a Job Cluster is

spec:
  restartPolicy: OnFailure

2019-06-29 00:33:14,308 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.



On Sat, Jun 29, 2019 at 9:04 AM Vishal Santoshi <[hidden email]> wrote:
I have not tried on bare metal. We have no option but k8s.

And this is a job cluster.

On Sat, Jun 29, 2019 at 9:01 AM Timothy Victor <[hidden email]> wrote:
Hi Vishal, can this be reproduced on a bare metal instance as well?   Also is this a job or a session cluster?

Thanks 

Tim

On Sat, Jun 29, 2019, 7:50 AM Vishal Santoshi <[hidden email]> wrote:
OK this happened again and it is bizarre ( and is definitely not what I think should happen ) 




The job failed and I see these logs (in essence it is keeping the last 5 externalized checkpoints) but deleting the ZK checkpoints directory:

06.28.2019 20:33:13.738    2019-06-29 00:33:13,736 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5654 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5654' not discarded.
    06.28.2019 20:33:13.788    2019-06-29 00:33:13,786 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5655 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5655' not discarded.
    06.28.2019 20:33:13.838    2019-06-29 00:33:13,836 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5656 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5656' not discarded.
    06.28.2019 20:33:13.888    2019-06-29 00:33:13,886 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5657 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5657' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 5658 at 'xxxxxxxxxx8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658' not discarded.
    06.28.2019 20:33:13.938    2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore  - Removing /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005 from ZooKeeper


The job restarts and this is bizarre. It does not find the ZK checkpoint directory, but instead of going to the state.checkpoints.dir to get its last checkpoint, it restarts from a savepoint that we started this job with (resetting the checkpoint ID) from about 15 days ago:


    06.28.2019 20:33:20.047    2019-06-29 00:33:20,045 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 0 checkpoints from storage.
    06.28.2019 20:33:20.053    2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 0 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.054    2019-06-29 00:33:20,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 00000000000000000000000000000005 from savepoint hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f ()
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 00000000000000000000000000000005 to 4203.
    06.28.2019 20:33:20.540    2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 4202.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,548 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
    06.28.2019 20:33:20.550    2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to fetch 1 checkpoints from storage.




This just does not make sense....


On Wed, Jun 5, 2019 at 9:29 AM Vishal Santoshi <[hidden email]> wrote:
Ok, I will do that. 

On Wed, Jun 5, 2019, 8:25 AM Chesnay Schepler <[hidden email]> wrote:
Can you provide us the jobmanager logs?

After the first restart the JM should have started deleting older checkpoints as new ones were created.
After the second restart the JM should have recovered all 10 checkpoints, started from the latest, and started pruning old ones as new ones were created.

So you're running into 2 separate issues here, which is a bit odd.

On 05/06/2019 13:44, Vishal Santoshi wrote:
Any one?

On Tue, Jun 4, 2019, 2:41 PM Vishal Santoshi <[hidden email]> wrote:
The above is flink 1.8

On Tue, Jun 4, 2019 at 12:32 PM Vishal Santoshi <[hidden email]> wrote:
I had a sequence of events that created this issue.

* I started a job and I had the state.checkpoints.num-retained: 5

As expected I have 5 latest checkpoints retained in my hdfs backend.
    

* JM dies ( K8s limit etc ) without cleaning the hdfs directory.  The k8s  job restores from the latest checkpoint ( I think ) but as it creates new checkpoints it does not delete the older chk point. At the end there are now 10 chkpoints,  5 from the old run which remain static and 5 latest representing the on going pipe.

* The JM dies again and restart  from the latest from the 5 old checkpoints.

This looks a bug in the Job Cluster implementation of flink. It looks like it is taking the 5th checkpoint from the beginning based on num-retained value, Note that it has the same job id and does not scope to a new directory. 


Please tell me if this does not make sense.

Vishal