Exception when trying to change StreamingFileSink S3 bucket

Exception when trying to change StreamingFileSink S3 bucket

sidsaurav
Hi,

We are trying to change our StreamingFileSink S3 bucket, say from s3://eu1/output_old to s3://eu2/output_new. When we do, we get an exception and the taskmanager goes into a restart loop.

We suspect that Flink tries to restore state and reads the bucket id from the saved state [<Buckets.java> final BucketID bucketId = recoveredState.getBucketId()]. It then tries to read output_old from eu2 and gets an AccessDeniedException; rightly so, as the job has permission for s3://eu2/output_new but not s3://eu2/output_old. We are not sure why Flink is trying to access the old bucket or how to avoid this exception.
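If that suspicion is right, the failure mode can be illustrated with a small plain-Java sketch (no Flink dependencies; the bucket-id value and part-file name are taken from the logs below, and the assumption that bucket ids are stored relative to the sink's base path is ours, not confirmed):

```java
import java.net.URI;

public class BucketRestoreSketch {
    public static void main(String[] args) {
        // Assumption: the snapshot records bucket ids as paths relative to the
        // sink's base directory at checkpoint time, e.g. "output_old/2019-08-22/18".
        String recoveredBucketId = "output_old/2019-08-22/18";

        // After redeploying with the new sink location, the recovered pending
        // part file is resolved against the *new* base URI...
        URI newBase = URI.create("s3://eu2/");
        URI pendingPart = newBase.resolve(recoveredBucketId + "/part-3-40134");

        // ...so commitAfterRecovery() probes an object that the job's
        // credentials (scoped to s3://eu2/output_new) cannot read: 403 Forbidden.
        System.out.println(pendingPart); // s3://eu2/output_old/2019-08-22/18/part-3-40134
    }
}
```

That would explain exactly the path seen in the AccessDeniedException: old prefix, new bucket.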

Logs:

> "S3Committer.java","line":"87","message":"Failed to commit after recovery output_old/2019-08-22/18/part-3-40134 with MPU ID 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7. Checking if file was committed before...",

> "Task.java","line":"910","message":"... switched from RUNNING to FAILED."

> java.nio.file.AccessDeniedException:
output_old/2019-08-22/18/part-3-40134: getObjectMetadata on output_old/2019-08-22/18/part-3-40134: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 79F1AEE53131FB66; S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=), S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403 Forbidden
flink-taskmanager at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
flink-taskmanager at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
flink-taskmanager at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
flink-taskmanager at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
flink-taskmanager at java.lang.Thread.run(Thread.java:748)

We are using Flink 1.8 with externalized checkpoints. The S3 bucket for the externalized checkpoints has not been modified.

Thanks
Sidhartha

Re: Exception when trying to change StreamingFileSink S3 bucket

sidsaurav
Hi,

Can someone suggest a workaround so that we do not hit this issue when changing the S3 bucket?

On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav <[hidden email]> wrote:

Re: Exception when trying to change StreamingFileSink S3 bucket

Fabian Hueske-2
Hi,

Kostas (in CC) might be able to help.

Best, Fabian

Am Mi., 4. Sept. 2019 um 22:59 Uhr schrieb sidhartha saurav <[hidden email]>:

Re: Exception when trying to change StreamingFileSink S3 bucket

Kostas Kloudas-2
Hi Sidhartha,

Your explanation is correct.
If you stopped the job with a savepoint and then restore from that
savepoint, Flink will try to restore its state, which of course still
refers to the old bucket.

New data, however, will go to the new bucket.

One solution is to restart your job from scratch, if you do not
care about your "old" state.
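For the restart-from-scratch route, a minimal sketch might look like the fragment below (assuming the Flink 1.8 row-format StreamingFileSink API; the `stream` variable and the string encoder are placeholders for the poster's actual pipeline):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Hypothetical sketch: point the sink at the new location and submit the job
// WITHOUT restoring from the old savepoint/checkpoint, so no recovered
// bucket state referencing output_old is replayed on startup.
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://eu2/output_new"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

stream.addSink(sink);
```

The trade-off is that any in-progress/pending files and other operator state from the old run are abandoned rather than committed.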

Cheers,
Kostas

On Thu, Sep 5, 2019 at 1:27 PM Fabian Hueske <[hidden email]> wrote:
