s3 checkpointing issue


s3 checkpointing issue

Chen Qin
Hi there,

I ran a test job with the FsStateBackend, saving checkpoints to S3 (via s3a).

The job crashes when a checkpoint is triggered. Listing the objects in the S3 directory, I found that the directories are created successfully but all the checkpoint directories are empty.

The host running the task manager shows the following error.

Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID:xxxxx

Has anyone met this issue before? 

flink 1.0.0 
scala 2.10 
hadoop-aws 2.7.2 
aws-java-sdk 1.7.4


Thanks,
Chen

Attached is the full log shown on the web dashboard when the job was canceled.

java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
    at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
    at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    ... 8 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more


Re: s3 checkpointing issue

Ufuk Celebi
Hey Chen Qin,

this seems to be an issue with the S3 file system. The root cause is:

 Caused by: java.lang.NullPointerException at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888) at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785) at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
... 25 more

From [1] it looks like you have to specify

fs.s3a.buffer.dir

in the Hadoop configuration (where you set the S3 file system).

The expected value is a comma-separated list of local directories used
to buffer results prior to transmitting them to S3 (for large files).
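
For reference, a minimal core-site.xml entry along these lines (the directory paths here are only placeholders; any comma-separated list of local directories that exist and are writable on every task manager host should work):

```xml
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/mnt/tmp/s3a,/var/tmp/s3a</value>
</property>
```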

Does this fix the issue? Please report back so that we can include it
in the "common issues" section of the AWS docs.

– Ufuk

[1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/



Re: s3 checkpointing issue

igor.berman
I think I've had this issue too and fixed it as Ufuk suggested, in core-site.xml, with something like:

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>




Re: s3 checkpointing issue

Chen Qin
Ufuk & Igor,

Thanks for helping out! Yup, it fixed my issue.

Chen





Re: s3 checkpointing issue

Ufuk Celebi
OK, thanks for reporting back. Thanks to Igor as well.

I just updated the docs with a note about this.
