I’m developing a system where we are trying to sink to an immutable s3 bucket. This bucket has server side encryption set as KMS. The DataStream sink works perfectly fine when I don’t use the immutable bucket but when I use an immutable bucket, I get exceptions regarding multipart upload failures. It says we need to enable md5 hashing for the put object to work.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx; S3 Extended Request ID: xxxx), S3 Extended Request ID: xxxxxx :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxx; S3 Extended Request ID: xxxx)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 common frames omitted
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxxx; S3 Extended Request ID: xxxx)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 12 common frames omitted
1. Is this md5 hashing a mandatory rule to support this? The first part of the file always gets uploaded to s3 but next part onward, it fails.
2. If it’s mandatory, how do I set this HTTP header while sinking? I checked most of the documentation and tried going through the source code too but couldn’t really find a provision where we could set the headers for a request that goes in as a sink.