Checkpoint loading failure

Checkpoint loading failure

Padarn Wilson
Hi all,

We have a job with a medium-sized state (around 4 GB). After adding a new part to the job graph (which should not impact the job much), we found that every checkpoint restore fails with the following error:

Caused by: java.io.IOException: s3a://<REDACTED>: Stream is closed!
at org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at org.apache.flink.types.StringValue.readString(StringValue.java:781)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
... 17 more

I don't really have any clues as to what is causing this. You'll notice we are using the Hadoop S3 file system; switching to the Presto one is a bit tricky for us because of some bucket permissions that would need to change.
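For reference, my understanding is that the switch would roughly mean putting the flink-s3-fs-presto jar under plugins/ and pointing the checkpoint path at the s3p:// scheme instead of s3a://, something like the sketch below (the bucket and path are placeholders, and we have not actually tried this):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpoint every 60 seconds
env.enableCheckpointing(60000L)
// s3p:// routes checkpoint reads/writes through the Presto S3 filesystem plugin
env.setStateBackend(new FsStateBackend("s3p://my-bucket/flink/checkpoints"))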

Does anyone have tips on debugging (or solving) this?

Re: Checkpoint loading failure

Guowei Ma
Hi Padarn,
Do these errors still occur if the job graph is not modified?
In addition, is this the full error stack? Is it possible that other errors caused the stream to be closed?
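One thing that might help is turning the S3A client logging up to DEBUG and checking whether the stream is aborted or closed by something else before the restore finishes reading. A minimal sketch, assuming the default Flink Log4j 2 properties setup (the logger name is just the Hadoop S3A package):

logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = DEBUG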
Best,
Guowei

