Flink running in k8s pod - pod is able to access S3 bucket, flink does not


OranShuster
I've been stumped on this for a couple of days now.
Some general info:
Flink version 1.12.1, using the k8s HA service. The k8s cluster is self-managed on AWS.
Our checkpoints and savepoints are on S3; I created a new bucket just for them and granted the proper permissions to the k8s node.

The job manager is working: I can access the UI and upload a job. Looking at the startup logs, I can see the bucket I set, with no errors.

2021-01-27 14:46:38,740 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Creating highly available BLOB storage directory at s3:/<bucketName>/ha-storage/default/blob

(While there is no error, I can't find that directory in the bucket.)


However, once I submit the job I get an exception. Looking at the job manager logs, I'm getting S3 access denied:

2021-01-27 14:28:08,628 ERROR org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT operation failed
java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 8W0N0T2R4P8P7YBT; S3 Extended Request ID: B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=; Proxy: null), S3 Extended Request ID: B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=
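
The Request ID and S3 Extended Request ID in a message like the one above are the values AWS documentation suggests having at hand when tracing a denied request. As a rough sketch (plain JDK, no AWS SDK; the class name `S3ErrorFields` and its `parse` method are made up for illustration, and the regex only assumes the `Name: value;` layout visible in this log line), they can be pulled out like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the "Name: value" fields from an AmazonS3Exception message.
class S3ErrorFields {

    // Field names as they appear in the log line; values run up to the next ';' or ')'.
    private static final Pattern FIELD = Pattern.compile(
            "(Service|Status Code|Error Code|Request ID|S3 Extended Request ID): ([^;)]+)");

    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(message);
        while (m.find()) {
            // The extended request ID is repeated at the end of the message; keep the first copy.
            fields.putIfAbsent(m.group(1), m.group(2).trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Pass the exception message as a single quoted argument.
        parse(String.join(" ", args)).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Run it with the exception message as the argument to get one field per line.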

To check the pod's own access, I created a new image based on the Flink image with the AWS CLI installed, and tried some S3 actions as the flink user through the shell:

flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 ls s3://<bucketName>
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ touch oran.txt
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 cp oran.txt s3://<bucketName>/oran.txt
upload: ./oran.txt to s3://houzz-flink-1-12-session-cluster/oran.txt
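
The CLI check above only exercises the shell's credential lookup, so a similar look from inside a JVM might help narrow things down. This is a minimal sketch, assuming the standard AWS environment variable names; the class and method names are made up for illustration. It only reports what the process environment hints at and says nothing about how the presto S3 plugin itself resolves credentials:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic: lists the credential sources hinted at by environment variables.
class CredentialSourceCheck {

    static List<String> hintedSources(Map<String, String> env) {
        List<String> found = new ArrayList<>();
        if (has(env, "AWS_ACCESS_KEY_ID") && has(env, "AWS_SECRET_ACCESS_KEY")) {
            found.add("static keys from environment variables");
        }
        if (has(env, "AWS_WEB_IDENTITY_TOKEN_FILE") && has(env, "AWS_ROLE_ARN")) {
            found.add("web identity token file");
        }
        if (has(env, "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI")) {
            found.add("container credentials endpoint");
        }
        if (found.isEmpty()) {
            // Nothing set: a profile file or the instance metadata service would be what is left.
            found.add("nothing in the environment");
        }
        return found;
    }

    private static boolean has(Map<String, String> env, String key) {
        String value = env.get(key);
        return value != null && !value.isEmpty();
    }

    public static void main(String[] args) {
        // Prints one line per source found in this process's environment.
        hintedSources(System.getenv()).forEach(System.out::println);
    }
}
```

Running it both from the shell and from the job's main method would show whether the two see the same environment.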

Some more information: we already have an older version of Flink (1.9.1) running on the same cluster/namespace. It also uses S3 (a different bucket) and it's working. We used a homebrewed image for that version, but it closely follows how the original Flink image is built (no funny business).

Also, the S3 plugin I'm using is flink-s3-fs-presto-1.12.1.jar, enabled through the ENABLE_BUILT_IN_PLUGINS env variable. I tried using the hadoop one but got an error message saying it's missing; not sure what's up with that.

Totally working... and here I'm stuck. This makes zero sense to me, so I thought I should ask on the mailing list.
Thanks for all the help.



Re: Flink running in k8s pod - pod is able to access S3 bucket, flink does not

OranShuster
I ran some more tests and the issue is still not resolved.

Since the submitted job's main method is executed before the execution graph
is submitted, I added the AWS SDK as a dependency and used it to upload
files to the bucket from the main method.

Once with the default credentials provider, this works:
```
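import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

        Region region = Region.US_WEST_2;
        // No credentials provider set, so the SDK's default provider chain is used.
        S3Client s3 = S3Client.builder().region(region).build();
        PutObjectRequest request = PutObjectRequest.builder().bucket(<bucket>).key("test").build();
        RequestBody body = RequestBody.empty();
        s3.putObject(request, body);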
```
And once with hardcoded credentials, which also works:
```
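import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

        Region region = Region.US_WEST_2;
        AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(<key>, <secret>);
        S3Client s3 = S3Client.builder()
                .region(region)
                .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
                .build();
        PutObjectRequest request =
                PutObjectRequest.builder().bucket("houzz-flink-1-12-session-cluster").key("test").build();
        RequestBody body = RequestBody.empty();
        s3.putObject(request, body);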
```

However, for both of these jobs I still get the following exception:
```
2021-01-31 14:48:23,289 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor []
- Job 419843d1aabcc8195b419b180fe5685c is submitted.
2021-01-31 14:48:23,289 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor []
- Submitting Job with JobId=419843d1aabcc8195b419b180fe5685c.
2021-01-31 14:48:23,797 ERROR
org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT
operation failed
java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code:
AccessDenied; Request ID: 1E0BFE7D27E24A3E; S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=;
Proxy: null), S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=
        at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1045)
~[?:?]
        at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:996)
~[?:?]
        at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
~[?:?]
        at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
~[?:?]
        at
org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
~[?:?]
        at
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:80)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:679)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350)
[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110)
[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID:
1E0BFE7D27E24A3E; S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=;
Proxy: null)
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:394)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:5950)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1812)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1772)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:168)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:148)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45)
~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_275]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
2021-01-31 14:48:23,800 WARN
org.apache.flink.client.deployment.application.DetachedApplicationRunner []
- Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute job 'Flink Streaming Java API Skeleton'.
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
[?:1.8.0_275]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_275]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_275]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_275]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'Flink Streaming Java API Skeleton'.
        at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.orantests.StreamingJob.main(StreamingJob.java:64) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        ... 13 more
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: PUT operation failed: Could not transfer
error message
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:356)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: Could not transfer error message
        at
org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:293)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.ClassNotFoundException:
com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_275]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
~[?:1.8.0_275]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
~[?:1.8.0_275]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
~[?:1.8.0_275]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_275]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_275]
        at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:633)
~[?:1.8.0_275]
        at java.lang.Throwable.readObject(Throwable.java:915) ~[?:1.8.0_275]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
        at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
~[?:1.8.0_275]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
~[?:1.8.0_275]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
~[?:1.8.0_275]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:290)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
2021-01-31 14:48:23,803 ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception
occurred in REST handler: Could not execute application.
```

Something that's a bit odd in the logs:
```
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
```

But since we get the S3 request IDs in the log, this probably means the SDK
is indeed on the classpath.
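
One possible reading, offered as an assumption rather than a confirmed diagnosis: the trace shows the exception being deserialized through `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass` and `sun.misc.Launcher$AppClassLoader`, so the AWS SDK may be visible to the class loader that talks to S3 but not to the application class loader that reads the error back. A minimal sketch for checking whether a class is visible to a given class loader (`ClassVisibility` and `isVisible` are made-up names for illustration):

```java
final class ClassVisibility {

    /** Returns true if the named class can be loaded through the given class loader. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The application class loader is the one that appears in the failing trace.
        ClassLoader appLoader = ClassLoader.getSystemClassLoader();
        String sdkClass = "com.amazonaws.services.s3.model.AmazonS3Exception";
        System.out.println("java.lang.String visible: " + isVisible("java.lang.String", appLoader));
        System.out.println(sdkClass + " visible: " + isVisible(sdkClass, appLoader));
    }
}
```

If the second check reports that the class is not visible to the application class loader, that would be consistent with the `ClassNotFoundException` being a side effect of transferring the error, not the cause of the 403 itself.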



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink running in k8s pod - pod is able to access S3 bucket, flink does not

Itamar Syn-Hershko
Hi Oran,

How is that k8s cluster deployed? Are you sure all nodes have the same IAM role? Can you try granting that bucket's permissions to the IAM role in use and see whether this fixes it?
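
To help answer the IAM question, here is a minimal sketch that lists which credential-related environment variables are set inside the pod. The variable names are ones the AWS SDK default credential chains are known to read; the `AwsCredentialHints` class and `presentSources` helper are hypothetical names, and the code behind Flink's S3 filesystem may resolve credentials differently from an SDK client created in the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class AwsCredentialHints {

    // Environment variables consulted by the AWS SDK default credential chains.
    static final String[] NAMES = {
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
        "AWS_WEB_IDENTITY_TOKEN_FILE",
        "AWS_ROLE_ARN",
        "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"
    };

    /** Returns the names (never the values) of the variables that are set and non-empty. */
    static List<String> presentSources(Map<String, String> env) {
        List<String> found = new ArrayList<>();
        for (String name : NAMES) {
            String value = env.get(name);
            if (value != null && !value.isEmpty()) {
                found.add(name);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Prints only variable names so that no secret ends up in the logs.
        System.out.println("Credential hints in environment: " + presentSources(System.getenv()));
    }
}
```

If the list comes back empty, the SDK typically falls back to the instance metadata service, which means the IAM role of whichever node the pod landed on.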


On Sun, Jan 31, 2021 at 5:15 PM OranShuster <[hidden email]> wrote:
I ran some more tests and the issue is still not resolved.

Since the submitted job's main method is executed before the execution graph
is submitted, I added the AWS SDK as a dependency and used it to upload
files to the bucket from the main method.

First with the default credentials provider, which works:
```
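import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

        // In the job's main method: upload an empty object using the default credentials provider.
        Region region = Region.US_WEST_2;
        S3Client s3 = S3Client.builder().region(region).build();
        PutObjectRequest request = PutObjectRequest.builder().bucket(<bucket>).key("test").build();
        RequestBody body = RequestBody.empty();
        s3.putObject(request, body);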
```
And then with hardcoded credentials, which also works:
```
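import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

        // Same upload, but with explicit static credentials instead of the default provider.
        Region region = Region.US_WEST_2;
        AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(<key>, <secret>);
        S3Client s3 = S3Client.builder()
                .region(region)
                .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
                .build();
        PutObjectRequest request = PutObjectRequest.builder().bucket("houzz-flink-1-12-session-cluster").key("test").build();
        RequestBody body = RequestBody.empty();
        s3.putObject(request, body);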
```

However, for both of these jobs I still get the following exception:
```
2021-01-31 14:48:23,289 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor []
- Job 419843d1aabcc8195b419b180fe5685c is submitted.
2021-01-31 14:48:23,289 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor []
- Submitting Job with JobId=419843d1aabcc8195b419b180fe5685c.
2021-01-31 14:48:23,797 ERROR
org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT
operation failed
java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code:
AccessDenied; Request ID: 1E0BFE7D27E24A3E; S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=;
Proxy: null), S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=
        at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1045)
~[?:?]
        at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:996)
~[?:?]
        at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
~[?:?]
        at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
~[?:?]
        at
org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
~[?:?]
        at
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:80)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:679)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350)
[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110)
[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID:
1E0BFE7D27E24A3E; S3 Extended Request ID:
HZ6T7Xwf/6p8hZvHWIy+TVT88g97oDnEr2OeNl75LuO+ny0c6PH/DvvmMDttUfv726o9Px0izrY=;
Proxy: null)
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
~[?:?]
        at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:394)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:5950)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1812)
~[?:?]
        at
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1772)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:168)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:148)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115)
~[?:?]
        at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45)
~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_275]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
2021-01-31 14:48:23,800 WARN
org.apache.flink.client.deployment.application.DetachedApplicationRunner []
- Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute job 'Flink Streaming Java API Skeleton'.
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
[?:1.8.0_275]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_275]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_275]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_275]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'Flink Streaming Java API Skeleton'.
        at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.orantests.StreamingJob.main(StreamingJob.java:64) ~[?:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        ... 13 more
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: PUT operation failed: Could not transfer
error message
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:356)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.io.IOException: Could not transfer error message
        at
org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:293)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_275]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
~[?:1.8.0_275]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
~[?:1.8.0_275]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
~[?:1.8.0_275]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_275]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_275]
        at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:633)
~[?:1.8.0_275]
        at java.lang.Throwable.readObject(Throwable.java:915) ~[?:1.8.0_275]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
        at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
~[?:1.8.0_275]
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
~[?:1.8.0_275]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
~[?:1.8.0_275]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
~[?:1.8.0_275]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:290)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:176)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_275]
        at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_275]
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:264)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.OnComplete.internal(Future.scala:261)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
2021-01-31 14:48:23,803 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.
```

Something that's a bit odd in the logs:
```
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
```

But since the log includes the S3 request IDs, this probably means the SDK is
indeed on the classpath.
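
One possible reading of the trace, not confirmed anywhere in this thread: the ClassNotFoundException is thrown under BlobUtils.readExceptionFromStream, i.e. while the blob client deserializes the exception that the server sent back, and the lookup goes through the application classloader. That would fit a setup where the SDK is visible to whichever classloader performed the S3 call (hence the request IDs) but not to the one used for deserialization. The sketch below reproduces only that general mechanism with plain JDK classes. FakeSdkException, LoaderAwareInputStream and the other names are hypothetical stand-ins, not Flink or AWS APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedDeserializationDemo {

    /** Hypothetical stand-in for an SDK exception that only some classloaders can see. */
    public static class FakeSdkException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public FakeSdkException(String message) {
            super(message);
        }
    }

    /** ObjectInputStream that resolves every class through the given classloader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Serializes an object to bytes, as a server would before sending an error back. */
    public static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Tries to read the exception back using the given classloader and reports the outcome. */
    public static String tryDeserialize(byte[] bytes, ClassLoader loader) {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(bytes), loader)) {
            Throwable cause = (Throwable) in.readObject();
            return "OK: " + cause.getMessage();
        } catch (ClassNotFoundException e) {
            // The original error is lost here; only the class name survives.
            return "ClassNotFoundException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new FakeSdkException("Access Denied (Status Code: 403)"));

        // Loader that can see FakeSdkException: the original message comes through.
        ClassLoader full = IsolatedDeserializationDemo.class.getClassLoader();
        System.out.println(tryDeserialize(bytes, full));

        // Loader with no URLs and no parent: only JDK bootstrap classes are visible.
        ClassLoader restricted = new URLClassLoader(new URL[0], null);
        System.out.println(tryDeserialize(bytes, restricted));
    }
}
```

If this is what happens here, the ClassNotFoundException is a secondary failure on the receiving side, and the 403 Access Denied in the job manager log remains the error to chase.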





--
Itamar Syn-Hershko
CTO, Founder
Email:[hidden email]
Website:https://bigdataboutique.com
        


Re: Flink running in k8s pod - pod is able to access S3 bucket, flink does not

OranShuster
K8s is self-managed on EC2 nodes.
After submitting the job and getting the exception, I tried the following:
1. SSH into the machine and verify with the CLI that the pod has access.
2. Instantiate an S3 client from the SDK in the job's main method (once with
the default credential chain and once with an access key and secret).
3. Use a public bucket.

All of these worked, but the job still fails with an S3 access denied exception.




