Hi there, I'm trying to persist events coming from a Kinesis Stream to S3 using the new StreamingFileSink.
final StreamingFileSink<TripEvent> bulkFormatSink = StreamingFileSink .forBulkFormat( new Path(...), ParquetAvroWriters.forSpecificRecord(TripEvent.class) ) .withBucketAssigner(...) .build(); I'm ingesting 50-60k events/sec into a Kinesis stream with 64 shards. Flink is deployed on an EMR cluster with 4 m5.xlarge nodes and configured to use three nodes with two slots each. The job itself is running
with a parallelism of 6 and checkpoints are triggered ever 15 minutes. This works, but I'm occasionally seeing checkpoints fail (roughly every 10 hours), which is caused by an AmazonS3Exception (see attachment for the whole trace): Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period.
Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: D2BDE02E02977189; S3 Extended Request ID: +elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A=), S3 Extended Request ID:
+elMrRg5E4VHqCeqzRqeFAeVbjkFanwFIDF+lUACCXLGlO989SzSoyuqZMEAjEBn+siC3s++48A= at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) From the attached screenshot you can see that a couple of minutes before the exception occurs, around the time the checkpoint is triggered, the number of records that are read from the Kinesis stream is substantially
dropping relative to the number of records that are ingested into the stream. Eventually the checkpoint fails, but as HA is configured for the cluster the job manages to recover from the failure. My initial thought was that the cluster is overloaded, but the average cpu utilization is around 20%, the network isn’t saturated either, and scaling the cluster to larger nodes still showed a similar behavior. Is this a known issue? It would be great if someone could give me some pointers on how to debug this further. Thanks, Steffen Amazon Web Services EMEA SARL 38 avenue John F. Kennedy, L-1855 Luxembourg Sitz der Gesellschaft: L-1855 Luxemburg eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284 Amazon Web Services EMEA SARL, Niederlassung Deutschland Marcel-Breuer-Str. 12, D-80807 Muenchen Sitz der Zweigniederlassung: Muenchen eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240, USt-ID DE317013094 s3-exception-ganglia.png (423K) Download Attachment s3-exception-cloudwatch.png (401K) Download Attachment s3-exception-job-manager.log (28K) Download Attachment |
Hi Steffen,
Thanks for reporting this. Internally Flink does not keep any open connections to S3. It only keeps buffers data internally up till the point they reach a min-size limit (by default 5MB) and then uploads them as a part of an MPU on one go. Given this, I will have to dig a bit dipper to see why a connection would timeout. If you are willing to dig into the code, all interactions with S3 pass through the S3AccessHelper class and its implementation, the HadoopS3AccessHelper. For the buffering and uploading logic, you could have a look at the S3RecoverableWriter and the S3RecoverableFsDataOutputStream. I will keep looking into it. In the meantime, if you find anything let us know. Cheers, Kostas |
Oh this is timely! I hope I can save you some pain Kostas! (cc-ing to flink dev to get feedback there for what I believe to be a confirmed bug) I was just about to open up a flink issue for this after digging (really) deep and figuring out the issue over the weekend. The problem arises due the flink hands input streams to the S3AccessHelper. If you turn on debug logs for s3, you will eventually see this stack trace: 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient - FYI: failed to reset content inputstream before throwing up java.io.IOException: Resetting to invalid mark at java.io.BufferedInputStream.reset(BufferedInputStream.java:448) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168) at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306) at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471) at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74) at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319) at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) From this, you can see that for (some reason) AWS fails to write a multi-part chunk and then tries to reset the input stream in order to retry but fails (because the InputStream is not mark-able) That exception is swallowed (it seems like it should be raised up to client, but isn't for an unknown reason). The s3-client then tries to repeat the request using it's built in retry logic, however, because the InputStream is consumed and has no more bytes to write, we never fill up the expected content-length that the s3 put request is expecting. Eventually, after it hits the max number of retries, it fails and you get the error above. I just started running a fix for this (which is a hack not the real solution) here: https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6 This whole thing is documented here: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html However, I found that just using the documented property didn't appear to work and I had to wrap the InputStream in the BufferedInputStream for it to work. I think the real fix is either to: 1. Use the BufferedInputStream but make it configurable 2. Refactor S3AccessHelper to have another signature that takes a File object and change the RefCountedFSOutputStream to also be able to give a reference the the underlying file. I can pretty easily do this work, but would be curious the direction that the maintainers would prefer. Thanks, Addison! On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <[hidden email]> wrote:
|
Hi Addison, Kostas, Steffan, I am also encountering this exact issue. I cannot find a JIRA ticket on this, is there some planned work on implementing a fix? @Addison - Did you manage to find a fix that you could apply without modifying the Flink codebase? If possible it would be better not patch the code base and compile a custom image. Thanks, Padarn On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <[hidden email]> wrote:
Grab is hiring. Learn more at https://grab.careers By communicating with Grab Inc and/or its subsidiaries, associate companies and jointly controlled entities (“Grab Group”), you are deemed to have consented to processing of your personal data as set out in the Privacy Notice which can be viewed at https://grab.com/privacy/ This email contains confidential information and is only for the intended recipient(s). If you are not the intended recipient(s), please do not disseminate, distribute or copy this email and notify Grab Group immediately if you have received this by mistake and delete this email from your system. Email transmission cannot be guaranteed to be secure or error-free as any information therein could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do not accept liability for any errors or omissions in the contents of this email arises as a result of email transmission. All intellectual property rights in this email and attachments therein shall remain vested in Grab Group, unless otherwise provided by law. |
Hi Padarn, This is the jira issue: https://issues.apache.org/jira/browse/FLINK-11187 and the fix, as you can see, was first included in version 1.7.2. Cheers, Kostas On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <[hidden email]> wrote:
Kostas Kloudas | Software Engineer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Data Artisans GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen |
Thanks Kostas! On Mon, Feb 18, 2019 at 5:10 PM Kostas Kloudas <[hidden email]> wrote:
Grab is hiring. Learn more at https://grab.careers By communicating with Grab Inc and/or its subsidiaries, associate companies and jointly controlled entities (“Grab Group”), you are deemed to have consented to processing of your personal data as set out in the Privacy Notice which can be viewed at https://grab.com/privacy/ This email contains confidential information and is only for the intended recipient(s). If you are not the intended recipient(s), please do not disseminate, distribute or copy this email and notify Grab Group immediately if you have received this by mistake and delete this email from your system. Email transmission cannot be guaranteed to be secure or error-free as any information therein could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do not accept liability for any errors or omissions in the contents of this email arises as a result of email transmission. All intellectual property rights in this email and attachments therein shall remain vested in Grab Group, unless otherwise provided by law. |
Free forum by Nabble | Edit this page |