hadoop-free hdfs config


Oleksandr Baliev
Hello guys,

I want to clarify something for myself: since Flink 1.4.0 allows a Hadoop-free distribution with dynamic loading of Hadoop dependencies, I assumed that if I download the Hadoop-free distribution, start a cluster without any Hadoop, and then submit a job jar that bundles Hadoop dependencies (I used 2.6.0-cdh5.10.1), Hadoop should be visible on the classpath, and a job which accesses HDFS via a source/sink/etc. or writes checkpoints should run on such a Hadoop-free cluster.
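For illustration, by "accesses HDFS" I mean user code along these lines (the namenode host and paths are placeholders, not our real setup):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // user-code access to HDFS: the hdfs:// scheme must be resolvable at runtime
        DataStream<String> lines = env.readTextFile("hdfs://namenode:8020/data/input");
        lines.print();
        env.execute("hdfs read test");
    }
}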

But when I start a job, the checkpoint configuration initialization fails with "Hadoop is not in the classpath/dependencies.":

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
...
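For context, the checkpoint setup that leads into this trace is roughly the following sketch (the checkpoint URI is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // FsStateBackend resolves the "hdfs" scheme through Flink's FileSystem registry,
        // which is where the UnsupportedFileSystemSchemeException above comes from
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        // ... sources/sinks and env.execute() would follow here
    }
}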


What I've found: in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY, which should be loaded with the Hadoop factory, holds org.apache.flink.core.fs.UnsupportedSchemeFactory instead. But FALLBACK_FACTORY is initialized when the taskmanager starts (at which point there are no Hadoop dependencies on the classpath), so that part seems to behave as intended.
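A standalone way to reproduce that lookup, the way I understand it (the authority in the URI is a placeholder):

import org.apache.flink.core.fs.FileSystem;
import java.net.URI;

public class HdfsSchemeCheck {
    public static void main(String[] args) throws Exception {
        // on a Hadoop-free classpath this throws UnsupportedFileSystemSchemeException:
        // "Hadoop is not in the classpath/dependencies."
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/tmp"));
        System.out.println(fs.getClass().getName());
    }
}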

So, as I understand it, the Hadoop file system is not recognized by Flink if it was not loaded at startup. Is that correct, or did I just mess something up somewhere?

Thanks,
Sasha

Re: hadoop-free hdfs config

Till Rohrmann
Hi Sasha,

you're right that if you only want to access HDFS from user code, it should be possible to use the Hadoop-free Flink version and bundle the Hadoop dependencies with your user code. However, if you want to use Flink's file system state backend, as you did, then you have to start the Flink cluster with the Hadoop dependency on its classpath. The reason is that the FsStateBackend is part of the Flink distribution and is loaded by the system class loader.

One thing you could try is the RocksDB state backend instead. Since the RocksDBStateBackend is loaded dynamically, I think it should pick up the Hadoop dependencies when trying to load the filesystem.
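As a sketch, something along these lines (the checkpoint URI is a placeholder, and the RocksDB backend has to be bundled with your job, e.g. via the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // RocksDBStateBackend keeps working state in RocksDB and writes
        // checkpoints to the given URI
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    }
}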

Cheers,
Till


Re: hadoop-free hdfs config

Oleksandr Baliev
Hi Till,

thanks for your reply and clarification! By the way, with the RocksDBStateBackend it's the same story; it looks like a wrapper over the FsStateBackend:

01/11/2018 09:27:22 Job execution switched to status FAILING.
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)


Then I also changed the URL for the fs state backend to file://, which is OK, but then I hit the same issue in the BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
...<some our simple wrapper class call>.initializeState(...)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 13 more
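Put together, the failing combination looks roughly like this sketch (paths are placeholders):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class BucketingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // file:// checkpoints are fine on the Hadoop-free cluster
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        DataStream<String> stream = env.readTextFile("file:///tmp/input");
        // the hdfs:// sink path fails in initializeState() with the trace above
        stream.addSink(new BucketingSink<String>("hdfs://namenode:8020/data/out"));
        env.execute("bucketing sink test");
    }
}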


For these tests I was using the clean "Without bundled Hadoop" Flink binaries and didn't change anything in the configs.

Currently we have to persist checkpoints on HDFS, so we will use a flink-shaded-hadoop2-uber*.jar anyway. Thanks.

Best,
Sasha


Re: hadoop-free hdfs config

Till Rohrmann
Thanks for trying it out and letting us know.

Cheers,
Till
