streaming hdfs sub folders


streaming hdfs sub folders

Martin Neumann
Hi,

I have a streaming machine learning job that usually runs with input from Kafka. To tune the models I need to run it on some old data from HDFS.

Unfortunately, the data on HDFS is spread out over several subfolders. Basically, there is one folder per date, with one subfolder for each hour; within those are the actual input files I'm interested in.

What I need is a source that goes through the subfolders in order and streams the files into the program. I'm using event timestamps, so all files in 00 need to be processed before those in 01.

Does anyone have an idea how to do this?

cheers Martin


Re: streaming hdfs sub folders

Stephan Ewen
Hi!

Going through nested folders is pretty simple; there is a flag on the FileInputFormat that makes sure those are read as well.
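
For reference, a rough sketch of how the flag can be set (hedged: depending on the Flink version it is either a setter on FileInputFormat or the "recursive.file.enumeration" configuration key; the path and the TextInputFormat are only placeholders for whatever format you actually use):

// any FileInputFormat subclass works the same way
TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/2016-01-01"));
format.setNestedFileEnumeration(true);               // newer API: direct setter
// alternative: pass the flag through the configuration
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
format.configure(parameters);
DataStream<String> lines = env.readFile(format, "hdfs:///data/2016-01-01");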

The tricky part is that all "00" files should be read before the "01" files. If you still want parallel reads, that means you need to sync at some point and wait for all parallel parts to finish with the "00" work before anyone may start on the "01" work.

Is your training program a DataStream or a DataSet program?

Stephan


Re: streaming hdfs sub folders

Martin Neumann
The program is a DataStream program; it usually gets the data from Kafka. It's an anomaly detection program that learns from the stream itself. The reason I want to read from files is to test different settings of the algorithm and compare them.

I don't think I need to replay things in the exact order (which is not possible with parallel reads anyway), and I have written the program so it can deal with out-of-order events.
I only need the subfolders to be processed roughly in order. It's fine to process some data from 01 before everything from 00 is finished, but if I get records from all 24 subfolders at the same time, things will break. If I set the flag, will it read from all subdirectories in parallel, or will it go subdirectory by subdirectory?

Also, can you point me to some documentation that shows how to set the flag?

cheers Martin


Re: streaming hdfs sub folders

Martin Neumann
I forgot to mention that I'm using an AvroInputFormat to read the files (that might be relevant for how the flag needs to be applied).
See the code snippet below:

DataStream<EndSongCleanedPq> inStream =
        env.readFile(new AvroInputFormat<EndSongCleanedPq>(new Path(filePath), EndSongCleanedPq.class), filePath);


Re: streaming hdfs sub folders

Stephan Ewen
Martin,

I think you can approximate this in a simple way:

  - On the client, traverse your directories to collect all the files you need, and put the paths in a list.
  - Then you have a source "env.fromElements(paths)".
  - Then flatMap over those paths and, in the FlatMap, run the Avro input format (open it per path, then call it to read all elements); see the sketch below.
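
For example, the client-side part could look roughly like this (a sketch, assuming Flink's FileSystem API for the listing; the base path and the ReadAvroFiles flatMap are placeholders, not existing code):

// on the client: list the hour sub-directories and sort them ("00", "01", ..., "23")
Path base = new Path("hdfs:///data/2016-01-01");
FileSystem fs = base.getFileSystem();
List<String> hourFolders = new ArrayList<>();
for (FileStatus status : fs.listStatus(base)) {
    if (status.isDir()) {
        hourFolders.add(status.getPath().toString());
    }
}
Collections.sort(hourFolders);

// in the job: one flatMap opens the Avro input format per path
env.fromElements(hourFolders.toArray(new String[0]))
        .flatMap(new ReadAvroFiles());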

That gives you pretty much full control over the order in which the files are processed.

What do you think?

Stephan



Re: streaming hdfs sub folders

Martin Neumann
I guess I need to set the parallelism of the FlatMap to 1 to make sure I read one file at a time. The downside I see is that I will not be able to read from HDFS in parallel (and the files are huge).
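
For what it's worth, the parallelism can be pinned to the reading operator alone, so the downstream operators keep their own parallelism (a sketch; ReadAvroFiles and MyFilter are placeholder names):

env.fromElements("00", "01", "02", "03")
        .flatMap(new ReadAvroFiles()).setParallelism(1)   // reads one folder at a time
        .filter(new MyFilter())                           // downstream keeps the default parallelism
        .print();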

I'll give it a try and see how much performance I lose.

cheers Martin


Re: streaming hdfs sub folders

Martin Neumann
I tried to implement your idea, but I'm getting NullPointerExceptions from the AvroInputFormat. Any idea what I'm doing wrong?
See the code below:

public static void main(String[] args) throws Exception {

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.fromElements("00", "01", "02", "03", "22", "23")
            .flatMap(new FileExtractor())
            .filter(new LocationFiter())
            .flatMap(new PreProcessEndSongClean())
            .writeAsCsv(outPath);

    env.execute("something");
}

private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {

    @Override
    public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
        AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
                new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
        avroInputFormat.setReuseAvroValue(false);
        while (!avroInputFormat.reachedEnd()) {
            EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
            if (res != null) collector.collect(res);
        }
    }
}


Re: streaming hdfs sub folders

rmetzger0
Hi Martin,

Where is the NullPointerException thrown?
I think you didn't call the open() method of the AvroInputFormat. Maybe that's the issue.


Re: streaming hdfs sub folders

Martin Neumann
I'm not very familiar with the inner workings of the InputFormats. Calling .open() got rid of the NullPointerException, but the stream still produces no output.

As a temporary solution, I wrote a batch job that just unions all the different datasets and writes them (sorted) into a single folder.
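
Roughly, the batch workaround looks like this (a sketch rather than the actual job: the output path and the "timestamp" sort field are placeholders, and it assumes flink-avro's AvroOutputFormat for writing):

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

DataSet<EndSongCleanedPq> all = null;
for (int hour = 0; hour < 24; hour++) {
    String folder = String.format("hdfs:///anonym/cleaned/endsong/2016-01-01/%02d", hour);
    DataSet<EndSongCleanedPq> part = batchEnv.createInput(
            new AvroInputFormat<>(new Path(folder), EndSongCleanedPq.class));
    all = (all == null) ? part : all.union(part);
}

// single partition so the sort is global; "timestamp" stands in for the real event-time field
all.sortPartition("timestamp", Order.ASCENDING).setParallelism(1)
        .output(new AvroOutputFormat<>(new Path("hdfs:///anonym/cleaned/endsong/2016-01-01-merged"), EndSongCleanedPq.class))
        .setParallelism(1);

batchEnv.execute("merge hourly folders");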

cheers Martin 


Re: streaming hdfs sub folders

Stephan Ewen
Hi!

Have a look at the class-level comments in "InputFormat". They should describe how input formats first generate splits (for parallelization) on the master, and the workers open each split.

So you need something like this:

AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
        new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
avroInputFormat.setReuseAvroValue(false);

// let the format enumerate its splits; the argument is only a hint for the minimum number of splits
for (FileInputSplit split : avroInputFormat.createInputSplits(1)) {
    avroInputFormat.open(split);
    while (!avroInputFormat.reachedEnd()) {
        EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
        if (res != null) {
            collector.collect(res);
        }
    }
}

Hope that helps.

Stephan


