What S3 Permissions does StreamingFileSink need?

What S3 Permissions does StreamingFileSink need?

Li Peng-2
Hey folks, I'm trying to use StreamingFileSink with S3, using IAM roles for auth. Does anyone know what permissions the role should have on the specified S3 bucket for the sink to work properly? I've been getting some auth errors, and I suspect I'm missing some permissions:

data "aws_iam_policy_document" "s3_policy_document" {
version = "2012-10-17"

statement {
actions = [
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:DeleteObject",
"s3:Get*",
"s3:List*",
"s3:PutBucketVersioning",
"s3:PutObject",
"s3:PutObjectTagging"
]

resources = [
"arn:aws:s3:::bucket-name",
"arn:aws:s3:::bucket-name/*"
]
}
}
Maybe the CreateBucket permission doesn't cover creating "sub-buckets" (key prefixes) within a bucket?

Thanks,
Li
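
For reference, the Hadoop s3a filesystem that backs StreamingFileSink generally needs object read/write/delete plus bucket-listing and multipart-upload actions. A minimal sketch in the same style (the bucket name is a placeholder, and the exact set can vary by Hadoop version):

data "aws_iam_policy_document" "s3_minimal_policy_document" {
  statement {
    # Object-level actions: read, write, delete, and manage multipart uploads.
    actions = [
      "s3:GetObject",
      "s3:PutObject",
      "s3:DeleteObject",
      "s3:AbortMultipartUpload",
      "s3:ListMultipartUploadParts"
    ]
    resources = ["arn:aws:s3:::bucket-name/*"]
  }

  statement {
    # Bucket-level actions: listing keys and in-flight multipart uploads.
    actions = [
      "s3:ListBucket",
      "s3:GetBucketLocation",
      "s3:ListBucketMultipartUploads"
    ]
    resources = ["arn:aws:s3:::bucket-name"]
  }
}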

Re: What S3 Permissions does StreamingFileSink need?

r_khachatryan
Hi Li,

Could you please list the permissions you see and the error message you
receive from AWS?


Li Peng-2 wrote
> Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles
> for
> auth. Does anyone know what permissions the role should have for the
> specified s3 bucket to work properly? I've been getting some auth errors,
> and I suspect I'm missing some permissions:

Regards,
Roman

Re: What S3 Permissions does StreamingFileSink need?

Li Peng-2
Hey Roman, my permissions are as listed above, and here's the error message I get:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1(DataStream.scala:675)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1$adapted(DataStream.scala:675)
at scala.collection.immutable.List.foreach(List.scala:388)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:675)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 13 more
Caused by: java.nio.file.AccessDeniedException: 2019-12-05--02/part-1-0: initiate MultiPartUpload on 2019-12-05--02/part-1-0: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX), S3 Extended Request ID: XXXX=:AccessDenied
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 26 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX, S3 Extended Request ID: XXXX
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3152)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 42 more

Maybe I need to add the directory level as a resource?

resources = [
  "arn:aws:s3:::bucket-name",
  "arn:aws:s3:::bucket-name/",
  "arn:aws:s3:::bucket-name/*"
]

Thanks,
Li

Re: What S3 Permissions does StreamingFileSink need?

r_khachatryan
Hey Li,

> my permissions are as listed above
As I understand it, that's a Terraform script. But what are the actual permissions in AWS?
It also makes sense to verify that the policy is attached to the right role, and the role to the right user.

> Maybe I need to add the directory level as a resource?
You don't have to.

If it's possible in your setup, you can debug by granting all S3 permissions on all resources, like this:
    actions   = ["s3:*"]
    resources = ["*"]

Regards,
Roman


Re: What S3 Permissions does StreamingFileSink need?

Li Peng-2
Ah, I figured it out after all: it turned out to be the KMS encryption on the bucket. I needed to add KMS permissions to the IAM role, otherwise the requests fail with an access-denied error. Thanks for your help!
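
For anyone who lands on this thread: when the bucket uses SSE-KMS, the role also needs access to the KMS key that encrypts the objects. A minimal sketch (the key ARN is a placeholder):

data "aws_iam_policy_document" "kms_policy_document" {
  statement {
    # GenerateDataKey is needed to encrypt newly written objects;
    # Decrypt is needed to read SSE-KMS-encrypted objects and parts back.
    actions = [
      "kms:Decrypt",
      "kms:GenerateDataKey",
      "kms:DescribeKey"
    ]
    resources = ["arn:aws:kms:region:account-id:key/key-id"]
  }
}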
