Issue using Flink on EMR

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Issue using Flink on EMR

Ayush Verma-2
Hello,

We have a Flink on EMR setup following this guide. YARN, apparently changes the io.tmp.dirs property to /mnt/yarn & /mnt1/yarn. When using these directories, the flink job gets the following error.
2019-05-22 12:23:12,515 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job my-flink-job (e92bf814c495e2c713e24f1d37aa3afd) switched from state RUNNING to FAILING.
java.nio.file.NoSuchFileException: /mnt/yarn/usercache/hadoop/appcache/application_1558347223117_0001,/mnt1/yarn/usercache/hadoop/appcache/application_1558347223117_0001/.tmp_c729afcc-7bd7-4422-8232-306e28bc62c1
 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
 at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
 at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
 at java.nio.file.Files.newOutputStream(Files.java:216)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:80)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:39)
 at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.openNew(RefCountedBufferingFileStream.java:174)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.boundedBufferingFileStream(S3RecoverableFsDataOutputStream.java:271)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.newStream(S3RecoverableFsDataOutputStream.java:236)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:74)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
 at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
 at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:565)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
 at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:96)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:92)
 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
 at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:551)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:344)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

If I change the io.tmp.dirs to /tmp, for eg., the job works fine. BUT my understanding is that, YARN creates a shared directory, by mounting it on all the containers so that these files can survive container terminations. Looking for advice on how to investigate further into this issue and hopefully resolve it.