Hi there,

I ran a test job with FsStateBackend and saved checkpoints to S3 (via s3a). The job crashes when a checkpoint is triggered. Listing the objects in the S3 bucket, I found that the checkpoint directories are created successfully, but they are all empty.

The host running the TaskManager shows the following error:

Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: xxxxx

Has anyone met this issue before?

flink 1.0.0
scala 2.10
hadoop-aws 2.7.2
aws-java-sdk 1.7.4

Thanks,
Chen

Attached is the full log shown on the web dashboard when the job was canceled:

java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
    at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
    at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    ... 8 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more
Hey Chen Qin,
this seems to be an issue with the S3 file system. The root cause is:

Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more

From [1] it looks like you have to specify

    fs.s3a.buffer.dir

in the Hadoop configuration (where you set up the S3 file system). The expected value is a comma-separated list of local directories used to buffer results prior to transmitting them to S3 (for large files).

Does this fix the issue? Please report back so that we can include it in the "common issues" section of the AWS docs.

– Ufuk

[1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/

On Wed, May 4, 2016 at 2:41 AM, Chen Qin <[hidden email]> wrote:
> [...]
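As a quick sanity check before re-running the job, a comma-separated buffer-directory value like the one described above can be validated locally. The sketch below is a hypothetical standalone helper (not part of Flink or Hadoop) that splits the value and reports any entry that is not an existing, writable directory, which is what the local dir allocator ultimately needs:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class BufferDirCheck {

    // Splits a comma-separated fs.s3a.buffer.dir-style value and returns the
    // entries that are NOT usable (missing, not a directory, or not writable).
    static List<String> unusableDirs(String bufferDirValue) {
        List<String> bad = new ArrayList<>();
        for (String entry : bufferDirValue.split(",")) {
            String path = entry.trim();
            if (path.isEmpty()) {
                continue;
            }
            File dir = new File(path);
            if (!dir.isDirectory() || !dir.canWrite()) {
                bad.add(path);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // "/tmp" should be writable on most Unix hosts; the second entry is made up.
        System.out.println(unusableDirs("/tmp, /no/such/dir"));
    }
}
```

Running this on each TaskManager host (with the configured value) would show whether the buffer directories actually exist there.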
I think I've had this issue too and fixed it as Ufuk suggested, in core-site.xml, with something like:

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

On 4 May 2016 at 11:10, Ufuk Celebi <[hidden email]> wrote:
> [...]
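To double-check that such a property actually made it into the file the cluster reads, it can be read back with nothing but the JDK's DOM parser. The sketch below is a standalone illustration (not Hadoop's own Configuration loader); it writes a minimal core-site.xml to a temp file and extracts fs.s3a.buffer.dir from it:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class ReadBufferDir {

    // Returns the <value> of the <property> whose <name> matches key, or null.
    static String lookup(Path xmlFile, String key) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(xmlFile.toFile());
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element p = (Element) props.item(i);
            String name = p.getElementsByTagName("name").item(0).getTextContent();
            if (key.equals(name.trim())) {
                return p.getElementsByTagName("value").item(0).getTextContent().trim();
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        Path conf = Files.createTempFile("core-site", ".xml");
        Files.writeString(conf,
            "<configuration>"
          + "  <property>"
          + "    <name>fs.s3a.buffer.dir</name>"
          + "    <value>/tmp</value>"
          + "  </property>"
          + "</configuration>");
        System.out.println(lookup(conf, "fs.s3a.buffer.dir")); // prints /tmp
    }
}
```

Pointing it at the real core-site.xml on a TaskManager host would confirm the property is visible where the s3a file system is instantiated.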
Ufuk & Igor,

Thanks for helping out! Yup, it fixed my issue.

Chen

On Wed, May 4, 2016 at 12:57 PM, Igor Berman <[hidden email]> wrote:
> [...]
OK, thanks for reporting back. Thanks to Igor as well.
I just updated the docs with a note about this.

On Thu, May 5, 2016 at 3:16 AM, Chen Qin <[hidden email]> wrote:
> [...]