Hi all,
I am using the filesystem state backend with checkpointing to S3. From the JobManager logs, I can see that it works most of the time, e.g.:

2016-07-26 17:49:07,311 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 @ 1469555347310
2016-07-26 17:49:11,128 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 (in 3335 ms)

However, taking a checkpoint fails with the following exception from time to time:

2016-07-26 17:50:07,310 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1469555407310
2016-07-26 17:50:12,225 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(SlidingEventTimeWindows(3600000, 1000), ListStateDescriptor{name=window-contents, defaultValue=null, serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@103b8046}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:226)) -> Sink: Unnamed (1/1) (0ec242b46c49039f673dc902fd983f49) switched from RUNNING to FAILED
2016-07-26 17:50:12,227 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job bd2930a4d6e7cf8d04d3bbafe22e386b ([...]) changed to FAILING.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to fetch state handle size
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:234)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:528)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
    ... 8 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: [...]
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
    at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
    at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:428)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:231)
    ... 10 more

All logs are from the machine running the JobManager.

The status code suggests that this is a permissions problem. However, I can see checkpoints stored correctly in the configured S3 bucket. Also, old checkpoints are sometimes not removed. Does anybody here experience the same problems? Can it be that S3 is flaky?
Find my configuration below:

Flink 1.0.3

libs/
  aws-java-sdk-1.7.4.jar
  hadoop-aws-2.7.2.jar
  httpclient-4.2.5.jar
  httpcore-4.2.5.jar

contents of core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/tmp/hadoop</value>
  </property>
  <property>
    <name>fs.s3a.attempts.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3-eu-west-1.amazonaws.com</value>
  </property>
</configuration>

Best,
Gary
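For context, the filesystem state backend is presumably pointed at S3 via flink-conf.yaml along the lines of the following sketch (the bucket name and path are placeholders, not the actual values used here):

```yaml
# Sketch: filesystem state backend writing checkpoint data to S3
# (resolved through the s3a implementation configured in core-site.xml above)
state.backend: filesystem
state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints
```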
Hey Gary,
your configuration looks good to me. I think it could be an issue with S3, as you suggest. It might help to increase the checkpointing interval (if your use case requirements allow for this) in order to have less interaction with S3. In general, your program should still continue as expected even when checkpoints fail intermittently.

Did you ever try DEBUG logging for org.apache.hadoop.fs.s3a.S3AFileSystem? Maybe that gives some hints about what's happening there. If you have time to provide those logs, I would like to take a look at them.

– Ufuk

On Wed, Jul 27, 2016 at 10:56 AM, Gary Yao <[hidden email]> wrote:
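Enabling that DEBUG logging can be done with a line like the following in Flink's log4j.properties (a sketch; adjust the logger name or level to your setup):

```properties
# Verbose S3A client logging, to see the individual S3 requests and responses
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
```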