We've added the flink-s3-fs-hadoop library to the plugins folder and are trying to bootstrap state to S3 using the S3A protocol. The following exception happens (unless the hadoop library is put into the lib folder instead of plugins). It looks like the S3A filesystem is trying to use the "local" filesystem for temporary files and fails:

java.lang.Exception: Could not write timer service of MapPartition (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
    at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
    at org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
    at org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
    at org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
    at org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
    at org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 14 common frames omitted
Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
    at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
    at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
    ... 32 common frames omitted
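For context, here is a minimal sketch of the kind of bootstrap job being described, written against the Flink 1.9 State Processor API. The bucket name, operator uid, state name, and the Long-valued "count" state are hypothetical simplifications, not the poster's actual job; the point is only that the savepoint is written to an s3a:// path, which goes through the flink-s3-fs-hadoop plugin.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapToS3 {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: (key, count) pairs used to seed the streaming job's keyed state.
        DataSet<Tuple2<Long, Long>> counts = env.fromElements(
                Tuple2.of(1L, 10L), Tuple2.of(2L, 20L));

        BootstrapTransformation<Tuple2<Long, Long>> transformation = OperatorTransformation
                .bootstrapWith(counts)
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) {
                        return value.f0;
                    }
                })
                .transform(new CountBootstrapper());

        // Writing the new savepoint via the s3a:// scheme exercises the
        // flink-s3-fs-hadoop plugin, which is where the reported failure occurs.
        Savepoint
                .create(new FsStateBackend("s3a://my-bucket/savepoints"), 128)
                .withOperator("counter-uid", transformation)
                .write("s3a://my-bucket/bootstrap-savepoints/run-1");

        env.execute("bootstrap-initial-savepoint");
    }

    static class CountBootstrapper extends KeyedStateBootstrapFunction<Long, Tuple2<Long, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Register the keyed state that the streaming job will later read.
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<Long, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }
}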
Hi,
Thanks for reporting the issue, I've created the jira ticket for that [1]. We will investigate it and try to address it. Could you check whether the same issue happens when you use flink-s3-fs-presto [2]?

Piotrek

[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#shaded-hadooppresto-s3-file-systems
Actually, I forgot to mention that it happens when there is also a presto library in the plugins folder (we are using presto for checkpoints and hadoop for file sinks in the job itself).
But from the stack trace that you have posted it looks like you are using Hadoop’s S3 implementation for the checkpointing? If so, can you try using Presto and check whether you still encounter the same issue?
Also, could you explain how to reproduce the issue? What configuration are you using? Does it happen every time? On the first checkpoint?

Piotrek
The problem happens in batch jobs (the ones that use ExecutionEnvironment) that use the State Processor API for bootstrapping the initial savepoint for a streaming job.

We are building a single docker image for the streaming and batch versions of the job. In that image we put both presto (which we use for checkpoints in the streaming job) and hadoop into separate plugin folders. When we run a batch job using this image, the aforementioned exception happens.
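For reference, the plugin layout being described would look roughly like this inside the image (folder and jar names are illustrative, and the version depends on the Flink distribution used); each subfolder is loaded by its own plugin classloader, which is why the two filesystem implementations can coexist in one image:

flink/plugins/
    s3-fs-presto/
        flink-s3-fs-presto-1.9.x.jar
    s3-fs-hadoop/
        flink-s3-fs-hadoop-1.9.x.jar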
Ok, thanks for the explanation, now it makes sense. Previously I hadn't noticed that the snapshot state calls visible in your stack trace come from the State Processor API. We will try to reproduce it, so we might have more questions later, but this information might be enough.

One more question for now: have you tried using Presto for the bootstrapping/batch job as well?

Piotrek
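(For reference, a hedged sketch of what that switch might look like: with both plugins installed, the Hadoop S3 plugin serves the s3a:// scheme and the Presto plugin serves s3p://, so the bootstrap job from the earlier sketch could target the Presto implementation by changing only the URIs. The bucket and path are hypothetical, and this fragment reuses the transformation and imports from that sketch.)

// Same bootstrap job as above, but writing through the Presto S3 plugin.
Savepoint
        .create(new FsStateBackend("s3p://my-bucket/savepoints"), 128)
        .withOperator("counter-uid", transformation)
        .write("s3p://my-bucket/bootstrap-savepoints/run-1");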
No, I didn't, because it's inconvenient for us to have two different docker images for the streaming and batch jobs.