I have many LZO files on HDFS with paths in this format:

```
/logs/{id}/{date}/xxx[1-100].lzo
```

for example:

```
/logs/a/ds=2018-01-01/xxx1.lzo
/logs/b/ds=2018-01-01/xxx1.lzo
...
/logs/z/ds=2018-01-02/xxx1.lzo
...
/logs/z/ds=2020-05-01/xxx100.lzo
```

I'm using the Flink DataSet API to read these files for a range of {date} values and apply some transformations. Since Flink does not officially provide an LZO input format, I use HadoopInputFormat to implement this. I haven't found a way to express all the files I need as a single file glob, so I collect them in a loop:

```java
// fs (Hadoop FileSystem), conf, env, startDate and endDate are set up elsewhere
List<String> lzoFiles = new ArrayList<>();
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
    // matches e.g. /logs/*/ds=2018-01-01/*.lzo
    FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
            "/logs/*", "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
    for (FileStatus fileStatus : fileStatuses) {
        lzoFiles.add(fileStatus.getPath().toString());
    }
}
// ...
// I have to initialize the source like this:
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, String.join(",", lzoFiles));
DataSet<String> lzoSets = env.createInput(HadoopInputs.createHadoopInput(
                new LzoTextInputFormat(), LongWritable.class, Text.class, job))
        .name("lzo source")
        .map(x -> x.f1.toString());
```

However, when I submit this job to Flink with a relatively long date range, preparing the job takes a very long time (10 minutes, sometimes 20 minutes or more), even though I have already increased akka.timeout and web.timeout. This is not acceptable for me. It seems that Flink spends most of this time optimizing the execution plan. Is there a good approach to get my program prepared quickly?
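On the narrower question of expressing the whole date range as one file glob: Hadoop's glob syntax supports `{a,b,c}` alternation, so a single pattern like `/logs/*/ds={2018-01-01,2018-01-02}/*.lzo` can replace the per-day loop. The helper below is a minimal sketch; the class and method names (`DateGlob.forRange`) are hypothetical. It assumes that `FileInputFormat.setInputPaths(Job, String)` does not split on commas inside `{...}` (that is my understanding of Hadoop's behavior, but verify it on your version). Note that this only shortens the client-side loop: the matching files still have to be listed when input splits are computed, so on its own it is not expected to fix the slow submission.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds one Hadoop glob covering every day in [start, end].
final class DateGlob {
    static String forRange(String root, LocalDate start, LocalDate end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        List<String> days = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            days.add(d.format(DateTimeFormatter.ISO_LOCAL_DATE));
        }
        // {a,b,c} is Hadoop glob alternation; a single day needs no braces
        String dates = days.size() == 1 ? days.get(0) : "{" + String.join(",", days) + "}";
        // e.g. /logs/*/ds={2018-01-01,2018-01-02}/*.lzo
        return root + "/*/ds=" + dates + "/*.lzo";
    }
}
```

For 2018-01-01 through 2018-01-03 with root `/logs`, this yields `/logs/*/ds={2018-01-01,2018-01-02,2018-01-03}/*.lzo`, which could be passed to `FileInputFormat.setInputPaths(job, glob)` in place of the joined file list. For very long ranges the pattern grows by one entry per day.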
Do you have any logs that could help us identify the issue? How many files does a long date range involve?

In general, you could try the same program with the DataStream API (use StreamExecutionEnvironment#readFile [1] with PROCESS_ONCE to get behavior equivalent to batch). DataStream programs are only lightly optimized. Since the DataSet API will eventually be deprecated, this is also the recommended way if you don't need features specific to the DataSet API.

On Tue, May 19, 2020 at 7:48 AM ysnakie <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer, Ververica GmbH
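A rough sketch of the suggested DataStream variant, using `StreamExecutionEnvironment#readFile` with `FileProcessingMode.PROCESS_ONCE`. As far as I know, in this mode files are discovered by a monitoring source task while the job runs rather than while it is being prepared. Several things here are assumptions: the `hdfs:///logs` root, the hypothetical `DatePartitionFilter` class, and that Flink consults the files filter for directories during nested enumeration (check this on your Flink version). Also, `TextInputFormat` only decompresses file extensions Flink knows about, and `.lzo` is not one of them by default, so real LZO input would additionally need a decompressor registered via `FileInputFormat.registerInflaterInputStreamFactory` or a custom input format (not shown).

```java
import java.time.LocalDate;

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Hypothetical filter: skips ds=YYYY-MM-DD directories outside [start, end], keeps everything else.
final class DatePartitionFilter extends FilePathFilter {
    private final LocalDate start;
    private final LocalDate end;

    DatePartitionFilter(LocalDate start, LocalDate end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public boolean filterPath(Path filePath) {
        // FilePathFilter semantics: returning true means "ignore this path".
        String name = filePath.getName();
        if (!name.startsWith("ds=")) {
            return false; // /logs/{id} directories and .lzo files are kept
        }
        // assumes every ds= directory name is a valid ISO date
        LocalDate day = LocalDate.parse(name.substring("ds=".length()));
        return day.isBefore(start) || day.isAfter(end);
    }
}

final class ReadLogsOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String root = "hdfs:///logs"; // assumption: adjust to your cluster
        TextInputFormat format = new TextInputFormat(new Path(root));
        format.setNestedFileEnumeration(true); // descend into /logs/{id}/ds=.../
        format.setFilesFilter(new DatePartitionFilter(
                LocalDate.parse("2018-01-01"), LocalDate.parse("2018-01-31")));

        // PROCESS_ONCE: scan the path once, read what is there, then finish (batch-like).
        DataStream<String> lines =
                env.readFile(format, root, FileProcessingMode.PROCESS_ONCE, 1000L);
        lines.print();
        env.execute("read logs once");
    }
}
```

The date-range check lives in a files filter rather than in a list of paths because, as far as I know, the continuous file source accepts only a single root path.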