Hi,
I have a batch job that I run on yarn that creates files in HDFS. I want to avoid running this job at all if the output already exists. So in my code (before submitting the job into yarn-session) I do this: String directory = "foo"; Path directory = new Path(directoryName); FileSystem fs = directory.getFileSystem(); if (!fs.exists(directory)) { // run the job } What I found is that this code apparently checks the 'wrong' file system. (I always get 'false' even if it exists in hdfs) I checked the API of the execution environment yet I was unable to get the 'correct' filesystem from there. What is the proper way to check this? -- Best regards / Met vriendelijke groeten,
Niels Basjes |
HI Niels,
Have you tried specifying the fully-qualified path? The default is the local file system. For example, hdfs:///path/to/foo If that doesn't work, do you have the same Hadoop configuration on the machine where you test? Cheers, Max On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes <[hidden email]> wrote: > Hi, > > I have a batch job that I run on yarn that creates files in HDFS. > I want to avoid running this job at all if the output already exists. > > So in my code (before submitting the job into yarn-session) I do this: > > String directory = "foo"; > > Path directory = new Path(directoryName); > FileSystem fs = directory.getFileSystem(); > > if (!fs.exists(directory)) { > > // run the job > > } > > What I found is that this code apparently checks the 'wrong' file system. (I > always get 'false' even if it exists in hdfs) > > I checked the API of the execution environment yet I was unable to get the > 'correct' filesystem from there. > > What is the proper way to check this? > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
In reply to this post by Niels Basjes
Hi Niels, I assume the directoryName you are passing doesn't have the file system prefix (hdfs:// or s3://, ...) specified. In those cases, Path.getFileSystem() is looking up the default file system prefix from the configuration. Probably the environment where you are submitting the job from doesn't have the correct Flink configuration. How are you starting the Flink job? Does the conf/flink-conf.yaml have the correct hdfs configuration? Regards, Robert On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes <[hidden email]> wrote:
|
In reply to this post by Maximilian Michels
Ooops. Looks like Google Mail / Apache / the internet needs 13 minutes to deliver an email. Sorry for double answering. On Fri, Aug 19, 2016 at 3:07 PM, Maximilian Michels <[hidden email]> wrote: HI Niels, |
Yes, that did the trick. Thanks. I was using a relative path without any FS specification. So my path was "foo" and on the cluster this resolves to "hdfs:///user/nbasjes/foo" Locally this resolved to "file:///home/nbasjes/foo" and hence the mismatch I was looking at. For now I can work with this fine. Yet I think having a method on the ExecutionEnvironment instance 'getFileSystem()' that would return me the actual filesystem against which my job "is going to be executed" would solve this in an easier way. That way I can use a relative path (i.e. "foo") and run it anywhere (local, Yarn, Mesos, etc.) without any problems. What do you guys think? Is this desirable? Possible? Niels. On Fri, Aug 19, 2016 at 3:22 PM, Robert Metzger <[hidden email]> wrote:
Best regards / Met vriendelijke groeten,
Niels Basjes |
Hi Niels,
The problem is that such method only works reliably if the cluster configuration, e.g. Flink and Hadoop config files, are present on the client machine. Also, the environment variables have to be set correctly. This is usually not the case when working from the IDE. But seems like your code is really in the jar which you execute against /bin/flink, so everything should be configured then. If so, you can add the following before your existing code: Configuration config = GlobalConfiguration.loadConfiguration(); FileSystem.setDefaultScheme(config); Then you're good to go. We could think about adding this code to ExecutionEnvironment. The main problem, however, is that the location of the config file has to be supplied when working from an IDE, where the environment variables are usually not set.* Cheers, Max * You can use GlobalConfiguration.loadConfiguration("/path/to/config/directory") from the IDE to load the config. Alternatively, set FLINK_CONF_DIR environment variable. On Mon, Aug 22, 2016 at 10:55 AM, Niels Basjes <[hidden email]> wrote: > Yes, that did the trick. Thanks. > I was using a relative path without any FS specification. > So my path was "foo" and on the cluster this resolves to > "hdfs:///user/nbasjes/foo" > Locally this resolved to "file:///home/nbasjes/foo" and hence the mismatch I > was looking at. > > For now I can work with this fine. > > Yet I think having a method on the ExecutionEnvironment instance > 'getFileSystem()' that would return me the actual filesystem against which > my job "is going to be executed" would solve this in an easier way. That way > I can use a relative path (i.e. "foo") and run it anywhere (local, Yarn, > Mesos, etc.) without any problems. > > What do you guys think? > Is this desirable? Possible? > > Niels. > > > > On Fri, Aug 19, 2016 at 3:22 PM, Robert Metzger <[hidden email]> wrote: >> >> Ooops. Looks like Google Mail / Apache / the internet needs 13 minutes to >> deliver an email. >> Sorry for double answering. >> >> On Fri, Aug 19, 2016 at 3:07 PM, Maximilian Michels <[hidden email]> >> wrote: >>> >>> HI Niels, >>> >>> Have you tried specifying the fully-qualified path? The default is the >>> local file system. >>> >>> For example, hdfs:///path/to/foo >>> >>> If that doesn't work, do you have the same Hadoop configuration on the >>> machine where you test? >>> >>> Cheers, >>> Max >>> >>> On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes <[hidden email]> wrote: >>> > Hi, >>> > >>> > I have a batch job that I run on yarn that creates files in HDFS. >>> > I want to avoid running this job at all if the output already exists. >>> > >>> > So in my code (before submitting the job into yarn-session) I do this: >>> > >>> > String directory = "foo"; >>> > >>> > Path directory = new Path(directoryName); >>> > FileSystem fs = directory.getFileSystem(); >>> > >>> > if (!fs.exists(directory)) { >>> > >>> > // run the job >>> > >>> > } >>> > >>> > What I found is that this code apparently checks the 'wrong' file >>> > system. (I >>> > always get 'false' even if it exists in hdfs) >>> > >>> > I checked the API of the execution environment yet I was unable to get >>> > the >>> > 'correct' filesystem from there. >>> > >>> > What is the proper way to check this? >>> > >>> > >>> > -- >>> > Best regards / Met vriendelijke groeten, >>> > >>> > Niels Basjes >> >> > > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
Forgot to mention, this is on the master. For Flink < 1.2.x, you will
have to use GlobalConfiguration.get(); On Wed, Aug 24, 2016 at 12:23 PM, Maximilian Michels <[hidden email]> wrote: > Hi Niels, > > The problem is that such method only works reliably if the cluster > configuration, e.g. Flink and Hadoop config files, are present on the > client machine. Also, the environment variables have to be set > correctly. This is usually not the case when working from the IDE. But > seems like your code is really in the jar which you execute against > /bin/flink, so everything should be configured then. If so, you can > add the following before your existing code: > > Configuration config = GlobalConfiguration.loadConfiguration(); > FileSystem.setDefaultScheme(config); > > Then you're good to go. We could think about adding this code to > ExecutionEnvironment. The main problem, however, is that the location > of the config file has to be supplied when working from an IDE, where > the environment variables are usually not set.* > > Cheers, > Max > > * You can use GlobalConfiguration.loadConfiguration("/path/to/config/directory") > from the IDE to load the config. Alternatively, set FLINK_CONF_DIR > environment variable. > > On Mon, Aug 22, 2016 at 10:55 AM, Niels Basjes <[hidden email]> wrote: >> Yes, that did the trick. Thanks. >> I was using a relative path without any FS specification. >> So my path was "foo" and on the cluster this resolves to >> "hdfs:///user/nbasjes/foo" >> Locally this resolved to "file:///home/nbasjes/foo" and hence the mismatch I >> was looking at. >> >> For now I can work with this fine. >> >> Yet I think having a method on the ExecutionEnvironment instance >> 'getFileSystem()' that would return me the actual filesystem against which >> my job "is going to be executed" would solve this in an easier way. That way >> I can use a relative path (i.e. "foo") and run it anywhere (local, Yarn, >> Mesos, etc.) without any problems. >> >> What do you guys think? >> Is this desirable? Possible? >> >> Niels. >> >> >> >> On Fri, Aug 19, 2016 at 3:22 PM, Robert Metzger <[hidden email]> wrote: >>> >>> Ooops. Looks like Google Mail / Apache / the internet needs 13 minutes to >>> deliver an email. >>> Sorry for double answering. >>> >>> On Fri, Aug 19, 2016 at 3:07 PM, Maximilian Michels <[hidden email]> >>> wrote: >>>> >>>> HI Niels, >>>> >>>> Have you tried specifying the fully-qualified path? The default is the >>>> local file system. >>>> >>>> For example, hdfs:///path/to/foo >>>> >>>> If that doesn't work, do you have the same Hadoop configuration on the >>>> machine where you test? >>>> >>>> Cheers, >>>> Max >>>> >>>> On Thu, Aug 18, 2016 at 2:02 PM, Niels Basjes <[hidden email]> wrote: >>>> > Hi, >>>> > >>>> > I have a batch job that I run on yarn that creates files in HDFS. >>>> > I want to avoid running this job at all if the output already exists. >>>> > >>>> > So in my code (before submitting the job into yarn-session) I do this: >>>> > >>>> > String directory = "foo"; >>>> > >>>> > Path directory = new Path(directoryName); >>>> > FileSystem fs = directory.getFileSystem(); >>>> > >>>> > if (!fs.exists(directory)) { >>>> > >>>> > // run the job >>>> > >>>> > } >>>> > >>>> > What I found is that this code apparently checks the 'wrong' file >>>> > system. (I >>>> > always get 'false' even if it exists in hdfs) >>>> > >>>> > I checked the API of the execution environment yet I was unable to get >>>> > the >>>> > 'correct' filesystem from there. >>>> > >>>> > What is the proper way to check this? >>>> > >>>> > >>>> > -- >>>> > Best regards / Met vriendelijke groeten, >>>> > >>>> > Niels Basjes >>> >>> >> >> >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes |
Free forum by Nabble | Edit this page |