Hi,

I have a streaming machine learning job that usually runs with input from Kafka. To tweak the models, I need to run it on some old data from HDFS. Unfortunately, the data on HDFS is spread out over several subfolders: I have one folder per date, with one subfolder for each hour, and within those are the actual input files I'm interested in.

What I need is a source that goes through the subfolders in order and streams the files into the program. I'm using event timestamps, so all files in 00 need to be processed before 01. Does anyone have an idea how to do this?

cheers Martin
Hi!

Going through nested folders is pretty simple: there is a flag on the FileInputFormat that makes sure those are read. The tricky part is that all "00" files should be read before the "01" files. If you still want parallel reads, that means you need to synchronize at some point: wait for all parallel parts to finish with the "00" work before anyone may start with the "01" work.

Is your training program a DataStream or a DataSet program?

Stephan

On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <[hidden email]> wrote:
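A minimal sketch of how that flag can be set, assuming the "recursive.file.enumeration" configuration key understood by Flink's FileInputFormat (newer versions also expose a setNestedFileEnumeration(true) setter); the path is a placeholder, and the record type is the one used later in this thread:

    // Point the format at the date folder and enable recursive enumeration,
    // so files in the hourly subfolders are picked up as well.
    AvroInputFormat<EndSongCleanedPq> format =
        new AvroInputFormat<>(new Path("hdfs:///path/to/date-folder"), EndSongCleanedPq.class);

    Configuration parameters = new Configuration();
    parameters.setBoolean("recursive.file.enumeration", true);
    format.configure(parameters);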
The program is a DataStream program; it usually gets the data from Kafka. It's an anomaly detection program that learns from the stream itself. The reason I want to read from files is to test different settings of the algorithm and compare them.

I think I don't need to replay things in the exact order (which is not possible with parallel reads anyway), and I have written the program so it can deal with out-of-order events. I only need the subfolders to be processed roughly in order. It's fine to process some stuff from 01 before everything from 00 is finished; if I get records from all 24 subfolders at the same time, things will break though.

If I set the flag, will it try to get data from all subdirectories in parallel, or will it go subdirectory by subdirectory? Also, can you point me to some documentation or something where I can see how to set the flag?

cheers Martin

On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <[hidden email]> wrote:
I forgot to mention that I'm using an AvroInputFormat to read the files (that might be relevant to how the flag needs to be applied). See the code snippet below:

    DataStream<EndSongCleanedPq> inStream =

On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <[hidden email]> wrote:
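A plausible shape for the truncated snippet above, as a sketch: only the inStream declaration is from the original; the environment setup and input path are assumptions.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // AvroInputFormat knows its produced type, so createInput needs no extra TypeInformation.
    AvroInputFormat<EndSongCleanedPq> avroFormat =
        new AvroInputFormat<>(new Path("hdfs:///path/to/input"), EndSongCleanedPq.class);

    DataStream<EndSongCleanedPq> inStream = env.createInput(avroFormat);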
Martin,

I think you can approximate this in an easy way, like this:

- On the client, traverse your directories to collect all the files that you need, and collect all file paths in a list.
- Then create a source with "env.fromElements(paths)".
- Then flatMap, and in the FlatMap run the Avro input format: open it per path, then call it to get all elements (see the sketch below).

That gives you pretty much full control over the order in which the files are processed. What do you think?

Stephan

On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <[hidden email]> wrote:
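A minimal sketch of the approach above. The date/hour layout and record type are taken from the rest of the thread; fromCollection is used because the paths are in a list, and the per-path reading loop follows the split-based pattern Stephan describes further down:

    // Assumed imports: org.apache.flink.api.common.functions.FlatMapFunction,
    // org.apache.flink.core.fs.Path, org.apache.flink.core.fs.FileInputSplit,
    // org.apache.flink.util.Collector, and AvroInputFormat from the flink-avro
    // module (the package varies by Flink version).

    // 1) On the client: collect the hourly subfolders in the order they should be read.
    List<String> paths = new ArrayList<>();
    for (int hour = 0; hour < 24; hour++) {
        paths.add(String.format("hdfs:///anonym/cleaned/endsong/2016-01-01/%02d", hour));
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2) Use the ordered path list as the source, and 3) read each folder in a flatMap.
    DataStream<EndSongCleanedPq> inStream = env
        .fromCollection(paths)
        .flatMap(new FlatMapFunction<String, EndSongCleanedPq>() {
            @Override
            public void flatMap(String path, Collector<EndSongCleanedPq> out) throws Exception {
                AvroInputFormat<EndSongCleanedPq> format =
                    new AvroInputFormat<>(new Path(path), EndSongCleanedPq.class);
                // Generate the splits for this path, then open and drain each one.
                for (FileInputSplit split : format.createInputSplits(1)) {
                    format.open(split);
                    while (!format.reachedEnd()) {
                        EndSongCleanedPq record = format.nextRecord(new EndSongCleanedPq());
                        if (record != null) {
                            out.collect(record);
                        }
                    }
                    format.close();
                }
            }
        });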
I guess I need to set the parallelism for the FlatMap to 1 to make sure I read one file at a time. The downside I see with this is that I will not be able to read from HDFS in parallel (and the files are huge). I'll give it a try and see how much performance I lose.

cheers Martin

On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <[hidden email]> wrote:
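For reference, a sketch of where that setting would go; FileExtractor is the function class named in Martin's next message, and env/paths are from the sketch above. Only the reading operator is pinned to parallelism 1, so operators downstream of the flatMap can still run with higher parallelism.

    DataStream<EndSongCleanedPq> inStream = env
        .fromCollection(paths)
        .flatMap(new FileExtractor())
        .setParallelism(1); // applies to the flatMap only: a single reader consumes the files in list order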
I tried to implement your idea, but I'm getting NullPointerExceptions from the AvroInputFormat. Any idea what I'm doing wrong? See the code below:

    public static void main(String[] args) throws Exception {

    private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {

On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <[hidden email]> wrote:
Hi Martin,

Where is the NullPointerException thrown? I think you didn't call the open() method of the AvroInputFormat. Maybe that's the issue.

On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <[hidden email]> wrote:
I'm not very familiar with the inner workings of the InputFormats. Calling .open() got rid of the NullPointerException, but the stream still produces no output. As a temporary solution, I wrote a batch job that just unions all the different datasets and puts them (sorted) into a single folder.

cheers Martin

On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger <[hidden email]> wrote:
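A minimal sketch of what such a batch job could look like, assuming the DataSet API; the sort field "timestamp" and the output path are placeholders, not from the original:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Union the 24 hourly folders into one DataSet.
    DataSet<EndSongCleanedPq> all = null;
    for (int hour = 0; hour < 24; hour++) {
        Path hourDir = new Path(String.format("hdfs:///anonym/cleaned/endsong/2016-01-01/%02d", hour));
        DataSet<EndSongCleanedPq> hourly =
            env.createInput(new AvroInputFormat<>(hourDir, EndSongCleanedPq.class));
        all = (all == null) ? hourly : all.union(hourly);
    }

    // Sort everything in a single partition so the output lands in one folder, in timestamp order.
    all.sortPartition("timestamp", Order.ASCENDING).setParallelism(1)
       .write(new AvroOutputFormat<>(EndSongCleanedPq.class), "hdfs:///anonym/cleaned/endsong/merged");

    env.execute();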
Hi!

Have a look at the class-level comments in "InputFormat". They describe how input formats first generate splits (for parallelization) on the master, and how the workers then open each split. So you need something like this:

    AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
        new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);

    // createInputSplits takes a minimum number of splits; 1 is enough here.
    for (FileInputSplit split : avroInputFormat.createInputSplits(1)) {
        avroInputFormat.open(split);
        while (!avroInputFormat.reachedEnd()) {
            // read each record and emit it, 'out' being the flatMap's Collector
            EndSongCleanedPq record = avroInputFormat.nextRecord(new EndSongCleanedPq());
            if (record != null) {
                out.collect(record);
            }
        }
        avroInputFormat.close();
    }

Hope that helps.

Stephan

On Tue, Feb 23, 2016 at 12:04 PM, Martin Neumann <[hidden email]> wrote: