Exactly Once Guarantees with StreamingFileSink to S3


Exactly Once Guarantees with StreamingFileSink to S3

Kaustubh Rudrawar
Hi,

I'm trying to understand the exactly-once semantics of the StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about how it guarantees exactly once under a very specific failure scenario.

For simplicity, let's say we roll the current part file on checkpoint (and only on checkpoint). The process is then as follows (a code sketch follows the list):
1. The framework tells the sink to prepare for a checkpoint. This ultimately results in 'onReceptionOfCheckpoint' being called on Bucket.java.
2. This takes the current file and, based on our roll policy of rolling on checkpoint, closes it and uploads it to S3 as part of an MPU (multipart upload). The reference to this upload is stored in 'pendingPartsPerCheckpoint'.
3. Once the checkpoint successfully completes, the bucket is notified via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes through all of 'pendingPartsPerCheckpoint' and, for each entry, recovers the in-progress part (which doesn't exist in this scenario) and then commits the upload.
4. The AmazonS3Client is ultimately called to perform the upload, and it retries the attempt up to N times. If it exhausts its retries, it throws an exception.
5. Upon successful commit of the MPU, the Bucket clears the references to these uploads from its state.
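
Here is a minimal sketch of steps 1-5 as I understand them, paraphrasing Flink 1.7's Bucket.java against the RecoverableWriter API. The class name, field layout, and simplifications are mine, not the actual source:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

class BucketSketch {

    private final RecoverableWriter writer;
    private final Map<Long, List<RecoverableWriter.CommitRecoverable>>
            pendingPartsPerCheckpoint = new HashMap<>();
    private RecoverableFsDataOutputStream inProgressPart;

    BucketSketch(RecoverableWriter writer) {
        this.writer = writer;
    }

    // Steps 1-2: on checkpoint, close the in-progress part. The MPU parts
    // are uploaded, but the upload is NOT completed yet; remember it as
    // pending for this checkpoint.
    void onReceptionOfCheckpoint(long checkpointId) throws IOException {
        if (inProgressPart != null) {
            RecoverableFsDataOutputStream.Committer committer =
                    inProgressPart.closeForCommit();
            pendingPartsPerCheckpoint
                    .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                    .add(committer.getRecoverable());
            inProgressPart = null;
        }
    }

    // Steps 3-5: once the checkpoint is confirmed, commit every pending MPU
    // up to that checkpoint (CompleteMultipartUpload under the hood), then
    // drop the committed references from state.
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
                pendingPartsPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                for (RecoverableWriter.CommitRecoverable recoverable : entry.getValue()) {
                    writer.recoverForCommit(recoverable).commit();
                }
                it.remove(); // step 5: the crash window is between commit() and here
            }
        }
    }
}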

Given this flow, I'm having trouble understanding how the following scenario works:
  • Step 4: The commit of the MPU succeeds.
  • Step 5: Before this step completes, the task crashes. So at this point, S3 has successfully completed the MPU, but to the client (the Flink job) it has not completed.
  • Flink will then recover from the checkpoint we just took, and steps 3 and 4 will be repeated. My understanding is that, since the MPU succeeded previously, any attempt at re-committing that upload will result in a 404 ('NoSuchUpload'). So step 4 should throw an exception, which would then be retried by the framework, and this process would repeat itself (sketched below).
So how is this case handled?
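
To be concrete, here's a hedged sketch of the replay I expect, against the AWS SDK for Java v1. The class and method are mine, purely illustrative, not Flink code:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;

class ReplayedCommit {

    // Hypothetical illustration: the same CompleteMultipartUpload request
    // is issued twice, once before the crash and once after recovery.
    static void replayCommit(AmazonS3 s3, CompleteMultipartUploadRequest request) {
        // First attempt: succeeds. S3 assembles the object and the
        // upload id becomes invalid.
        s3.completeMultipartUpload(request);

        // ... the task crashes here, before Flink clears its state ...

        try {
            // Replay after recovery: the upload id no longer exists.
            s3.completeMultipartUpload(request);
        } catch (AmazonS3Exception e) {
            // Expected: HTTP 404 with error code "NoSuchUpload".
            System.out.println(e.getStatusCode() + " " + e.getErrorCode());
        }
    }
}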

Really appreciate the help!
-Kaustubh 



Re: Exactly Once Guarantees with StreamingFileSink to S3

Kostas Kloudas
Hi Kaustubh,

Your general understanding is correct.

In this case, though, the sink will call the S3Committer#commitAfterRecovery() method.
After failing to commit the MPU, this method checks whether the file is there and whether
its length is correct. If everything is OK (which is the case in your example), it
continues with normal execution.
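
In rough pseudocode, the fallback looks something like this. It's a paraphrase of the behavior, not the actual S3Committer source; the access-helper interface and field names are assumptions:

import java.io.IOException;

class S3CommitterSketch {

    private final S3AccessHelper s3;   // assumption: thin wrapper around the S3 client
    private final String bucket;
    private final String key;
    private final String uploadId;
    private final long expectedLength; // total bytes of the committed object

    S3CommitterSketch(S3AccessHelper s3, String bucket, String key,
                      String uploadId, long expectedLength) {
        this.s3 = s3;
        this.bucket = bucket;
        this.key = key;
        this.uploadId = uploadId;
        this.expectedLength = expectedLength;
    }

    void commitAfterRecovery() throws IOException {
        try {
            // Normal path: complete the multipart upload.
            s3.completeMultipartUpload(bucket, key, uploadId);
        } catch (Exception e) {
            // The MPU may already have been committed before the crash
            // (404 NoSuchUpload on replay). If the target object exists
            // with the expected length, the earlier commit went through
            // and we can safely continue; otherwise, rethrow.
            long actualLength = s3.getObjectSize(bucket, key); // -1 if absent
            if (actualLength == expectedLength) {
                return; // committed before the crash; treat as success
            }
            throw new IOException("Recovering commit failed for s3://" + bucket + "/" + key, e);
        }
    }

    // Assumed interface; stands in for AmazonS3 / an S3 access helper.
    interface S3AccessHelper {
        void completeMultipartUpload(String bucket, String key, String uploadId) throws IOException;
        long getObjectSize(String bucket, String key) throws IOException;
    }
}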

I hope this helps.

Kostas

Re: Exactly Once Guarantees with StreamingFileSink to S3

Kaustubh Rudrawar
Hi Kostas,

Thanks for the response! Yes, I see commitAfterRecovery being called when a Bucket is restored. I had confused myself by thinking that 'onSuccessfulCompletionOfCheckpoint' is also called on restore, which led me to believe that we were only calling commit and not commitAfterRecovery.

Thanks for the clarification!
-Kaustubh

Re: Exactly Once Guarantees with StreamingFileSink to S3

Kostas Kloudas
No problem!
