Does flink support retries on checkpoint write failures

Does flink support retries on checkpoint write failures

Richard Deurwaarder
Hi all,

We've got a Flink job running on 1.8.0 which writes its state (RocksDB) to Google Cloud Storage[1]. We've noticed that jobs with a large amount of state (in the 500 GB range) are becoming *very* unstable, on the order of restarting once an hour or even more often.
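
For context, our setup boils down to roughly the sketch below (the bucket/path and checkpoint interval are placeholders, and it assumes the GCS Hadoop connector and the flink-statebackend-rocksdb dependency are on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbOnGcsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint roughly once a minute (placeholder interval).
        env.enableCheckpointing(60_000);

        // RocksDB state backend with incremental checkpoints, writing to a GCS bucket
        // (placeholder bucket/path; gs:// is resolved via the Hadoop file system).
        env.setStateBackend(new RocksDBStateBackend("gs://my-bucket/flink/checkpoints", true));

        // ... sources, transformations, sinks ...
        // env.execute("stateful-job");
    }
}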

The reason for this instability is that we run into "410 Gone"[4] errors from Google Cloud Storage. This indicates that an upload (a write, from Flink's perspective) took place and it wanted to resume the write[2] but could not find the file it needed to resume. My guess is that this happens because the previous attempt either failed, or perhaps because it uploads in chunks of 67 MB [3].

The library logs this line when this happens:

"Encountered status code 410 when accessing URL https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg. Delegating to response handler for possible retry."

We're kind of stuck on these questions:
* Is Flink capable of doing these retries?
* Does anyone successfully write their (RocksDB) state to Google Cloud Storage for bigger state sizes?
* Is it possible Flink renames or deletes certain directories before all flushes have finished, relying on an atomicity guarantee provided by HDFS that perhaps does not hold for other implementations? A race condition of sorts.

Basically, does anyone recognize this behavior?

Regards,

Re: Does flink support retries on checkpoint write failures

Till Rohrmann
Hi Richard,

Googling a bit indicates that this might actually be a GCS problem [1, 2, 3]. The proposed solution/workaround so far is to retry the whole upload operation as part of the application logic. Since I assume that you are writing to GCS via Hadoop's file system, this should actually fall into the realm of the Hadoop file system implementation and not Flink.

What you could do to mitigate the problem a bit is to set the number of tolerable checkpoint failures to a non-zero value via `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n` means that the job will only fail and then restart after `n` checkpoint failures. Unfortunately, we do not support a failure rate yet.
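
Roughly, that would look like the sketch below (the setter is only available in newer Flink versions, not in 1.8, as comes up later in this thread; the interval and threshold values are just examples):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailuresSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example checkpoint interval

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Tolerate up to 3 failed checkpoints before the job is failed (and restarted).
        checkpointConfig.setTolerableCheckpointFailureNumber(3);

        // ... build and execute the job as usual ...
    }
}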


Cheers,
Till

Re: Does flink support retries on checkpoint write failures

Richard Deurwaarder
Hi Till,

I'll see if we can ask Google to comment on those issues; perhaps they have a fix in the works that would solve the root problem.
In the meantime, `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very promising!
Thank you for this. I'm going to try it tomorrow to see if that helps. I will let you know!

Richard

Re: Does flink support retries on checkpoint write failures

wvl
Forgive my lack of knowledge - I'm a bit out of my league here.

But I was wondering: if we allow e.g. 1 checkpoint to fail, and the reason for that failure somehow also caused a record to be lost (e.g. a RocksDB exception / TaskManager crash / etc.), there would be no source rewind to the last successful checkpoint and that record would be lost forever, correct?

Re: Does flink support retries on checkpoint write failures

Arvid Heise
If a checkpoint is not successful, it cannot be used for recovery.
That means Flink will restart from the last successful checkpoint and hence not lose any data.
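
To make that concrete, here is a minimal sketch of a job with periodic checkpoints and an automatic restart strategy; on any failure (including one caused by exceeding the tolerable number of checkpoint failures), Flink restores from the last completed checkpoint and rewinds the sources accordingly. The interval and restart values are arbitrary examples.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints; on recovery Flink restores state from the latest
        // completed checkpoint and rewinds sources to the offsets stored in it.
        env.enableCheckpointing(60_000);

        // Restart automatically after a failure: up to 10 attempts, 10 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(10)));

        // ... sources, transformations, sinks ...
        // env.execute("my-job");
    }
}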

Re: Does flink support retries on checkpoint write failures

Richard Deurwaarder
Hi Till & others,

We configured setFailOnCheckpointingErrors (setTolerableCheckpointFailureNumber isn't available in 1.8) and this indeed prevents the large number of restarts.
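
For anyone finding this later, what we did boils down to roughly this sketch (example interval; the flag is set to false so that a failed checkpoint is only declined instead of failing the whole job, and the setter was later superseded by setTolerableCheckpointFailureNumber):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailOnCheckpointErrorsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example interval

        // Flink 1.8: with this set to false, a failed checkpoint is declined but the
        // job keeps running, instead of failing and restarting on every checkpoint error.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

        // ... rest of the job ...
    }
}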

Hopefully a solution for the reported issue[1] with Google gets found, but for now this solved our immediate problem.

Thanks again!


Regards,

Richard

Re: Does flink support retries on checkpoint write failures

Till Rohrmann
Glad to hear that you could solve/mitigate the problem and thanks for letting us know.

Cheers,
Till
