Hi,

We are trying to change our StreamingFileSink S3 bucket, say from s3://eu1/output_old to s3://eu2/output_new. When we do so we get an exception and the task manager goes into a restart loop.

We suspect that Flink tries to restore its state and gets the bucket ID from the saved state [<Buckets.java> final BucketID bucketId = recoveredState.getBucketId()]. Flink then tries to read output_old from eu2 and gets an AccessDeniedException. Rightly so, as it has permission for s3://eu2/output_new and not for s3://eu2/output_old. We are not sure why Flink is trying to access the old bucket and how to avoid this exception.

Logs:

> "S3Committer.java","line":"87","message":"Failed to commit after recovery output_old/2019-08-22/18/part-3-40134 with MPU ID 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7. Checking if file was committed before...",
>
> "Task.java","line":"910","message":"... switched from RUNNING to FAILED."
>
> java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134: getObjectMetadata on output_old/2019-08-22/18/part-3-40134: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 79F1AEE53131FB66; S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=), S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403 Forbidden
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
>   at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)

We are using Flink 1.8 and an externalized checkpoint. The S3 bucket for the externalized checkpoint has not been modified.

Thanks,
Sidhartha
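[Editor's note] For intuition, here is a much-simplified, hypothetical model of the recovery path seen in the logs above; the real logic lives in Flink's Buckets.java and S3Committer.java, and all class and method names below are stand-ins, not Flink's API. The key point it illustrates: the object key comes from the *recovered state*, so it still points at output_old, and the "checking if file was committed before" step issues the getObjectMetadata call that fails with 403 once the job only has permission on output_new.

```java
import java.nio.file.AccessDeniedException;
import java.util.Set;

// Hypothetical stand-in for the sink's recovery commit: try to finalize the
// recovered upload, and on failure check whether the object already exists.
public class RecoveryCommitSketch {
    // Simulates the S3 bucket policy: this job may only touch output_new.
    static final Set<String> ALLOWED_PREFIXES = Set.of("output_new/");

    static void getObjectMetadata(String key) throws AccessDeniedException {
        boolean allowed = ALLOWED_PREFIXES.stream().anyMatch(key::startsWith);
        if (!allowed) {
            // Mirrors the 403 -> AccessDeniedException translation done by S3A.
            throw new AccessDeniedException(key + ": getObjectMetadata: 403 Forbidden");
        }
    }

    static void commitAfterRecovery(String recoveredKey) throws AccessDeniedException {
        // Completing the old multipart upload fails, so the committer checks
        // whether the file was committed before the failure...
        System.out.println("Failed to commit after recovery " + recoveredKey
                + ". Checking if file was committed before...");
        getObjectMetadata(recoveredKey); // ...and this metadata call hits the 403.
    }

    public static void main(String[] args) {
        // The key is read from snapshotted state, so it still names output_old
        // even though the sink is now configured for output_new.
        try {
            commitAfterRecovery("output_old/2019-08-22/18/part-3-40134");
        } catch (AccessDeniedException e) {
            System.out.println("java.nio.file.AccessDeniedException: " + e.getMessage());
        }
    }
}
```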
Hi,

Can someone suggest a workaround so that we do not get this issue while changing the S3 bucket?

On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav <[hidden email]> wrote:
Hi,

Kostas (in CC) might be able to help.

Best,
Fabian

Am Mi., 4. Sept. 2019 um 22:59 Uhr schrieb sidhartha saurav <[hidden email]>:
Hi Sidhartha,
Your explanation is correct. If you stopped the job with a savepoint and then try to restore from that savepoint, Flink will try to restore its state, which of course still refers to the old bucket. New data, however, will go to the new bucket. One solution is to restart your job from scratch, if you do not care about your "old" state.

Cheers,
Kostas

On Thu, Sep 5, 2019 at 1:27 PM Fabian Hueske <[hidden email]> wrote:
> Hi,
>
> Kostas (in CC) might be able to help.
>
> Best, Fabian
>
> Am Mi., 4. Sept. 2019 um 22:59 Uhr schrieb sidhartha saurav <[hidden email]>:
>> Hi,
>>
>> Can someone suggest a workaround so that we do not get this issue while changing the S3 bucket?
>>
>> On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav <[hidden email]> wrote:
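[Editor's note] Kostas's point can be illustrated with a toy model (hypothetical names, not Flink's classes): the bucket ID is part of the operator's snapshotted state, so any restore brings the old path back alongside the newly configured one, while a fresh start only ever sees the new base path.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of bucket-state restore in a file sink (hypothetical classes).
public class BucketStateSketch {
    record BucketState(String bucketId, List<String> pendingFiles) {}

    // Restoring re-activates buckets from the snapshot, old paths included,
    // while new records go to the newly configured base path.
    static List<String> activeBucketsAfterRestore(List<BucketState> snapshot,
                                                  String newBasePath) {
        List<String> active = new ArrayList<>();
        for (BucketState s : snapshot) {
            active.add(s.bucketId());   // old bucket, recovered from state
        }
        active.add(newBasePath);        // new data goes here
        return active;
    }

    // Starting from scratch ignores any previous state.
    static List<String> activeBucketsFreshStart(String newBasePath) {
        return List.of(newBasePath);
    }

    public static void main(String[] args) {
        List<BucketState> snapshot = List.of(
            new BucketState("output_old/2019-08-22/18", List.of("part-3-40134")));
        System.out.println(activeBucketsAfterRestore(snapshot, "output_new"));
        // [output_old/2019-08-22/18, output_new]
        System.out.println(activeBucketsFreshStart("output_new"));
        // [output_new]
    }
}
```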