writeAsCsv on HDFS

writeAsCsv on HDFS

Flavio Pompermaier
Hi to all,
I'm experiencing some problems writing a file as CSV on HDFS with Flink 0.9.0.
The code I use is:
  myDataset.writeAsCsv(new Path("hdfs:///tmp", "myFile.csv").toString());

If I run the job from Eclipse everything works fine, but when I deploy the job on the cluster (Cloudera 5.1.3) I get the following exception:

Caused by: java.io.IOException: The given HDFS file URI (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
    at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
    ... 25 more

The core-site.xml is present in the fat jar and contains the following property:

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://myServerX:8020</value>
</property>

I compiled Flink with the following command:

mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

How can I fix that?

Best,
Flavio

Re: writeAsCsv on HDFS

rmetzger0
Hi,

Flink does not load the Hadoop configuration via the classloader. You have to specify the path to the Hadoop configuration in the Flink configuration key "fs.hdfs.hadoopconf".

Re: writeAsCsv on HDFS

Flavio Pompermaier
Could you describe it in more detail with an example, please? Why doesn't Flink automatically load the properties of the Hadoop conf files within the jar?

Re: writeAsCsv on HDFS

rmetzger0
Hi Flavio,

There is a file called "conf/flink-conf.yaml". Add a new line to the file with the following contents:

fs.hdfs.hadoopconf: /path/to/your/hadoop/config

This should fix the problem.
Flink cannot load the configuration file from the jar containing the user code, because the file system is initialized independently of the job. So there is (currently) no way of initializing the file system using the user-code classloader.

What you can do is make the configuration file available to Flink's system classloader, for example by putting your user jar into the lib/ folder of Flink. You can also add the path to the Hadoop configuration files to the CLASSPATH of Flink (but you need to do that on all machines).

I think the easiest approach is using Flink's configuration file.


Re: writeAsCsv on HDFS

Flavio Pompermaier
Does fs.hdfs.hadoopconf represent the folder containing the Hadoop config files (*-site.xml), or just one specific Hadoop config file (e.g. core-site.xml or hdfs-site.xml)?

Re: writeAsCsv on HDFS

Chiwan Park-2
It represents the folder containing the Hadoop config files. :)
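For example, a minimal sketch of the entry in conf/flink-conf.yaml, assuming the Hadoop config files (core-site.xml, hdfs-site.xml) live in the typical CDH client location /etc/hadoop/conf:

fs.hdfs.hadoopconf: /etc/hadoop/conf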

Regards,
Chiwan Park


Re: writeAsCsv on HDFS

Flavio Pompermaier
Do I have to put the Hadoop conf files on each task manager, or just on the job manager?

Re: writeAsCsv on HDFS

rmetzger0
You have to put it on all machines.

Re: writeAsCsv on HDFS

Stephan Ewen
You could also just fully qualify the HDFS URL, if that is simpler (put the host and port of the NameNode in there): "hdfs://myhost:40010/path/to/file"
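For instance, a minimal sketch using the NameNode host and port from the core-site.xml quoted earlier (hdfs://myServerX:8020 is just that file's value; substitute your own NameNode host and port):

  myDataset.writeAsCsv("hdfs://myServerX:8020/tmp/myFile.csv");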

Re: writeAsCsv on HDFS

hawin
Hi Flavio,

Here is the example from Marton. You can use the writeAsText method directly:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new PersistentKafkaSource<>(..))
      .map(/* your operations */)
      .writeAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");

env.execute();

Check out the relevant section of the streaming docs for more info. [1]


