So the reason for the 500 and 503 errors is that my S3 MinIO instance has run out of disk space. The part files are actually being written to disk, but not to the bucket I've specified in the StreamingFileSink, and nowhere else that I can physically find them.
Hi, I'm not an expert on S3, but from [1] it seems that "Increased 500 and 503 error rates can be due to you pushing higher TPS rates on a bucket that is not pre-partitioned." Could you try partitioning your S3 bucket, or reducing the parallelism of the sink?
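A minimal sketch of the second suggestion, assuming resultStream and the StreamingFileSink named sink from the code further down this thread:

    // Fewer parallel sink subtasks means fewer concurrent multipart
    // uploads (lower TPS) against the bucket. `resultStream` and `sink`
    // are assumed to be the stream and sink from the setup quoted below.
    resultStream
        .addSink(sink)
        .setParallelism(1)
        .name("s3-file-sink");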
Hello,
This doesn't appear to be a permissions issue, since I can write a file to a bucket using:
resultStream.writeAsText("s3://my-bucket/output.txt")
Nothing in the logs shows authorization or permission exceptions.
The problem is reproducible. The logs show the part file being written to S3, but the file does not appear in the bucket in the MinIO UI.
I'm using Flink version 1.12.0. BTW, I'm using the same S3 instance for savepoint and checkpoint storage.
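One way to check whether the data is stuck in uncompleted multipart uploads (parts uploaded via MultiPartUpload stay invisible in bucket listings until the upload is completed) is a sketch like the following, using the AWS SDK that the s3a filesystem already ships; the client and bucket wiring here are assumed:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
    import com.amazonaws.services.s3.model.MultipartUpload;

    // Lists uploads that were initiated but never completed or aborted.
    // If the part files show up here, the sink is starting uploads that
    // MinIO never finishes, which matches "on disk but not in the bucket".
    static void listPendingUploads(AmazonS3 s3, String bucket) {
        for (MultipartUpload upload :
                s3.listMultipartUploads(new ListMultipartUploadsRequest(bucket))
                        .getMultipartUploads()) {
            System.out.printf("pending: key=%s uploadId=%s%n",
                    upload.getKey(), upload.getUploadId());
        }
    }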
Here's the sink setup, including the rolling policy:
OutputFileConfig config =
    OutputFileConfig.builder()
        .withPartPrefix("pre-")
        .withPartSuffix(".txt")
        .build();

final StreamingFileSink<Tuple2<String, Long>> sink =
    StreamingFileSink.forRowFormat(
            new Path("s3://my-bucket/"),
            new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
        .withBucketAssigner(new KeyBucketAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        .build();
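Worth noting: OnCheckpointRollingPolicy only finalizes in-progress part files when a checkpoint completes, so if checkpoints aren't running (or aren't completing), nothing ever becomes visible in the bucket. A minimal sketch of enabling checkpointing, with an assumed interval and checkpoint path:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Part files roll from in-progress/pending to finished on each
    // completed checkpoint; the 60s interval and the path are assumed.
    env.enableCheckpointing(60_000L);
    env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"));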
On Fri, Feb 12, 2021 at 11:04 AM Robert Metzger rmetzger@... wrote:
Hey,
Is this problem reproducible, or just happening every now and then (not that this makes it any less severe)? Have you tried Flink's Presto S3 connector? (AFAIK that's the recommended one.) Which Flink version are you using? Are you using any custom implementations for the rolling policy, formats, etc.?
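On the connector question, a sketch with assumed bucket and checkpoint paths: with both S3 plugins installed, the explicit schemes keep them apart. flink-s3-fs-presto (s3p://) is the recommended filesystem for checkpoints and savepoints, while the StreamingFileSink's recoverable writer only works with flink-s3-fs-hadoop (s3a://):

    // s3p:// -> flink-s3-fs-presto  (recommended for checkpoints/savepoints)
    // s3a:// -> flink-s3-fs-hadoop  (required by StreamingFileSink)
    // Paths below are assumed for illustration.
    env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints"));

    final StreamingFileSink<String> sink =
        StreamingFileSink.forRowFormat(
                new Path("s3a://my-bucket/"),
                new SimpleStringEncoder<String>("UTF-8"))
            .build();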
I'm using a StreamingFileSink to write data to my S3 instance. When writing the part file, this exception occurs:
2021-02-11 12:08:39
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.hadoop.fs.s3a.AWSStatus500Exception: initiate MultiPartUpload on 037f957d-454f-4cc5-bc42-4496f1606e90/prefix-0-108713.ext: com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null), S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856:InternalError: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:251)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:33)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:68)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error, please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: 1662C082134AF296; S3 Extended Request ID: 624a965d-25c8-4112-ac45-b64525f70856; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3581)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 35 more
Any ideas?
--
Robert Cullen
240-475-4490