Savepoint incomplete when job was killed after a cancel timeout

Savepoint incomplete when job was killed after a cancel timeout

Paul Lam
Hi,

We have a Flink job that was stopped erroneously, leaving no available checkpoint/savepoint to restore from,
and we are looking for some help to narrow down the problem.

How we ran into this problem:

We stopped the job using the cancel-with-savepoint command (due to a compatibility issue), but the command
timed out after 1 min because of some backpressure. So we force-killed the job with the YARN kill command.
Usually this would not cause trouble, because we can still use the last checkpoint to restore the job.

But this time, the last checkpoint directory had been cleaned up and was empty (the retained checkpoint number was 1).
According to ZooKeeper and the logs, the savepoint finished (the job master logged “Savepoint stored in …”)
right after the cancel timeout. However, the savepoint directory contains only a _metadata file, and the other
state files referenced by the metadata are absent.

Environment & Config (a rough code sketch of this setup follows the list):
- Flink 1.11.0
- YARN per-job cluster
- HA via ZooKeeper
- FsStateBackend
- Aligned, non-incremental checkpoints
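
For reference, the checkpointing setup is roughly equivalent to the sketch below. The path and interval are made up, and the retained checkpoint count (state.checkpoints.num-retained: 1) lives in flink-conf.yaml rather than in code:

    import java.net.URI;

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // FsStateBackend: working state is kept on the TaskManager heap,
            // and full (non-incremental) snapshots are written to the file system.
            env.setStateBackend(new FsStateBackend(new URI("hdfs:///flink/checkpoints")));

            // Aligned, exactly-once checkpoints; the interval here is illustrative.
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        }
    }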

Any comments and suggestions are appreciated! Thanks!

Best,
Paul Lam

Re: Savepoint incomplete when job was killed after a cancel timeout

Till Rohrmann
Hi Paul,

Could you share the JobManager logs with us? They might help us better understand the order in which the operations occurred.

How big do you expect the state to be? If it is smaller than state.backend.fs.memory-threshold, the state data will be stored inline in the _metadata file rather than in separate state files.
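
For reference, here is a minimal sketch of how that threshold can also be set programmatically on the FsStateBackend (the path and the 100 KB value below are placeholders, not your configuration):

    import java.net.URI;

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MemoryThresholdSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // State handles smaller than the threshold (here 100 KB) are written
            // inline into the checkpoint/savepoint _metadata file instead of into
            // separate state files; this mirrors state.backend.fs.memory-threshold.
            int fileStateSizeThreshold = 100 * 1024;
            env.setStateBackend(new FsStateBackend(
                    new URI("hdfs:///flink/checkpoints"), fileStateSizeThreshold));
        }
    }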

Cheers,
Till

Re: Savepoint incomplete when job was killed after a cancel timeout

Till Rohrmann
Thanks for sharing the logs with me. It looks as if the total size of the savepoint is 335 KB for a job with a parallelism of 60 and a total of 120 tasks. Hence, the average state size per task is between 2.5 KB and 5 KB. I think the state size threshold refers to the size of the per-task state rather than the total, so I believe the _metadata file should contain all of your state. Have you tried restoring from this savepoint?
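
As a rough sanity check of those numbers:

    335 KB / 120 tasks         ≈ 2.8 KB per task
    335 KB /  60 (parallelism) ≈ 5.6 KB per parallel subtask

Both are well below state.backend.fs.memory-threshold at its default (which, if I remember correctly, was raised to 20 KB in 1.11), so it is plausible that all of the state ended up inline in the _metadata file.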

Cheers,
Till

On Tue, Sep 29, 2020 at 3:47 PM Paul Lam <[hidden email]> wrote:
Hi Till,

Thanks for your quick reply.

The checkpoint/savepoint size would be around 2 MB, which is larger than `state.backend.fs.memory-threshold`.

The JobManager logs are attached; they look normal to me.

Thanks again!

Best,
Paul Lam

Re: Savepoint incomplete when job was killed after a cancel timeout

Paul Lam
Hi Till,

Thanks a lot for the pointer! I tried to restore the job using the savepoint in a dry run, and it worked! 

I guess I had misunderstood the configuration option and was confused by the non-existent paths referenced in the metadata.

Best, 
Paul Lam

Re: Savepoint incomplete when job was killed after a cancel timeout

Till Rohrmann
Glad to hear that your job data was not lost!

Cheers,
Till
