Exception when writing part file to S3


Exception when writing part file to S3

Robert Cullen

I’m using a StreamingFileSink to write data to my S3 instance. When writing the part file, this exception occurs:

2021-02-11 12:08:39
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.hadoop.fs.s3a.AWSStatus500Exception: initiate MultiPartUpload on 037f957d-454f-4cc5-bc42-4496f1606e90/prefix-0-108713.ext: com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null), S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856:InternalError: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:251)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:33)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
    at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3581)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 35 more
Any ideas?


Robert Cullen
240-475-4490

Re: Exception when writing part file to S3

rmetzger0
Hey,


Is this problem reproducible, or does it just happen every now and then (not that that makes it any less of a problem)? Have you tried Flink's Presto S3 connector? (AFAIK that's the recommended one.)
Which Flink version are you using?
Are you using any custom implementations for the rolling policy, formats, etc.?




Re: Exception when writing part file to S3

Robert Cullen

Hello,

This doesn’t appear to be a permissions issue since I can write a file to a bucket using:

resultStream.writeAsText("s3://my-bucket/output.txt")

Nothing in the logs shows authorization or permission exceptions.

The problem is reproducible. The logs show the part file being written to S3, but the file does not appear in the bucket in the MinIO UI.

I'm using Flink version 1.12.0. BTW, I'm using the same S3 instance for savepoint and checkpoint storage.
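
(The checkpoint storage just points at the same endpoint; roughly like the sketch below, where the paths are placeholders rather than my actual configuration:)

// Sketch only: checkpointing against the same S3/MinIO endpoint (placeholder paths)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                       // checkpoint every 60 s
env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints")); // placeholder path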

Here’s the sink configuration, including the rolling policy:

OutputFileConfig config = OutputFileConfig
    .builder()
    .withPartPrefix("pre-")
    .withPartSuffix(".txt")
    .build();

final StreamingFileSink<Tuple2<String, Long>> sink = StreamingFileSink
    .forRowFormat(new Path("s3://my-bucket/"), new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
    .withBucketAssigner(new KeyBucketAssigner())
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .withOutputFileConfig(config)
    .build();
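
(KeyBucketAssigner is a custom class not shown above; a minimal sketch of such an assigner, assuming it simply uses the tuple's key field as the bucket id, could look like this:)

// Sketch only: a per-key bucket assigner (assumed implementation, not the actual one)
// (imports assumed: BucketAssigner, SimpleVersionedSerializer, SimpleVersionedStringSerializer, Tuple2)
public static final class KeyBucketAssigner
        implements BucketAssigner<Tuple2<String, Long>, String> {

    @Override
    public String getBucketId(Tuple2<String, Long> element, BucketAssigner.Context context) {
        return element.f0;  // the key becomes the bucket (sub-directory) name
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}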


--
Robert Cullen
240-475-4490

Re: Exception when writing part file to S3

Guowei Ma
Hi,
I am not an expert on S3, but from [1] it seems that "Increased 500 and 503 error rates can be due to you pushing higher TPS rates on a bucket that is not pre-partitioned."
So could you try partitioning your S3 bucket, or reducing the parallelism of the sink?
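
(For example, a rough sketch of capping the sink parallelism, using the resultStream and sink names from your earlier snippet:)

// Sketch only: fewer parallel sink subtasks means fewer concurrent multipart uploads against the bucket
resultStream
    .addSink(sink)
    .setParallelism(1)  // or another small value
    .name("s3-streaming-file-sink");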



Re: Exception when writing part file to S3

Robert Cullen
So the reason for the 500 and 503 errors is that my S3 MinIO instance has run out of disk space. The part files are actually being written to disk, but not to the bucket I specified in the StreamingFileSink, and not anywhere else that I can physically find.



--
Robert Cullen
240-475-4490