Hi Padarn
Do these errors also occur if the job graph is not modified?
In addition, is this the complete error stack? Is it possible that another error caused the stream to be closed?
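If that is the complete stack, one way to get more signal is to turn on DEBUG logging for the S3A client on the TaskManagers, which may show when and why the input stream is being closed. A minimal sketch, assuming your Flink distribution still ships a log4j 1.x style log4j.properties (the logger name comes from hadoop-aws; adjust the syntax if your version uses log4j2):

# surface S3A stream open/close/abort events in the TaskManager logs
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG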
Hi all,
We have a job with a medium-sized state (around 4 GB). After adding a new part to the job graph (which should not affect the job much), we found that every single checkpoint restore fails with the following error:
Caused by: java.io.IOException: s3a://<REDACTED>: Stream is closed!
at org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at org.apache.flink.types.StringValue.readString(StringValue.java:781)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
... 17 more
I haven't really got any clues about what is causing this. You'll notice we are using the Hadoop file system; switching to Presto is a bit tricky for us because some of the bucket permissions would need to change.
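For reference, my rough understanding of what the switch would involve on a Flink version that loads file systems as plugins is sketched below; the jar version and bucket name are placeholders:

# load the Presto S3 file system as a plugin
mkdir -p ./plugins/s3-fs-presto
cp ./opt/flink-s3-fs-presto-<version>.jar ./plugins/s3-fs-presto/

and then point checkpoints at the s3p:// scheme in flink-conf.yaml, which pins the Presto implementation and lets it coexist with the existing s3a:// (Hadoop) paths:

state.checkpoints.dir: s3p://<bucket>/flink-checkpoints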
Does anyone have tips on debugging (or solving) this?