S3 Write Exception


S3 Write Exception

Aneesha Kaushal
Hello, 

I am using Flink 1.2 and writing records to S3 using the rolling sink.

I am encountering this S3 write error quite frequently:

TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
	... 7 more

I am unable to find the cause of this error. I also have the following questions about it:

1) Do we lose any data, or will Flink go back to the last checkpoint and write again?
2) How can we prevent this error?

Thanks,
Aneesha



Re: S3 Write Exception

Fabian Hueske
Hi Aneesha,

The logs would show whether Flink is going through a recovery cycle. Recovery means cancelling the running tasks and starting them again.
If you don't see something like that in the logs, Flink is continuing to process normally.

I'm not familiar with the details of S3, so I can't tell if the exception indicates data loss.

Best, Fabian





Re: S3 Write Exception

Stephan Ewen
It is very important to point out that the BucketingSink currently can NOT work properly on S3. It assumes a consistent file system (i.e., that listing and renaming work consistently), but S3 is only eventually consistent. I assume that this eventual consistency of S3 is the cause of your error.

There is a pull request for a bucketing sink on eventually consistent file systems: https://github.com/apache/flink/pull/3752
I hope we can merge it once we are done with the 1.3.2 release.

(cc-ing Gordon and Aljoscha, FYI)
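[Editorial note: a toy model of the failure mode described above, not the real S3 API. It sketches, under simplified assumptions, how an eventually consistent store can return a 404-style miss for a freshly written object, which is exactly what a list/rename-based commit (as in S3AFileSystem.rename in the trace) trips over. All class and method names here are hypothetical.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy eventually consistent store: writes land in a "pending" view and only
// become visible to metadata reads after an explicit propagation step.
public class EventualConsistencyDemo {

    static class EventuallyConsistentStore {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> pending = new HashMap<>();

        void put(String key, String value) {
            pending.put(key, value);           // write is accepted...
        }

        void propagate() {
            committed.putAll(pending);         // ...but visible only later
            pending.clear();
        }

        boolean exists(String key) {
            return committed.containsKey(key); // metadata reads may lag writes
        }

        // Rename guarded by an existence check, mirroring the
        // getObjectMetadata lookup in the stack trace: if the source key
        // has not propagated yet, the check misses ("404 Not Found").
        boolean rename(String from, String to) {
            if (!exists(from)) {
                return false;
            }
            committed.put(to, committed.remove(from));
            return true;
        }
    }

    public static void main(String[] args) {
        EventuallyConsistentStore s3 = new EventuallyConsistentStore();
        s3.put("part-0-0.in-progress", "records");

        // Immediately after the write, the rename's metadata check misses:
        System.out.println("rename before propagation: "
                + s3.rename("part-0-0.in-progress", "part-0-0")); // false

        s3.propagate();
        System.out.println("rename after propagation: "
                + s3.rename("part-0-0.in-progress", "part-0-0")); // true
    }
}
```

The sink cannot control when propagation happens on real S3, which is why the rename-based commit is unreliable there.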






Re: S3 Write Exception

Vinay Patil
Hi Stephan,

In our case as well, we are writing to an S3 sink with S3 consistency enabled, and I am not facing any consistency-related issues. But an eventually-consistent bucketing sink will certainly help; I will test it out with another project where we are facing consistency issues.

@Aneesha: Looking at the logs, I don't think this is a consistency-related issue. The exception gives you a 404 code; can you please verify the bucket name you are trying to access?
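[Editorial note: one hedged way to run that check from the command line, assuming the AWS CLI is installed and configured with the job's credentials; the bucket name and output prefix are placeholders.]

```shell
# Verify the bucket exists and is reachable with these credentials
# (exits non-zero and prints 404/403 details on failure):
aws s3api head-bucket --bucket <bucket-name>

# Confirm the prefix the sink writes to looks as expected:
aws s3 ls s3://<bucket-name>/<output-prefix>/ --recursive | head
```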

Regards,
Vinay Patil



Re: S3 Write Exception

Vinay Patil
In reply to this post by Stephan Ewen
Hi Stephan,

I am facing an S3 consistency-related issue, with the exception pasted at the end.

We were able to solve the S3 sync issue by adding System.currentTimeMillis() to the inprogressPrefix, inprogressSuffix, s3PendingPrefix, and s3PendingSuffix properties of BucketingSink.
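[Editorial note: a sketch of that first workaround, assuming the setter-based configuration of Flink 1.2/1.3's BucketingSink; the bucket path and prefix format are placeholders, not the poster's actual values.]

```java
// Sketch only: make part-file names unique per job instance, so a restarted
// job never touches a path for which S3 may still serve stale metadata.
String unique = String.valueOf(System.currentTimeMillis());

BucketingSink<String> sink = new BucketingSink<>("s3://<bucket-name>/output");
sink.setInProgressPrefix("_" + unique + ".");   // applied to in-progress files
sink.setInProgressSuffix(".in-progress");
sink.setPendingPrefix("_" + unique + ".");      // applied to pending files
sink.setPendingSuffix(".pending");
```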

I tried another approach by updating the BucketingSink code, appending System.currentTimeMillis() to the partPath variable (in the openNewPartFile method).

Can you please let me know if this is the correct approach to get rid of this exception?

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
	at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
	at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
	at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
	at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)

Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/