Hi,

I'm trying to understand the exactly-once semantics of the StreamingFileSink with S3 in Flink 1.7.1, and I'm a bit confused about how it guarantees exactly-once under a very specific failure scenario. For simplicity, let's say we roll the current part file on checkpoint (and only on checkpoint). The process is as follows:

1. The framework tells the sink to prepare for a checkpoint. This ultimately results in 'onReceptionOfCheckpoint' being called on Bucket.java.
2. This takes the current file and, based on our policy of rolling on checkpoint, closes it and uploads it to S3 as part of an MPU (multipart upload). The reference to this upload is stored in 'pendingPartsPerCheckpoint'.
3. Once the checkpoint completes successfully, the bucket is notified via 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes through all pendingPartsPerCheckpoint and, for each of them, recovers the in-progress part (which doesn't exist in this scenario) and then commits the upload.
4. The AmazonS3Client is ultimately called to perform the upload, and it retries the attempt up to N times. If it exhausts the retries, it throws an exception.
5. Upon successful commit of the MPU, the Bucket clears its references to these uploads from its state.

Given this flow, I'm having trouble understanding how the following scenario works:
So how is this case handled?

Really appreciate the help!

-Kaustubh
Hi Kaustubh,

Your general understanding is correct. In this case, though, the sink will call the S3Committer#commitAfterRecovery() method. If this method fails to commit the MPU, it checks whether the file is already there and whether its length is correct. If everything is OK (which is the case in your example), it continues with normal execution.

I hope this helps.

Kostas

On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar <[hidden email]> wrote:
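To make the recovery path described in this reply concrete, here is a minimal Java sketch. It does not use the real Flink or AWS SDK classes: `FakeStore`, `Committer`, and all of their signatures are hypothetical stand-ins, and only the method names `commit` and `commitAfterRecovery` are taken from the thread. It assumes that completing an upload that was already completed fails, and shows how a length check on the existing object lets the recovery path treat that failure as a success.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class RecoveryCommitSketch {

    /** Hypothetical in-memory stand-in for an object store with multipart uploads. */
    static class FakeStore {
        private final Map<String, Long> pendingUploads = new HashMap<>(); // uploadId -> length
        private final Map<String, Long> objects = new HashMap<>();        // objectName -> length

        void startUpload(String uploadId, long length) {
            pendingUploads.put(uploadId, length);
        }

        /** Completing an unknown (or already completed) upload fails. */
        void completeUpload(String objectName, String uploadId) throws IOException {
            Long length = pendingUploads.remove(uploadId);
            if (length == null) {
                throw new IOException("No such upload: " + uploadId);
            }
            objects.put(objectName, length);
        }

        /** Returns the stored length, or null if the object does not exist. */
        Long objectLength(String objectName) {
            return objects.get(objectName);
        }
    }

    /** Hypothetical committer holding the reference to one pending upload. */
    static class Committer {
        private final FakeStore store;
        private final String objectName;
        private final String uploadId;
        private final long expectedLength;

        Committer(FakeStore store, String objectName, String uploadId, long expectedLength) {
            this.store = store;
            this.objectName = objectName;
            this.uploadId = uploadId;
            this.expectedLength = expectedLength;
        }

        /** Normal path: a failed commit is an error. */
        void commit() throws IOException {
            store.completeUpload(objectName, uploadId);
        }

        /** Recovery path: a failed commit is fine if the object already exists with the right length. */
        void commitAfterRecovery() throws IOException {
            try {
                commit();
            } catch (IOException commitFailure) {
                Long actual = store.objectLength(objectName);
                if (actual == null || actual != expectedLength) {
                    throw new IOException("Commit failed and object is missing or has the wrong length",
                            commitFailure);
                }
                // Object is present with the expected length: treat the upload as already committed.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        FakeStore store = new FakeStore();
        store.startUpload("upload-1", 128L);
        Committer committer = new Committer(store, "part-0-0", "upload-1", 128L);

        committer.commit();              // the upload completes before the failure
        committer.commitAfterRecovery(); // replayed after restore: does not throw
        System.out.println("Recovered commit accepted for part-0-0");
    }
}
```

The design point is that the recovery call is idempotent: replaying it against an upload that already went through neither fails the job nor writes the data twice.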
Hi Kostas,

Thanks for the response! Yes, I see commitAfterRecovery being called when a Bucket is restored. I confused myself by thinking that 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which led me to believe that we were only calling commit and not commitAfterRecovery.

Thanks for the clarification!

-Kaustubh

On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <[hidden email]> wrote:
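The distinction drawn in this message, between committing on checkpoint completion and committing on restore, can be sketched as follows. This is a simplified model and not Flink's Bucket class: `PendingCommit`, `RecordingCommit`, `Bucket.restore`, and `pendingCheckpoints` are hypothetical, and the assumption that restored pending parts are committed immediately rather than kept in state is a guess made for illustration. The names `onReceptionOfCheckpoint`, `onSuccessfulCompletionOfCheckpoint`, and `pendingPartsPerCheckpoint` are borrowed from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class BucketLifecycleSketch {

    /** Hypothetical handle for one uploaded-but-uncommitted part. */
    interface PendingCommit {
        void commit();
        void commitAfterRecovery();
    }

    /** Records which commit path was taken, so the two paths can be told apart. */
    static class RecordingCommit implements PendingCommit {
        private final String name;
        private final List<String> calls;

        RecordingCommit(String name, List<String> calls) {
            this.name = name;
            this.calls = calls;
        }

        @Override
        public void commit() {
            calls.add("commit:" + name);
        }

        @Override
        public void commitAfterRecovery() {
            calls.add("commitAfterRecovery:" + name);
        }
    }

    static class Bucket {
        private final NavigableMap<Long, List<PendingCommit>> pendingPartsPerCheckpoint = new TreeMap<>();

        /** On checkpoint: remember the rolled part under the checkpoint id. */
        void onReceptionOfCheckpoint(long checkpointId, PendingCommit part) {
            pendingPartsPerCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(part);
        }

        /** On checkpoint completion: plain commit for every part up to this checkpoint, then clear. */
        void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
            Map<Long, List<PendingCommit>> completed = pendingPartsPerCheckpoint.headMap(checkpointId, true);
            for (List<PendingCommit> parts : completed.values()) {
                for (PendingCommit part : parts) {
                    part.commit();
                }
            }
            completed.clear(); // clearing the view also removes the entries from the backing map
        }

        /** On restore: every part found in the restored state goes through commitAfterRecovery. */
        static Bucket restore(Map<Long, List<PendingCommit>> restoredState) {
            for (List<PendingCommit> parts : restoredState.values()) {
                for (PendingCommit part : parts) {
                    part.commitAfterRecovery();
                }
            }
            return new Bucket();
        }

        int pendingCheckpoints() {
            return pendingPartsPerCheckpoint.size();
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();

        Bucket bucket = new Bucket();
        bucket.onReceptionOfCheckpoint(1L, new RecordingCommit("part-0", calls));
        bucket.onSuccessfulCompletionOfCheckpoint(1L);

        Map<Long, List<PendingCommit>> restoredState = new TreeMap<>();
        List<PendingCommit> parts = new ArrayList<>();
        parts.add(new RecordingCommit("part-1", calls));
        restoredState.put(2L, parts);
        Bucket.restore(restoredState);

        System.out.println(calls);
    }
}
```

In this model the plain `commit` is only ever reached from the checkpoint-completion callback, while anything that survives in restored state is replayed through the more tolerant recovery call.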
No problem!

On Wed, Feb 6, 2019 at 6:38 PM Kaustubh Rudrawar <[hidden email]> wrote: