StreamingFileSink causing AmazonS3Exception


Hausmann, Steffen

Hi there,

 

I'm trying to persist events coming from a Kinesis Stream to S3 using the new StreamingFileSink.

 

    final StreamingFileSink<TripEvent> bulkFormatSink = StreamingFileSink
        .forBulkFormat(
            new Path(...),
            ParquetAvroWriters.forSpecificRecord(TripEvent.class)
        )
        .withBucketAssigner(...)
        .build();

 

I'm ingesting 50-60k events/sec into a Kinesis stream with 64 shards. Flink is deployed on an EMR cluster with 4 m5.xlarge nodes and configured to use three nodes with two slots each. The job itself is running with a parallelism of 6 and checkpoints are triggered every 15 minutes. This works, but checkpoints occasionally fail (roughly every 10 hours) with an AmazonS3Exception (see the attachment for the whole trace):

 

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: D2BDE02E02977189; S3 Extended Request ID: +elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A=), S3 Extended Request ID: +elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A=
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
       at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)

 

From the attached screenshot you can see that a couple of minutes before the exception occurs, around the time the checkpoint is triggered, the number of records read from the Kinesis stream drops substantially relative to the number of records ingested into the stream. Eventually the checkpoint fails, but because HA is configured for the cluster, the job manages to recover from the failure.

 

My initial thought was that the cluster is overloaded, but the average CPU utilization is around 20%, the network isn't saturated either, and scaling the cluster to larger nodes still showed similar behavior.

 

Is this a known issue? It would be great if someone could give me some pointers on how to debug this further.

 

Thanks, Steffen

 







Attachments:
s3-exception-ganglia.png (423K)
s3-exception-cloudwatch.png (401K)
s3-exception-job-manager.log (28K)
Re: StreamingFileSink causing AmazonS3Exception

Kostas Kloudas
Hi Steffen, 

Thanks for reporting this.

Internally Flink does not keep any open connections to S3. It only buffers data internally until 
the buffer reaches a minimum size limit (5 MB by default) and then uploads it as one part of 
a multipart upload (MPU) in one go. Given this, I will have to dig a bit deeper to see why a connection would time out.
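
As a rough illustration of the buffering behaviour described above, here is a minimal sketch that collects writes until a minimum part size is reached and then emits the buffer as one part. It is not Flink's implementation: the class name `PartBufferSketch` and its methods are hypothetical, the parts are kept in an in-memory list instead of being uploaded, and staging the bytes in memory is an assumption made only to keep the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; this is not Flink's S3RecoverableFsDataOutputStream. */
final class PartBufferSketch {
    private final int minPartSize;
    private final ByteArrayOutputStream current = new ByteArrayOutputStream();
    // Stands in for the parts handed to S3; no network call is made here.
    private final List<byte[]> uploadedParts = new ArrayList<>();

    PartBufferSketch(int minPartSize) {
        this.minPartSize = minPartSize;
    }

    /** Buffers data and emits one part as soon as the threshold is reached. */
    void write(byte[] data) {
        current.write(data, 0, data.length);
        if (current.size() >= minPartSize) {
            flushPart();
        }
    }

    /** Emits whatever is left as the final (possibly smaller) part. */
    void close() {
        if (current.size() > 0) {
            flushPart();
        }
    }

    private void flushPart() {
        uploadedParts.add(current.toByteArray());
        current.reset();
    }

    List<byte[]> parts() {
        return uploadedParts;
    }

    public static void main(String[] args) {
        // 12 writes of 1 MB against a 5 MB threshold.
        PartBufferSketch sketch = new PartBufferSketch(5 * 1024 * 1024);
        byte[] chunk = new byte[1024 * 1024];
        for (int i = 0; i < 12; i++) {
            sketch.write(chunk);
        }
        sketch.close();
        System.out.println("parts: " + sketch.parts().size());
    }
}
```

The point of the sketch is that each part is a complete, bounded chunk of bytes by the time it is handed over for upload, so no connection needs to stay open while data accumulates.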

If you are willing to dig into the code, all interactions with S3 pass through the S3AccessHelper 
class and its implementation, the HadoopS3AccessHelper. For the buffering and uploading logic, 
you could have a look at the S3RecoverableWriter and the S3RecoverableFsDataOutputStream.

I will keep looking into it. In the meantime, if you find anything let us know.

Cheers,
Kostas

Re: StreamingFileSink causing AmazonS3Exception

Addison Higham
Oh this is timely!

I hope I can save you some pain Kostas! (cc-ing to flink dev to get feedback there for what I believe to be a confirmed bug) 


I was just about to open a Flink issue for this after digging (really) deep and figuring out the problem over the weekend.

The problem arises from the way Flink hands input streams to the S3AccessHelper. If you turn on debug logs for S3, you will eventually see this stack trace:

2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
  at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
  at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

From this, you can see that, for some reason, the AWS client fails to upload one part of the multipart upload and then tries to reset the input stream in order to retry, but the reset fails because the InputStream is not markable.

That exception is swallowed (it seems like it should be raised to the caller, but for an unknown reason it isn't). The S3 client then repeats the request using its built-in retry logic. However, because the InputStream has already been consumed and has no more bytes to write, the request never reaches the content length that the S3 PUT request expects. Eventually, after hitting the maximum number of retries, it fails and you get the error above.
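
The failure mode described above can be reproduced in miniature with plain JDK streams. This is only a sketch under stated assumptions: it does not use the AWS SDK, and `ConsumedStreamRetrySketch`, `OneShotStream`, and `bytesAvailableOnRetry` are hypothetical names. A first attempt reads part of the stream and "fails", the reset error is swallowed, and the retry then finds fewer bytes than the declared content length.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ConsumedStreamRetrySketch {

    /** A stream that cannot be rewound (hypothetical stand-in for the upload stream). */
    static final class OneShotStream extends FilterInputStream {
        OneShotStream(byte[] data) {
            super(new ByteArrayInputStream(data));
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        @Override
        public void mark(int readLimit) {
            // intentionally a no-op
        }

        @Override
        public void reset() throws IOException {
            throw new IOException("mark/reset not supported");
        }
    }

    /** Reads at most maxBytes and returns how many bytes were actually read. */
    private static int drain(InputStream in, int maxBytes) throws IOException {
        byte[] buf = new byte[1024];
        int total = 0;
        while (total < maxBytes) {
            int n = in.read(buf, 0, Math.min(buf.length, maxBytes - total));
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    /** Simulates a failed first attempt and returns how many bytes the retry can still read. */
    static int bytesAvailableOnRetry(InputStream in, int contentLength, int failAfter) {
        try {
            drain(in, failAfter); // first attempt consumes bytes, then "fails"
            try {
                in.reset(); // rewind before retrying
            } catch (IOException swallowed) {
                // as with the DEBUG message in the trace, the failure is not propagated
            }
            return drain(in, contentLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int sent = bytesAvailableOnRetry(new OneShotStream(new byte[100]), 100, 60);
        System.out.println("retry could send " + sent + " of 100 bytes");
    }
}
```

In this toy setup the retry can only deliver the 40 bytes that the first attempt left unread, which mirrors a request that keeps waiting for bytes that will never arrive.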

I just started running a fix for this (which is a hack, not the real solution) here: https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6


However, I found that setting the documented property alone did not appear to work; I had to wrap the InputStream in a BufferedInputStream for it to work.
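
For readers who do not want to open the gist, the general idea of the wrapping approach looks roughly like the sketch below. It is not the code from the gist and does not touch the AWS SDK; `BufferedRetrySketch`, `oneShot`, and `bytesAvailableOnRetry` are hypothetical names. The mark limit of content length plus one is an assumption chosen so that the whole part fits inside the buffer.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class BufferedRetrySketch {

    /** Non-rewindable source, as in the failing case (hypothetical stand-in). */
    static InputStream oneShot(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public boolean markSupported() {
                return false;
            }

            @Override
            public void mark(int readLimit) {
                // intentionally a no-op
            }

            @Override
            public void reset() throws IOException {
                throw new IOException("mark/reset not supported");
            }
        };
    }

    /** Reads at most maxBytes and returns how many bytes were actually read. */
    private static int drain(InputStream in, int maxBytes) throws IOException {
        byte[] buf = new byte[1024];
        int total = 0;
        while (total < maxBytes) {
            int n = in.read(buf, 0, Math.min(buf.length, maxBytes - total));
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    /** Wraps the source so that a failed first attempt can be replayed from the start. */
    static int bytesAvailableOnRetry(InputStream source, int contentLength, int failAfter) {
        try {
            // One extra byte of headroom so the whole part stays within the mark limit.
            int limit = contentLength + 1;
            BufferedInputStream in = new BufferedInputStream(source, limit);
            in.mark(limit);
            drain(in, failAfter); // first attempt consumes bytes, then "fails"
            in.reset();           // rewind to the mark instead of losing the bytes
            return drain(in, contentLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int sent = bytesAvailableOnRetry(oneShot(new byte[100]), 100, 60);
        System.out.println("retry could send " + sent + " of 100 bytes");
    }
}
```

The trade-off is memory: the buffer has to hold an entire part for every upload that is in flight, which is why this reads as a workaround and not as the final design.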

I think the real fix is either to:

1. Use the BufferedInputStream but make it configurable
2. Refactor S3AccessHelper to have another signature that takes a File object and change the RefCountedFSOutputStream to also be able to give a reference to the underlying file.
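
To make the second option more concrete, here is a small sketch of why a File-based signature helps: every attempt can open a fresh stream over the file, so a retry always starts from the first byte. Everything in it is hypothetical (`FileBackedRetrySketch`, `uploadWithRetries`, `tempPart`, and the simulated failure); it does not reflect the actual S3AccessHelper interface or any AWS SDK call.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class FileBackedRetrySketch {

    /** Creates a temporary file of the given size that stands in for a buffered part. */
    static File tempPart(int size) {
        try {
            File f = File.createTempFile("part-", ".bin");
            f.deleteOnExit();
            Files.write(f.toPath(), new byte[size]);
            return f;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Hypothetical uploader: the first failingAttempts attempts break off halfway,
     * and the method returns the number of bytes sent by the first complete attempt.
     */
    static long uploadWithRetries(File part, int failingAttempts) {
        for (int attempt = 0; ; attempt++) {
            // A new stream per attempt, so no mark/reset is needed.
            try (InputStream in = new FileInputStream(part)) {
                long sent = 0;
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) >= 0) {
                    sent += n;
                    if (attempt < failingAttempts && sent >= part.length() / 2) {
                        throw new IOException("simulated transient failure");
                    }
                }
                return sent;
            } catch (IOException e) {
                if (attempt >= failingAttempts) {
                    throw new UncheckedIOException(e);
                }
                // otherwise fall through and retry with a freshly opened stream
            }
        }
    }

    public static void main(String[] args) {
        File part = tempPart(1000);
        System.out.println("bytes sent: " + uploadWithRetries(part, 2));
    }
}
```

Compared with the buffering workaround, this keeps the bytes on disk and avoids holding a whole part in memory for each retry.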

I can pretty easily do this work, but I am curious which direction the maintainers would prefer.

Thanks,

Addison!






Re: StreamingFileSink causing AmazonS3Exception

Padarn Wilson
Hi Addison, Kostas, Steffen,

I am also encountering this exact issue. I cannot find a JIRA ticket for this. Is there any planned work on a fix?

@Addison - Did you manage to find a fix that you could apply without modifying the Flink codebase? If possible, it would be better not to patch the codebase and compile a custom image.

Thanks,
Padarn

Re: StreamingFileSink causing AmazonS3Exception

Kostas Kloudas-4
Hi Padarn,

and the fix, as you can see, was first included in version 1.7.2.

Cheers,
Kostas

Re: StreamingFileSink causing AmazonS3Exception

Padarn Wilson
Thanks Kostas!

On Mon, Feb 18, 2019 at 5:10 PM Kostas Kloudas <[hidden email]> wrote:
Hi Padarn,

and the fix, as you can see, was first included in version 1.7.2.

Cheers,
Kostas

On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <[hidden email]> wrote:
Hi Addison, Kostas, Steffan,

I am also encountering this exact issue. I cannot find a JIRA ticket on this, is there some planned work on implementing a fix?

@Addison - Did you manage to find a fix that you could apply without modifying the Flink codebase? If possible it would be better not patch the code base and compile a custom image.

Thanks,
Padarn

On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <[hidden email]> wrote:
Oh this is timely!

I hope I can save you some pain Kostas! (cc-ing to flink dev to get feedback there for what I believe to be a confirmed bug) 


I was just about to open up a flink issue for this after digging (really) deep and figuring out the issue over the weekend.

The problem arises due the flink hands input streams to the S3AccessHelper. If you turn on debug logs for s3, you will eventually see this stack trace:

2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
  at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
  at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

From this, you can see that, for some reason, AWS fails to write a multipart chunk and then tries to reset the input stream in order to retry, but the reset fails because the InputStream is not mark-able.

That exception is swallowed (it seems like it should be raised to the client, but for some unknown reason it isn't). The S3 client then tries to repeat the request using its built-in retry logic. However, because the InputStream has already been consumed
and has no more bytes to write, we never reach the content-length that the S3 put request expects. Eventually, after it hits the maximum number of retries, it fails and you get the error above.
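
To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not the Flink or AWS SDK code; the class and method names (MarkResetDemo, nonMarkable, bytesAvailableOnRetry) are made up for this example. It simulates a first attempt that consumes the stream, a reset whose exception is swallowed, and a retry that then finds no bytes left, and it shows that wrapping the same stream in a BufferedInputStream lets the retry see the full part again.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetDemo {

    /** Hypothetical stand-in for a stream that cannot be rewound. */
    public static InputStream nonMarkable(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override public boolean markSupported() { return false; }
            @Override public synchronized void mark(int readLimit) { /* ignored */ }
            @Override public synchronized void reset() throws IOException {
                throw new IOException("mark/reset not supported");
            }
        };
    }

    /**
     * Simulates a failed first attempt followed by one retry and
     * returns how many bytes the retry was able to read.
     */
    public static int bytesAvailableOnRetry(InputStream in, int length) throws IOException {
        if (in.markSupported()) {
            // +1 so that reading up to end-of-stream does not invalidate the mark
            in.mark(length + 1);
        }
        drain(in); // first attempt consumes the whole stream, then "times out"
        try {
            in.reset();
        } catch (IOException swallowed) {
            // mirrors the swallowed reset failure described in the thread
        }
        return drain(in); // the retry sends whatever is left
    }

    /** Reads the stream to its end and returns the number of bytes read. */
    static int drain(InputStream in) throws IOException {
        byte[] buf = new byte[1024];
        int total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] part = new byte[4096];
        // Non-markable stream: the retry has nothing left to send.
        System.out.println(bytesAvailableOnRetry(nonMarkable(part), part.length));
        // Same stream wrapped in a BufferedInputStream: the retry sees all bytes.
        System.out.println(
            bytesAvailableOnRetry(new BufferedInputStream(nonMarkable(part)), part.length));
    }
}
```

In the first case the retry reads 0 bytes, which corresponds to a request that never fills its declared content-length; in the second case it reads all 4096 bytes again.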

I just started running a fix for this (which is a hack, not the real solution) here: https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6


However, I found that just using the documented property didn't appear to work and I had to wrap the InputStream in the BufferedInputStream for it to work.

I think the real fix is either to:

1. Use the BufferedInputStream but make it configurable
2. Refactor S3AccessHelper to have another signature that takes a File object, and change RefCountedFSOutputStream so that it can also give a reference to the underlying file.
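
As a rough sketch of why the File-based option (2) would sidestep the problem: if the upload is given a file rather than an already-open stream, each attempt can open a fresh stream, so no mark/reset is needed. The names below (FileRetrySketch, PartSender, uploadWithRetry) are hypothetical and are not the actual S3AccessHelper or AWS SDK signatures.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class FileRetrySketch {

    /** Hypothetical upload call; stands in for whatever actually sends the part. */
    public interface PartSender {
        void send(InputStream body, long length) throws IOException;
    }

    /**
     * Tries to send the file up to maxAttempts times, re-opening it for every
     * attempt. Returns the attempt number that succeeded.
     */
    public static int uploadWithRetry(File part, PartSender sender, int maxAttempts)
            throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // A fresh stream per attempt always starts at byte 0.
            try (InputStream in = new FileInputStream(part)) {
                sender.send(in, part.length());
                return attempt;
            } catch (IOException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last != null ? last : new IOException("maxAttempts must be at least 1");
    }
}
```

The design point is only that the retry loop owns the opening of the stream; how that would map onto RefCountedFSOutputStream is left open here.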

I can pretty easily do this work, but I would be curious which direction the maintainers would prefer.

Thanks,

Addison!






On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <[hidden email]> wrote:
Hi Steffen, 

Thanks for reporting this.

Internally, Flink does not keep any open connections to S3. It only buffers data internally until
it reaches a minimum size limit (5MB by default) and then uploads it as a part of
an MPU in one go. Given this, I will have to dig a bit deeper to see why a connection would time out.
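
The buffer-then-upload behaviour described above could be pictured roughly as follows. This is a simplified illustration, not the S3RecoverableFsDataOutputStream implementation; the class PartBufferSketch and its methods are invented for this example, and the "upload" is just a list append.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class PartBufferSketch {
    private final int minPartSize;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<byte[]> uploadedParts = new ArrayList<>();

    public PartBufferSketch(int minPartSize) {
        this.minPartSize = minPartSize;
    }

    /** Buffers the data and emits one part once the minimum size is reached. */
    public void write(byte[] data) {
        buffer.write(data, 0, data.length);
        if (buffer.size() >= minPartSize) {
            flushPart();
        }
    }

    private void flushPart() {
        // Stand-in for a single upload-part request of a multipart upload.
        uploadedParts.add(buffer.toByteArray());
        buffer.reset();
    }

    public int partCount() {
        return uploadedParts.size();
    }

    public int bufferedBytes() {
        return buffer.size();
    }
}
```

With a minimum part size of 5 * 1024 * 1024, nothing would be sent until at least 5MB has accumulated, and no connection is held open in between.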

If you are willing to dig into the code, all interactions with S3 pass through the S3AccessHelper 
class and its implementation, the HadoopS3AccessHelper. For the buffering and uploading logic, 
you could have a look at the S3RecoverableWriter and the S3RecoverableFsDataOutputStream.

I will keep looking into it. In the meantime, if you find anything let us know.

Cheers,
Kostas


Grab is hiring. Learn more at https://grab.careers

By communicating with Grab Inc and/or its subsidiaries, associate companies and jointly controlled entities (“Grab Group”), you are deemed to have consented to processing of your personal data as set out in the Privacy Notice which can be viewed at https://grab.com/privacy/

This email contains confidential information and is only for the intended recipient(s). If you are not the intended recipient(s), please do not disseminate, distribute or copy this email and notify Grab Group immediately if you have received this by mistake and delete this email from your system. Email transmission cannot be guaranteed to be secure or error-free as any information therein could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do not accept liability for any errors or omissions in the contents of this email arises as a result of email transmission. All intellectual property rights in this email and attachments therein shall remain vested in Grab Group, unless otherwise provided by law.


--

Kostas Kloudas | Software Engineer



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

