Hi to all,
I'm experiencing a problem writing a file as CSV to HDFS with Flink 0.9.0. The code I use is:

    myDataset.writeAsCsv(new Path("hdfs:///tmp", "myFile.csv").toString());

If I run the job from Eclipse everything works fine, but when I deploy the job on the cluster (Cloudera 5.1.3) I get the following exception:

    Caused by: java.io.IOException: The given HDFS file URI (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
        at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
        at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
        ... 25 more

The core-site.xml is present in the fat jar and contains the property:

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://myServerX:8020</value>
    </property>

I compiled Flink with the following command:

    mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

How can I fix that?

Best,
Flavio
Hi,

Flink is not loading the Hadoop configuration from the classloader. You have to specify the path to the Hadoop configuration in the Flink configuration ("fs.hdfs.hadoopconf").

On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier <[hidden email]> wrote:
Could you describe it better with an example, please? Why doesn't Flink automatically load the properties from the Hadoop conf files within the jar?
On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger <[hidden email]> wrote:
Hi Flavio,

there is a file called "conf/flink-conf.yaml". Add a new line to the file with the following contents:

    fs.hdfs.hadoopconf: /path/to/your/hadoop/config

This should fix the problem. Flink cannot load the configuration file from the jar containing the user code, because the file system is initialized independently of the job. So there is (currently) no way of initializing the file system using the user code classloader.

What you can do is make the configuration file available to Flink's system classloader, for example by putting your user jar into the lib/ folder of Flink. You can also add the path to the Hadoop configuration files to the CLASSPATH of Flink (but you need to do that on all machines).

I think the easiest approach is using Flink's configuration file.

On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier <[hidden email]> wrote:
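For illustration, on a typical CDH install the Hadoop client configuration often lives under /etc/hadoop/conf (that path is an assumption; use whatever directory actually holds your core-site.xml and hdfs-site.xml), so the entry in conf/flink-conf.yaml would look roughly like:

    # Hypothetical example: point Flink at the local directory that
    # contains core-site.xml and hdfs-site.xml on this machine.
    fs.hdfs.hadoopconf: /etc/hadoop/conf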
Does fs.hdfs.hadoopconf represent the folder containing the Hadoop config files (*-site.xml), or just one specific Hadoop config file (e.g. core-site.xml or hdfs-site.xml)?
On Thu, Jun 25, 2015 at 3:04 PM, Robert Metzger <[hidden email]> wrote:
It represents the folder containing the Hadoop config files. :)
Regards,
Chiwan Park
Do I have to put the Hadoop conf files on each TaskManager, or just on the JobManager?
On Thu, Jun 25, 2015 at 3:12 PM, Chiwan Park <[hidden email]> wrote:
You have to put it on all machines.

On Thu, Jun 25, 2015 at 3:17 PM, Flavio Pompermaier <[hidden email]> wrote:
You could also just qualify the HDFS URL, if that is simpler (put the host and port of the NameNode in there):

    hdfs://myhost:40010/path/to/file

On Thu, Jun 25, 2015 at 3:20 PM, Robert Metzger <[hidden email]> wrote:
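A minimal sketch of that suggestion for the original job, using the NameNode host and port from the core-site.xml posted above (adjust them to your cluster):

    // Sketch: write the CSV with a fully qualified HDFS URI, so Flink does not
    // have to resolve a default file system from the Hadoop configuration.
    myDataset.writeAsCsv("hdfs://myServerX:8020/tmp/myFile.csv");

    // Or, keeping the original Path-based construction:
    myDataset.writeAsCsv(new Path("hdfs://myServerX:8020/tmp", "myFile.csv").toString());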
Hi Flavio,

here is the example from Marton. You can use the env.writeAsText method directly:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new PersistentKafkaSource(..))
       .map(/* your operations */)
       .writeAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");

Check out the relevant section of the streaming docs for more info. [1]

On Thu, Jun 25, 2015 at 6:25 AM, Stephan Ewen <[hidden email]> wrote: